nmode's Git Repositories - signal-cli/commitdiff
Add optional message limit for receive command
author AsamK <asamk@gmx.de>
Mon, 31 Oct 2022 10:17:52 +0000 (11:17 +0100)
committer AsamK <asamk@gmx.de>
Mon, 31 Oct 2022 10:17:52 +0000 (11:17 +0100)
lib/src/main/java/org/asamk/signal/manager/Manager.java
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
man/signal-cli.1.adoc
src/main/java/org/asamk/signal/commands/ReceiveCommand.java
src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java

index 3dcf8f5976788119c8f49e5557f1af0cabbf2153..ec168a4c345af2c3cc8b8ba34d6dd2195ff0da43 100644 (file)
@@ -202,12 +202,9 @@ public interface Manager extends Closeable {
     /**
      * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
      */
-    void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException;
-
-    /**
-     * Receive new messages from server, returns only if the thread is interrupted.
-     */
-    void receiveMessages(ReceiveMessageHandler handler) throws IOException;
+    public void receiveMessages(
+            Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
+    ) throws IOException;
 
     void setReceiveConfig(ReceiveConfig receiveConfig);
 
index 4ffeb99fdd10a278f8ccef1a7e4f250397cca2a7..95f5bde4e710d62bf9f9b538c2624f027bf5ec2f 100644 (file)
@@ -961,17 +961,16 @@ class ManagerImpl implements Manager {
     }
 
     @Override
-    public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException {
-        receiveMessages(timeout, true, handler);
-    }
-
-    @Override
-    public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
-        receiveMessages(Duration.ofMinutes(1), false, handler);
+    public void receiveMessages(
+            Optional<Duration> timeout,
+            Optional<Integer> maxMessages,
+            ReceiveMessageHandler handler
+    ) throws IOException {
+        receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler);
     }
 
     private void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
+            Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
     ) throws IOException {
         if (isReceiving()) {
             throw new IllegalStateException("Already receiving message.");
@@ -979,7 +978,7 @@ class ManagerImpl implements Manager {
         isReceivingSynchronous = true;
         receiveThread = Thread.currentThread();
         try {
-            context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler);
+            context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, maxMessages, handler);
         } finally {
             receiveThread = null;
             isReceivingSynchronous = false;
index 9fe1bf54587d78435d8db902161bb772f77d1c7f..c15f4f94cf89d6993a282a52fc89ad70f45199b7 100644 (file)
@@ -80,7 +80,7 @@ public class ReceiveHelper {
     public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
         while (!shouldStop) {
             try {
-                receiveMessages(Duration.ofMinutes(1), false, handler);
+                receiveMessages(Duration.ofMinutes(1), false, null, handler);
                 break;
             } catch (IOException e) {
                 logger.warn("Receiving messages failed, retrying", e);
@@ -89,7 +89,7 @@ public class ReceiveHelper {
     }
 
     public void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+            Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
     ) throws IOException {
         needsToRetryFailedMessages = true;
         hasCaughtUpWithOldMessages = false;
@@ -107,7 +107,7 @@ public class ReceiveHelper {
         signalWebSocket.connect();
 
         try {
-            receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions);
+            receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
         } finally {
             hasCaughtUpWithOldMessages = false;
             handleQueuedActions(queuedActions.keySet());
@@ -122,13 +122,15 @@ public class ReceiveHelper {
             final SignalWebSocket signalWebSocket,
             Duration timeout,
             boolean returnOnTimeout,
+            Integer maxMessages,
             Manager.ReceiveMessageHandler handler,
             final Map<HandleAction, HandleAction> queuedActions
     ) throws IOException {
+        int remainingMessages = maxMessages == null ? -1 : maxMessages;
         var backOffCounter = 0;
         isWaitingForMessage = false;
 
-        while (!shouldStop) {
+        while (!shouldStop && remainingMessages != 0) {
             if (needsToRetryFailedMessages) {
                 retryFailedReceivedMessages(handler);
                 needsToRetryFailedMessages = false;
@@ -154,6 +156,9 @@ public class ReceiveHelper {
                 backOffCounter = 0;
 
                 if (result.isPresent()) {
+                    if (remainingMessages > 0) {
+                        remainingMessages -= 1;
+                    }
                     envelope = result.get();
                     logger.debug("New message received from server");
                 } else {
index 5b3af7c1df12deea2f20184ba17cc82142c03387..4d34582aa83341fde7d3c6c782bc27a5c5d968e2 100644 (file)
@@ -372,6 +372,9 @@ In json mode this is outputted as one json object per line.
 Number of seconds to wait for new messages (negative values disable timeout).
 Default is 5 seconds.
 
+*--max-messages*::
+Maximum number of messages to receive, before returning.
+
 *--ignore-attachments*::
 Don’t download attachments of received messages.
 
index 4d5bdff0792553552ffb6ab510333849ec0d4695..0095e758716e94aa5c4df989b1e5fe2710d90551 100644 (file)
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 
 public class ReceiveCommand implements LocalCommand {
 
@@ -37,6 +38,10 @@ public class ReceiveCommand implements LocalCommand {
                 .type(double.class)
                 .setDefault(3.0)
                 .help("Number of seconds to wait for new messages (negative values disable timeout)");
+        subparser.addArgument("--max-messages")
+                .type(int.class)
+                .setDefault(-1)
+                .help("Maximum number of messages to receive, before returning.");
         subparser.addArgument("--ignore-attachments")
                 .help("Don’t download attachments of received messages.")
                 .action(Arguments.storeTrue());
@@ -58,6 +63,7 @@ public class ReceiveCommand implements LocalCommand {
             final Namespace ns, final Manager m, final OutputWriter outputWriter
     ) throws CommandException {
         final var timeout = ns.getDouble("timeout");
+        final var maxMessagesRaw = ns.getInt("max-messages");
         final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
         final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
         final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
@@ -65,11 +71,9 @@ public class ReceiveCommand implements LocalCommand {
         try {
             final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m,
                     (JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter);
-            if (timeout < 0) {
-                m.receiveMessages(handler);
-            } else {
-                m.receiveMessages(Duration.ofMillis((long) (timeout * 1000)), handler);
-            }
+            final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
+            final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
+            m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
         } catch (IOException e) {
             throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);
         }
index 8e92cdf4ab4efe596b03a0d3fa65bb11a81408a8..b59be923d0c0091ab8a3935166b3dc72690305f7 100644 (file)
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -497,39 +498,46 @@ public class DbusManagerImpl implements Manager {
         }
     }
 
-    @Override
-    public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
-        addReceiveHandler(handler);
-        try {
-            synchronized (this) {
-                this.wait();
-            }
-        } catch (InterruptedException ignored) {
-        }
-        removeReceiveHandler(handler);
-    }
-
     @Override
     public void receiveMessages(
-            final Duration timeout, final ReceiveMessageHandler handler
+            Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
     ) throws IOException {
+        final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
         final var lastMessage = new AtomicLong(System.currentTimeMillis());
+        final var thread = Thread.currentThread();
 
         final ReceiveMessageHandler receiveHandler = (envelope, e) -> {
             lastMessage.set(System.currentTimeMillis());
             handler.handleMessage(envelope, e);
+            if (remainingMessages.get() > 0) {
+                if (remainingMessages.decrementAndGet() <= 0) {
+                    remainingMessages.set(0);
+                    thread.interrupt();
+                }
+            }
         };
         addReceiveHandler(receiveHandler);
-        while (true) {
+        if (timeout.isPresent()) {
+            while (remainingMessages.get() != 0) {
+                try {
+                    final var passedTime = System.currentTimeMillis() - lastMessage.get();
+                    final var sleepTimeRemaining = timeout.get().toMillis() - passedTime;
+                    if (sleepTimeRemaining < 0) {
+                        break;
+                    }
+                    Thread.sleep(sleepTimeRemaining);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        } else {
             try {
-                final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get());
-                if (sleepTimeRemaining < 0) {
-                    break;
+                synchronized (this) {
+                    this.wait();
                 }
-                Thread.sleep(sleepTimeRemaining);
             } catch (InterruptedException ignored) {
             }
         }
+
         removeReceiveHandler(receiveHandler);
     }