]> nmode's Git Repositories - signal-cli/commitdiff
Improve stop receive handling
authorAsamK <asamk@gmx.de>
Sat, 12 Feb 2022 11:26:42 +0000 (12:26 +0100)
committerAsamK <asamk@gmx.de>
Sat, 12 Feb 2022 13:09:30 +0000 (14:09 +0100)
Only interrupt the receive thread if it is currently waiting for new
messages from the server, otherwise just set a stop flag.

graalvm-config-dir/resource-config.json
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java
lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java

index c0866365331a56f148344b8a188a811e2bd04a3b..08a9dcc50d69ed3673b6b3aef58c81b68fc07240 100644 (file)
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CH\\E"
     }, 
+    {
+      "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CI\\E"
+    }, 
+    {
+      "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CL\\E"
+    }, 
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CN\\E"
     }, 
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TH\\E"
     }, 
+    {
+      "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TR\\E"
+    }, 
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_UA\\E"
     }, 
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_US\\E"
     }, 
+    {
+      "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_VE\\E"
+    }, 
     {
       "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_XK\\E"
     }, 
       "name":"net.sourceforge.argparse4j.internal.ArgumentParserImpl",
       "locales":[
         "", 
+        "en", 
         "und"
       ]
     }]
index c06436049542ceba826738b19b9f3933d1828c79..3f418de8452fa9277b978b5f5b07a4d1ead3902d 100644 (file)
@@ -765,24 +765,17 @@ class ManagerImpl implements Manager {
         }
         receiveThread = new Thread(() -> {
             logger.debug("Starting receiving messages");
-            while (!Thread.interrupted()) {
-                try {
-                    context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
-                        synchronized (messageHandlers) {
-                            Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
-                                try {
-                                    h.handleMessage(envelope, e);
-                                } catch (Exception ex) {
-                                    logger.warn("Message handler failed, ignoring", ex);
-                                }
-                            });
+            context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
+                synchronized (messageHandlers) {
+                    Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+                        try {
+                            h.handleMessage(envelope, e);
+                        } catch (Exception ex) {
+                            logger.warn("Message handler failed, ignoring", ex);
                         }
                     });
-                    break;
-                } catch (IOException e) {
-                    logger.warn("Receiving messages failed, retrying", e);
                 }
-            }
+            });
             logger.debug("Finished receiving messages");
             synchronized (messageHandlers) {
                 receiveThread = null;
@@ -816,7 +809,10 @@ class ManagerImpl implements Manager {
     }
 
     private void stopReceiveThread(final Thread thread) {
-        thread.interrupt();
+        if (context.getReceiveHelper().requestStopReceiveMessages()) {
+            logger.debug("Receive stop requested, interrupting read from server.");
+            thread.interrupt();
+        }
         try {
             thread.join();
         } catch (InterruptedException ignored) {
@@ -1030,14 +1026,15 @@ class ManagerImpl implements Manager {
         dependencies.getSignalWebSocket().disconnect();
         disposable.dispose();
 
+        if (account != null) {
+            account.close();
+        }
+
         synchronized (closedListeners) {
             closedListeners.forEach(Runnable::run);
             closedListeners.clear();
         }
 
-        if (account != null) {
-            account.close();
-        }
         account = null;
     }
 }
index ec4d385e1e9effd65403c380cba77dcec1042b42..8e58e9769ce34c693d529311bc42936b94b0efb5 100644 (file)
@@ -347,9 +347,6 @@ public final class ProfileHelper {
                     .storeProfileAvatar(address,
                             outputStream -> retrieveProfileAvatar(avatarPath, profileKey, outputStream));
         } catch (Throwable e) {
-            if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
             logger.warn("Failed to download profile avatar, ignoring: {}", e.getMessage());
         }
     }
index 8dfc7deb569476b11bf119a85cbebae0021700da..7bc2c224052362d826438edd502d29dc44d79eea 100644 (file)
@@ -2,8 +2,8 @@ package org.asamk.signal.manager.helper;
 
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.slf4j.Logger;
@@ -37,6 +37,8 @@ public class ReceiveHelper {
     private boolean ignoreAttachments = false;
     private boolean needsToRetryFailedMessages = false;
     private boolean hasCaughtUpWithOldMessages = false;
+    private boolean isWaitingForMessage = false;
+    private boolean shouldStop = false;
     private Callable authenticationFailureListener;
     private Callable caughtUpWithOldMessagesListener;
 
@@ -66,6 +68,22 @@ public class ReceiveHelper {
         this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
     }
 
+    public boolean requestStopReceiveMessages() {
+        this.shouldStop = true;
+        return isWaitingForMessage;
+    }
+
+    public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
+        while (!shouldStop) {
+            try {
+                receiveMessages(Duration.ofMinutes(1), false, handler);
+                break;
+            } catch (IOException e) {
+                logger.warn("Receiving messages failed, retrying", e);
+            }
+        }
+    }
+
     public void receiveMessages(
             Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
     ) throws IOException {
@@ -92,6 +110,7 @@ public class ReceiveHelper {
             queuedActions.clear();
             dependencies.getSignalWebSocket().disconnect();
             webSocketStateDisposable.dispose();
+            shouldStop = false;
         }
     }
 
@@ -104,8 +123,9 @@ public class ReceiveHelper {
         final var signalWebSocket = dependencies.getSignalWebSocket();
 
         var backOffCounter = 0;
+        isWaitingForMessage = false;
 
-        while (!Thread.interrupted()) {
+        while (!shouldStop) {
             if (needsToRetryFailedMessages) {
                 retryFailedReceivedMessages(handler);
                 needsToRetryFailedMessages = false;
@@ -118,13 +138,16 @@ public class ReceiveHelper {
             }
             logger.debug("Checking for new message from server");
             try {
+                isWaitingForMessage = true;
                 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+                    isWaitingForMessage = false;
                     final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
                             .resolveRecipient(envelope1.getSourceAddress()) : null;
                     logger.trace("Storing new message from {}", recipientId);
                     // store message on disk, before acknowledging receipt to the server
                     cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
                 });
+                isWaitingForMessage = false;
                 backOffCounter = 0;
 
                 if (result.isPresent()) {
@@ -143,7 +166,6 @@ public class ReceiveHelper {
                 }
             } catch (AssertionError e) {
                 if (e.getCause() instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
                     break;
                 } else {
                     throw e;
@@ -255,23 +277,14 @@ public class ReceiveHelper {
 
     private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
         logger.debug("Handling message actions");
-        var interrupted = false;
         for (var action : queuedActions) {
             logger.debug("Executing action {}", action.getClass().getSimpleName());
             try {
                 action.execute(context);
             } catch (Throwable e) {
-                if ((e instanceof AssertionError || e instanceof RuntimeException)
-                        && e.getCause() instanceof InterruptedException) {
-                    interrupted = true;
-                    continue;
-                }
                 logger.warn("Message action failed.", e);
             }
         }
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
     }
 
     private void onWebSocketStateChange(final WebSocketConnectionState s) {
index c0e7d429af0a1ebe5a44c489dab5a7dc892636ce..3b75de65499ba9305158d6f125c72f618018d163 100644 (file)
@@ -49,7 +49,7 @@ public class MessageSendLogStore implements AutoCloseable {
         this.cleanupThread = new Thread(() -> {
             try {
                 final var interval = Duration.ofHours(1).toMillis();
-                while (true) {
+                while (!Thread.interrupted()) {
                     try (final var connection = database.getConnection()) {
                         deleteOutdatedEntries(connection);
                     } catch (SQLException e) {