X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/7805622f0717062deff752895ef2fd8cf53b80aa..3606fb67bb77feb7ac3ea5d73dc27b5a7abbf52b:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 58863d08..bb55746c 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -16,6 +16,7 @@ */ package org.asamk.signal.manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Device; @@ -25,6 +26,7 @@ import org.asamk.signal.manager.api.InactiveGroupLinkException; import org.asamk.signal.manager.api.InvalidDeviceLinkException; import org.asamk.signal.manager.api.InvalidStickerException; import org.asamk.signal.manager.api.Message; +import org.asamk.signal.manager.api.MessageEnvelope; import org.asamk.signal.manager.api.NotPrimaryDeviceException; import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.PendingAdminApprovalException; @@ -82,6 +84,7 @@ import org.whispersystems.signalservice.internal.util.Util; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -391,7 +394,7 @@ class ManagerImpl implements Manager { @Override public Pair createGroup( - String name, Set members, File avatarFile + String name, Set members, String avatarFile ) throws IOException, AttachmentInvalidException, UnregisteredRecipientException { return context.getGroupHelper() .createGroup(name, @@ -585,7 +588,8 @@ class ManagerImpl implements Manager { quote.message(), List.of(), resolveMentions(quote.mentions()), - SignalServiceDataMessage.Quote.Type.NORMAL)); + SignalServiceDataMessage.Quote.Type.NORMAL, + List.of())); } if (message.sticker().isPresent()) { final var sticker = message.sticker().get(); @@ -624,6 +628,14 @@ class ManagerImpl implements Manager { } messageBuilder.withPreviews(previews); } + if (message.storyReply().isPresent()) { + final var storyReply = message.storyReply().get(); + final var authorServiceId = context.getRecipientHelper() + .resolveSignalServiceAddress(context.getRecipientHelper().resolveRecipient(storyReply.author())) + .getServiceId(); + messageBuilder.withStoryContext(new SignalServiceDataMessage.StoryContext(authorServiceId, + storyReply.timestamp())); + } } private ArrayList resolveMentions(final List mentionList) throws UnregisteredRecipientException { @@ -667,14 +679,19 @@ class ManagerImpl implements Manager { boolean remove, RecipientIdentifier.Single targetAuthor, long targetSentTimestamp, - Set recipients + Set recipients, + final boolean isStory ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException, UnregisteredRecipientException { var targetAuthorRecipientId = context.getRecipientHelper().resolveRecipient(targetAuthor); - var reaction = new SignalServiceDataMessage.Reaction(emoji, - remove, - context.getRecipientHelper().resolveSignalServiceAddress(targetAuthorRecipientId).getServiceId(), - targetSentTimestamp); + final var authorServiceId = context.getRecipientHelper() + .resolveSignalServiceAddress(targetAuthorRecipientId) + .getServiceId(); + var reaction = new SignalServiceDataMessage.Reaction(emoji, remove, authorServiceId, targetSentTimestamp); final var messageBuilder = SignalServiceDataMessage.newBuilder().withReaction(reaction); + if (isStory) { + messageBuilder.withStoryContext(new SignalServiceDataMessage.StoryContext(authorServiceId, + targetSentTimestamp)); + } return sendMessage(messageBuilder, recipients); } @@ -683,7 +700,7 @@ class ManagerImpl implements Manager { byte[] receipt, String note, RecipientIdentifier.Single recipient ) throws IOException { final var paymentNotification = new SignalServiceDataMessage.PaymentNotification(receipt, note); - final var payment = new SignalServiceDataMessage.Payment(paymentNotification); + final var payment = new SignalServiceDataMessage.Payment(paymentNotification, null); final var messageBuilder = SignalServiceDataMessage.newBuilder().withPayment(payment); try { return sendMessage(messageBuilder, Set.of(recipient)); @@ -861,9 +878,6 @@ class ManagerImpl implements Manager { @Override public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { - if (isReceivingSynchronous) { - throw new IllegalStateException("Already receiving message synchronously."); - } synchronized (messageHandlers) { if (isWeakListener) { weakHandlers.add(handler); @@ -877,23 +891,12 @@ class ManagerImpl implements Manager { private static final AtomicInteger threadNumber = new AtomicInteger(0); private void startReceiveThreadIfRequired() { - if (receiveThread != null) { + if (receiveThread != null || isReceivingSynchronous) { return; } receiveThread = new Thread(() -> { logger.debug("Starting receiving messages"); - context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { - synchronized (messageHandlers) { - final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList(); - handlers.forEach(h -> { - try { - h.handleMessage(envelope, e); - } catch (Throwable ex) { - logger.warn("Message handler failed, ignoring", ex); - } - }); - } - }); + context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers); logger.debug("Finished receiving messages"); synchronized (messageHandlers) { receiveThread = null; @@ -910,6 +913,18 @@ class ManagerImpl implements Manager { receiveThread.start(); } + private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + try { + h.handleMessage(envelope, e); + } catch (Throwable ex) { + logger.warn("Message handler failed, ignoring", ex); + } + }); + } + } + @Override public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; @@ -948,28 +963,35 @@ class ManagerImpl implements Manager { } @Override - public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException { - receiveMessages(timeout, true, handler); - } - - @Override - public void receiveMessages(ReceiveMessageHandler handler) throws IOException { - receiveMessages(Duration.ofMinutes(1), false, handler); + public void receiveMessages( + Optional timeout, Optional maxMessages, ReceiveMessageHandler handler + ) throws IOException, AlreadyReceivingException { + receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); } private void receiveMessages( - Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler - ) throws IOException { - if (isReceiving()) { - throw new IllegalStateException("Already receiving message."); + Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler + ) throws IOException, AlreadyReceivingException { + synchronized (messageHandlers) { + if (isReceiving()) { + throw new AlreadyReceivingException("Already receiving message."); + } + isReceivingSynchronous = true; + receiveThread = Thread.currentThread(); } - isReceivingSynchronous = true; - receiveThread = Thread.currentThread(); try { - context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler); + context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> { + passReceivedMessageToHandlers(envelope, e); + handler.handleMessage(envelope, e); + }); } finally { - receiveThread = null; - isReceivingSynchronous = false; + synchronized (messageHandlers) { + receiveThread = null; + isReceivingSynchronous = false; + if (messageHandlers.size() > 0) { + startReceiveThreadIfRequired(); + } + } } } @@ -1147,6 +1169,11 @@ class ManagerImpl implements Manager { } } + @Override + public InputStream retrieveAttachment(final String id) throws IOException { + return context.getAttachmentHelper().retrieveAttachment(id).getStream(); + } + @Override public void close() { Thread thread;