nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Improve stop receive handling
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
index 8dfc7deb569476b11bf119a85cbebae0021700da..7bc2c224052362d826438edd502d29dc44d79eea 100644 (file)
@@ -2,8 +2,8 @@ package org.asamk.signal.manager.helper;
 
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.slf4j.Logger;
@@ -37,6 +37,8 @@ public class ReceiveHelper {
     private boolean ignoreAttachments = false;
     private boolean needsToRetryFailedMessages = false;
     private boolean hasCaughtUpWithOldMessages = false;
+    private boolean isWaitingForMessage = false;
+    private boolean shouldStop = false;
     private Callable authenticationFailureListener;
     private Callable caughtUpWithOldMessagesListener;
 
@@ -66,6 +68,22 @@ public class ReceiveHelper {
         this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
     }
 
+    public boolean requestStopReceiveMessages() {
+        this.shouldStop = true;
+        return isWaitingForMessage;
+    }
+
+    public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
+        while (!shouldStop) {
+            try {
+                receiveMessages(Duration.ofMinutes(1), false, handler);
+                break;
+            } catch (IOException e) {
+                logger.warn("Receiving messages failed, retrying", e);
+            }
+        }
+    }
+
     public void receiveMessages(
             Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
     ) throws IOException {
@@ -92,6 +110,7 @@ public class ReceiveHelper {
             queuedActions.clear();
             dependencies.getSignalWebSocket().disconnect();
             webSocketStateDisposable.dispose();
+            shouldStop = false;
         }
     }
 
@@ -104,8 +123,9 @@ public class ReceiveHelper {
         final var signalWebSocket = dependencies.getSignalWebSocket();
 
         var backOffCounter = 0;
+        isWaitingForMessage = false;
 
-        while (!Thread.interrupted()) {
+        while (!shouldStop) {
             if (needsToRetryFailedMessages) {
                 retryFailedReceivedMessages(handler);
                 needsToRetryFailedMessages = false;
@@ -118,13 +138,16 @@ public class ReceiveHelper {
             }
             logger.debug("Checking for new message from server");
             try {
+                isWaitingForMessage = true;
                 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+                    isWaitingForMessage = false;
                     final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
                             .resolveRecipient(envelope1.getSourceAddress()) : null;
                     logger.trace("Storing new message from {}", recipientId);
                     // store message on disk, before acknowledging receipt to the server
                     cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
                 });
+                isWaitingForMessage = false;
                 backOffCounter = 0;
 
                 if (result.isPresent()) {
@@ -143,7 +166,6 @@ public class ReceiveHelper {
                 }
             } catch (AssertionError e) {
                 if (e.getCause() instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
                     break;
                 } else {
                     throw e;
@@ -255,23 +277,14 @@ public class ReceiveHelper {
 
     private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
         logger.debug("Handling message actions");
-        var interrupted = false;
         for (var action : queuedActions) {
             logger.debug("Executing action {}", action.getClass().getSimpleName());
             try {
                 action.execute(context);
             } catch (Throwable e) {
-                if ((e instanceof AssertionError || e instanceof RuntimeException)
-                        && e.getCause() instanceof InterruptedException) {
-                    interrupted = true;
-                    continue;
-                }
                 logger.warn("Message action failed.", e);
             }
         }
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
     }
 
     private void onWebSocketStateChange(final WebSocketConnectionState s) {