From: AsamK Date: Tue, 1 Nov 2022 12:56:40 +0000 (+0100) Subject: Improve behavior with synchronous and asynchronous receivers X-Git-Tag: v0.11.5~20 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/90962296377ad76531191cea7caaad738af00f2f Improve behavior with synchronous and asynchronous receivers --- diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index ec168a4c..01d88e46 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -1,5 +1,6 @@ package org.asamk.signal.manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Device; @@ -204,7 +205,7 @@ public interface Manager extends Closeable { */ public void receiveMessages( Optional timeout, Optional maxMessages, ReceiveMessageHandler handler - ) throws IOException; + ) throws IOException, AlreadyReceivingException; void setReceiveConfig(ReceiveConfig receiveConfig); diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 95f5bde4..31231630 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -16,6 +16,7 @@ */ package org.asamk.signal.manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.Configuration; import org.asamk.signal.manager.api.Device; @@ -25,6 +26,7 @@ import org.asamk.signal.manager.api.InactiveGroupLinkException; import org.asamk.signal.manager.api.InvalidDeviceLinkException; import org.asamk.signal.manager.api.InvalidStickerException; import org.asamk.signal.manager.api.Message; +import org.asamk.signal.manager.api.MessageEnvelope; import org.asamk.signal.manager.api.NotPrimaryDeviceException; import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.PendingAdminApprovalException; @@ -874,9 +876,6 @@ class ManagerImpl implements Manager { @Override public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { - if (isReceivingSynchronous) { - throw new IllegalStateException("Already receiving message synchronously."); - } synchronized (messageHandlers) { if (isWeakListener) { weakHandlers.add(handler); @@ -890,23 +889,12 @@ class ManagerImpl implements Manager { private static final AtomicInteger threadNumber = new AtomicInteger(0); private void startReceiveThreadIfRequired() { - if (receiveThread != null) { + if (receiveThread != null || isReceivingSynchronous) { return; } receiveThread = new Thread(() -> { logger.debug("Starting receiving messages"); - context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { - synchronized (messageHandlers) { - final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList(); - handlers.forEach(h -> { - try { - h.handleMessage(envelope, e); - } catch (Throwable ex) { - logger.warn("Message handler failed, ignoring", ex); - } - }); - } - }); + context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers); logger.debug("Finished receiving messages"); synchronized (messageHandlers) { receiveThread = null; @@ -923,6 +911,18 @@ class ManagerImpl implements Manager { receiveThread.start(); } + private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + try { + h.handleMessage(envelope, e); + } catch (Throwable ex) { + logger.warn("Message handler failed, ignoring", ex); + } + }); + } + } + @Override public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; @@ -962,26 +962,34 @@ class ManagerImpl implements Manager { @Override public void receiveMessages( - Optional timeout, - Optional maxMessages, - ReceiveMessageHandler handler - ) throws IOException { + Optional timeout, Optional maxMessages, ReceiveMessageHandler handler + ) throws IOException, AlreadyReceivingException { receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler); } private void receiveMessages( Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler - ) throws IOException { - if (isReceiving()) { - throw new IllegalStateException("Already receiving message."); + ) throws IOException, AlreadyReceivingException { + synchronized (messageHandlers) { + if (isReceiving()) { + throw new AlreadyReceivingException("Already receiving message."); + } + isReceivingSynchronous = true; + receiveThread = Thread.currentThread(); } - isReceivingSynchronous = true; - receiveThread = Thread.currentThread(); try { - context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, handler); + context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, (envelope, e) -> { + passReceivedMessageToHandlers(envelope, e); + handler.handleMessage(envelope, e); + }); } finally { - receiveThread = null; - isReceivingSynchronous = false; + synchronized (messageHandlers) { + receiveThread = null; + isReceivingSynchronous = false; + if (messageHandlers.size() > 0) { + startReceiveThreadIfRequired(); + } + } } } diff --git a/lib/src/main/java/org/asamk/signal/manager/api/AlreadyReceivingException.java b/lib/src/main/java/org/asamk/signal/manager/api/AlreadyReceivingException.java new file mode 100644 index 00000000..298e3f2f --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/api/AlreadyReceivingException.java @@ -0,0 +1,12 @@ +package org.asamk.signal.manager.api; + +public class AlreadyReceivingException extends Exception { + + public AlreadyReceivingException(String message) { + super(message); + } + + public AlreadyReceivingException(String message, Exception e) { + super(message, e); + } +} diff --git a/src/main/java/org/asamk/signal/commands/ReceiveCommand.java b/src/main/java/org/asamk/signal/commands/ReceiveCommand.java index 79d0ee1d..ab65f837 100644 --- a/src/main/java/org/asamk/signal/commands/ReceiveCommand.java +++ b/src/main/java/org/asamk/signal/commands/ReceiveCommand.java @@ -10,8 +10,10 @@ import org.asamk.signal.OutputType; import org.asamk.signal.ReceiveMessageHandler; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.IOErrorException; +import org.asamk.signal.commands.exceptions.UserErrorException; import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.manager.Manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.ReceiveConfig; import org.asamk.signal.output.JsonWriter; import org.asamk.signal.output.OutputWriter; @@ -79,6 +81,8 @@ public class ReceiveCommand implements LocalCommand, JsonRpcSingleCommand