X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/5ed9db4f08e52ed0c42cb42740f85d2ad346e13c..ccb37c00f6fed576664427a907fcecc21d2dbc67:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 4ffeb99f..652b4e76 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -16,6 +16,7 @@ */ package org.asamk.signal.manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Device; @@ -25,6 +26,7 @@ import org.asamk.signal.manager.api.InactiveGroupLinkException; import org.asamk.signal.manager.api.InvalidDeviceLinkException; import org.asamk.signal.manager.api.InvalidStickerException; import org.asamk.signal.manager.api.Message; +import org.asamk.signal.manager.api.MessageEnvelope; import org.asamk.signal.manager.api.NotPrimaryDeviceException; import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.PendingAdminApprovalException; @@ -82,6 +84,7 @@ import org.whispersystems.signalservice.internal.util.Util; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -696,7 +699,7 @@ class ManagerImpl implements Manager { byte[] receipt, String note, RecipientIdentifier.Single recipient ) throws IOException { final var paymentNotification = new SignalServiceDataMessage.PaymentNotification(receipt, note); - final var payment = new SignalServiceDataMessage.Payment(paymentNotification); + final var payment = new SignalServiceDataMessage.Payment(paymentNotification, null); final var messageBuilder = SignalServiceDataMessage.newBuilder().withPayment(payment); try { return sendMessage(messageBuilder, Set.of(recipient)); @@ -874,9 +877,6 @@ class ManagerImpl implements Manager { @Override public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { - if (isReceivingSynchronous) { - throw new IllegalStateException("Already receiving message synchronously."); - } synchronized (messageHandlers) { if (isWeakListener) { weakHandlers.add(handler); @@ -890,23 +890,12 @@ class ManagerImpl implements Manager { private static final AtomicInteger threadNumber = new AtomicInteger(0); private void startReceiveThreadIfRequired() { - if (receiveThread != null) { + if (receiveThread != null || isReceivingSynchronous) { return; } receiveThread = new Thread(() -> { logger.debug("Starting receiving messages"); - context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { - synchronized (messageHandlers) { - final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList(); - handlers.forEach(h -> { - try { - h.handleMessage(envelope, e); - } catch (Throwable ex) { - logger.warn("Message handler failed, ignoring", ex); - } - }); - } - }); + context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers); logger.debug("Finished receiving messages"); synchronized (messageHandlers) { receiveThread = null; @@ -923,6 +912,18 @@ class ManagerImpl implements Manager { receiveThread.start(); } + private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + try { + h.handleMessage(envelope, e); + } catch (Throwable ex) { + logger.warn("Message handler failed, ignoring", ex); + } + }); + } + } + @Override public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; @@ -961,28 +962,35 @@ class ManagerImpl implements Manager { } @Override - public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException { - receiveMessages(timeout, true, handler); - } - - @Override - public void receiveMessages(ReceiveMessageHandler handler) throws IOException { - receiveMessages(Duration.ofMinutes(1), false, handler); + public void receiveMessages( + Optional timeout, Optional maxMessages, ReceiveMessageHandler handler + ) throws IOException, AlreadyReceivingException { + receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); } private void receiveMessages( - Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler - ) throws IOException { - if (isReceiving()) { - throw new IllegalStateException("Already receiving message."); + Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler + ) throws IOException, AlreadyReceivingException { + synchronized (messageHandlers) { + if (isReceiving()) { + throw new AlreadyReceivingException("Already receiving message."); + } + isReceivingSynchronous = true; + receiveThread = Thread.currentThread(); } - isReceivingSynchronous = true; - receiveThread = Thread.currentThread(); try { - context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler); + context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> { + passReceivedMessageToHandlers(envelope, e); + handler.handleMessage(envelope, e); + }); } finally { - receiveThread = null; - isReceivingSynchronous = false; + synchronized (messageHandlers) { + receiveThread = null; + isReceivingSynchronous = false; + if (messageHandlers.size() > 0) { + startReceiveThreadIfRequired(); + } + } } } @@ -1160,6 +1168,11 @@ class ManagerImpl implements Manager { } } + @Override + public InputStream retrieveAttachment(final String id) throws IOException { + return context.getAttachmentHelper().retrieveAttachment(id).getStream(); + } + @Override public void close() { Thread thread;