]> nmode's Git Repositories - signal-cli/commitdiff
Extract ReceiveHelper
authorAsamK <asamk@gmx.de>
Thu, 30 Dec 2021 21:44:38 +0000 (22:44 +0100)
committerAsamK <asamk@gmx.de>
Thu, 30 Dec 2021 21:44:38 +0000 (22:44 +0100)
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/helper/Context.java
lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java [new file with mode: 0644]

index 1af99ff155510576d496dd84538705c6469c240f..1f731ee897717dbc75c050a5afa2460859db2afc 100644 (file)
@@ -16,7 +16,6 @@
  */
 package org.asamk.signal.manager;
 
-import org.asamk.signal.manager.actions.HandleAction;
 import org.asamk.signal.manager.api.Configuration;
 import org.asamk.signal.manager.api.Device;
 import org.asamk.signal.manager.api.Group;
@@ -43,7 +42,6 @@ import org.asamk.signal.manager.helper.Context;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.groups.GroupInfo;
 import org.asamk.signal.manager.storage.identities.IdentityInfo;
-import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.asamk.signal.manager.storage.recipients.Contact;
 import org.asamk.signal.manager.storage.recipients.Profile;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
@@ -59,7 +57,6 @@ import org.whispersystems.libsignal.ecc.ECPublicKey;
 import org.whispersystems.libsignal.util.guava.Optional;
 import org.whispersystems.signalservice.api.SignalSessionLock;
 import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
-import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
 import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage;
 import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
 import org.whispersystems.signalservice.api.push.ACI;
@@ -67,8 +64,6 @@ import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedE
 import org.whispersystems.signalservice.api.util.DeviceNameUtil;
 import org.whispersystems.signalservice.api.util.InvalidNumberException;
 import org.whispersystems.signalservice.api.util.PhoneNumberFormatter;
-import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
-import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 import org.whispersystems.signalservice.internal.util.DynamicCredentialsProvider;
 import org.whispersystems.signalservice.internal.util.Hex;
 import org.whispersystems.signalservice.internal.util.Util;
@@ -81,7 +76,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -91,14 +85,10 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.schedulers.Schedulers;
-
 import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
 
 public class ManagerImpl implements Manager {
@@ -113,15 +103,11 @@ public class ManagerImpl implements Manager {
 
     private final Context context;
 
-    private boolean hasCaughtUpWithOldMessages = false;
-    private boolean ignoreAttachments = false;
-
     private Thread receiveThread;
     private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private final List<Runnable> closedListeners = new ArrayList<>();
     private boolean isReceivingSynchronous;
-    private boolean needsToRetryFailedMessages = false;
 
     ManagerImpl(
             SignalAccount account,
@@ -155,6 +141,18 @@ public class ManagerImpl implements Manager {
         final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath());
 
         this.context = new Context(account, dependencies, avatarStore, attachmentStore, stickerPackStore);
+        this.context.getReceiveHelper().setAuthenticationFailureListener(() -> {
+            try {
+                close();
+            } catch (IOException e) {
+                logger.warn("Failed to close account after authentication failure", e);
+            }
+        });
+        this.context.getReceiveHelper().setCaughtUpWithOldMessagesListener(() -> {
+            synchronized (this) {
+                this.notifyAll();
+            }
+        });
     }
 
     @Override
@@ -257,7 +255,7 @@ public class ManagerImpl implements Manager {
     @Override
     public void updateConfiguration(
             Configuration configuration
-    ) throws IOException, NotMasterDeviceException {
+    ) throws NotMasterDeviceException {
         if (!account.isMasterDevice()) {
             throw new NotMasterDeviceException();
         }
@@ -762,7 +760,7 @@ public class ManagerImpl implements Manager {
     @Override
     public void setGroupBlocked(
             final GroupId groupId, final boolean blocked
-    ) throws GroupNotFoundException, IOException, NotMasterDeviceException {
+    ) throws GroupNotFoundException, NotMasterDeviceException {
         if (!account.isMasterDevice()) {
             throw new NotMasterDeviceException();
         }
@@ -832,54 +830,6 @@ public class ManagerImpl implements Manager {
         }
     }
 
-    private void retryFailedReceivedMessages(ReceiveMessageHandler handler) {
-        Set<HandleAction> queuedActions = new HashSet<>();
-        for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
-            var actions = retryFailedReceivedMessage(handler, cachedMessage);
-            if (actions != null) {
-                queuedActions.addAll(actions);
-            }
-        }
-        handleQueuedActions(queuedActions);
-    }
-
-    private List<HandleAction> retryFailedReceivedMessage(
-            final ReceiveMessageHandler handler, final CachedMessage cachedMessage
-    ) {
-        var envelope = cachedMessage.loadEnvelope();
-        if (envelope == null) {
-            cachedMessage.delete();
-            return null;
-        }
-
-        final var result = context.getIncomingMessageHandler()
-                .handleRetryEnvelope(envelope, ignoreAttachments, handler);
-        final var actions = result.first();
-        final var exception = result.second();
-
-        if (exception instanceof UntrustedIdentityException) {
-            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
-                // Envelope is more than a month old, cleaning up.
-                cachedMessage.delete();
-                return null;
-            }
-            if (!envelope.hasSourceUuid()) {
-                final var identifier = ((UntrustedIdentityException) exception).getSender();
-                final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
-                try {
-                    account.getMessageCache().replaceSender(cachedMessage, recipientId);
-                } catch (IOException ioException) {
-                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
-                }
-            }
-            return null;
-        }
-
-        // If successful and for all other errors that are not recoverable, delete the cached message
-        cachedMessage.delete();
-        return actions;
-    }
-
     @Override
     public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
         if (isReceivingSynchronous) {
@@ -903,7 +853,7 @@ public class ManagerImpl implements Manager {
             logger.debug("Starting receiving messages");
             while (!Thread.interrupted()) {
                 try {
-                    receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> {
+                    context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
                         synchronized (messageHandlers) {
                             Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
                                 try {
@@ -920,7 +870,6 @@ public class ManagerImpl implements Manager {
                 }
             }
             logger.debug("Finished receiving messages");
-            hasCaughtUpWithOldMessages = false;
             synchronized (messageHandlers) {
                 receiveThread = null;
 
@@ -988,180 +937,21 @@ public class ManagerImpl implements Manager {
         isReceivingSynchronous = true;
         receiveThread = Thread.currentThread();
         try {
-            receiveMessagesInternal(timeout, returnOnTimeout, handler);
+            context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler);
         } finally {
             receiveThread = null;
-            hasCaughtUpWithOldMessages = false;
             isReceivingSynchronous = false;
         }
     }
 
-    private void receiveMessagesInternal(
-            Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
-    ) throws IOException {
-        needsToRetryFailedMessages = true;
-
-        // Use a Map here because java Set doesn't have a get method ...
-        Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
-
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
-                .subscribeOn(Schedulers.computation())
-                .observeOn(Schedulers.computation())
-                .distinctUntilChanged()
-                .subscribe(this::onWebSocketStateChange);
-        signalWebSocket.connect();
-
-        hasCaughtUpWithOldMessages = false;
-        var backOffCounter = 0;
-        final var MAX_BACKOFF_COUNTER = 9;
-
-        while (!Thread.interrupted()) {
-            if (needsToRetryFailedMessages) {
-                retryFailedReceivedMessages(handler);
-                needsToRetryFailedMessages = false;
-            }
-            SignalServiceEnvelope envelope;
-            final CachedMessage[] cachedMessage = {null};
-            final var nowMillis = System.currentTimeMillis();
-            if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
-                account.setLastReceiveTimestamp(nowMillis);
-            }
-            logger.debug("Checking for new message from server");
-            try {
-                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
-                    final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
-                            .resolveRecipient(envelope1.getSourceAddress()) : null;
-                    // store message on disk, before acknowledging receipt to the server
-                    cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
-                });
-                backOffCounter = 0;
-
-                if (result.isPresent()) {
-                    envelope = result.get();
-                    logger.debug("New message received from server");
-                } else {
-                    logger.debug("Received indicator that server queue is empty");
-                    handleQueuedActions(queuedActions.keySet());
-                    queuedActions.clear();
-
-                    hasCaughtUpWithOldMessages = true;
-                    synchronized (this) {
-                        this.notifyAll();
-                    }
-
-                    // Continue to wait another timeout for new messages
-                    continue;
-                }
-            } catch (AssertionError e) {
-                if (e.getCause() instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
-                    break;
-                } else {
-                    throw e;
-                }
-            } catch (IOException e) {
-                logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
-                if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
-                    final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
-                    backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
-                    logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
-                    try {
-                        Thread.sleep(sleepMilliseconds);
-                    } catch (InterruptedException interruptedException) {
-                        return;
-                    }
-                    hasCaughtUpWithOldMessages = false;
-                    signalWebSocket.connect();
-                    continue;
-                }
-                throw e;
-            } catch (TimeoutException e) {
-                backOffCounter = 0;
-                if (returnOnTimeout) return;
-                continue;
-            }
-
-            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
-            for (final var h : result.first()) {
-                final var existingAction = queuedActions.get(h);
-                if (existingAction == null) {
-                    queuedActions.put(h, h);
-                } else {
-                    existingAction.mergeOther(h);
-                }
-            }
-            final var exception = result.second();
-
-            if (hasCaughtUpWithOldMessages) {
-                handleQueuedActions(queuedActions.keySet());
-                queuedActions.clear();
-            }
-            if (cachedMessage[0] != null) {
-                if (exception instanceof UntrustedIdentityException) {
-                    logger.debug("Keeping message with untrusted identity in message cache");
-                    final var address = ((UntrustedIdentityException) exception).getSender();
-                    final var recipientId = account.getRecipientStore().resolveRecipient(address);
-                    if (!envelope.hasSourceUuid()) {
-                        try {
-                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
-                        } catch (IOException ioException) {
-                            logger.warn("Failed to move cached message to recipient folder: {}",
-                                    ioException.getMessage());
-                        }
-                    }
-                } else {
-                    cachedMessage[0].delete();
-                }
-            }
-        }
-        handleQueuedActions(queuedActions.keySet());
-        queuedActions.clear();
-        dependencies.getSignalWebSocket().disconnect();
-        webSocketStateDisposable.dispose();
-    }
-
-    private void onWebSocketStateChange(final WebSocketConnectionState s) {
-        if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
-            account.setRegistered(false);
-            try {
-                close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
     @Override
     public void setIgnoreAttachments(final boolean ignoreAttachments) {
-        this.ignoreAttachments = ignoreAttachments;
+        context.getReceiveHelper().setIgnoreAttachments(ignoreAttachments);
     }
 
     @Override
     public boolean hasCaughtUpWithOldMessages() {
-        return hasCaughtUpWithOldMessages;
-    }
-
-    private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
-        logger.debug("Handling message actions");
-        var interrupted = false;
-        for (var action : queuedActions) {
-            logger.debug("Executing action {}", action.getClass().getSimpleName());
-            try {
-                action.execute(context);
-            } catch (Throwable e) {
-                if ((e instanceof AssertionError || e instanceof RuntimeException)
-                        && e.getCause() instanceof InterruptedException) {
-                    interrupted = true;
-                    continue;
-                }
-                logger.warn("Message action failed.", e);
-            }
-        }
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
+        return context.getReceiveHelper().hasCaughtUpWithOldMessages();
     }
 
     @Override
@@ -1268,7 +1058,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerified(recipientId, fingerprint);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1291,7 +1081,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1314,7 +1104,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1334,7 +1124,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityAllKeys(recipientId);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
index 155bd4433b8da09b3b3bbe1063db9d637361fdfd..79b7b959116d21a714f0c003b988a3b18143a9c6 100644 (file)
@@ -29,6 +29,7 @@ public class Context {
     private PinHelper pinHelper;
     private PreKeyHelper preKeyHelper;
     private ProfileHelper profileHelper;
+    private ReceiveHelper receiveHelper;
     private RecipientHelper recipientHelper;
     private SendHelper sendHelper;
     private StorageHelper storageHelper;
@@ -111,6 +112,10 @@ public class Context {
         return getOrCreate(() -> profileHelper, () -> profileHelper = new ProfileHelper(this));
     }
 
+    public ReceiveHelper getReceiveHelper() {
+        return getOrCreate(() -> receiveHelper, () -> receiveHelper = new ReceiveHelper(this));
+    }
+
     public RecipientHelper getRecipientHelper() {
         return getOrCreate(() -> recipientHelper, () -> recipientHelper = new RecipientHelper(this));
     }
diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
new file mode 100644 (file)
index 0000000..631e271
--- /dev/null
@@ -0,0 +1,288 @@
+package org.asamk.signal.manager.helper;
+
+import org.asamk.signal.manager.Manager;
+import org.asamk.signal.manager.SignalDependencies;
+import org.asamk.signal.manager.UntrustedIdentityException;
+import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.storage.SignalAccount;
+import org.asamk.signal.manager.storage.messageCache.CachedMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
+import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
+import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
+public class ReceiveHelper {
+
+    private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
+    private final static int MAX_BACKOFF_COUNTER = 9;
+
+    private final SignalAccount account;
+    private final SignalDependencies dependencies;
+    private final Context context;
+
+    private boolean ignoreAttachments = false;
+    private boolean needsToRetryFailedMessages = false;
+    private boolean hasCaughtUpWithOldMessages = false;
+    private Callable authenticationFailureListener;
+    private Callable caughtUpWithOldMessagesListener;
+
+    public ReceiveHelper(final Context context) {
+        this.account = context.getAccount();
+        this.dependencies = context.getDependencies();
+        this.context = context;
+    }
+
+    public void setIgnoreAttachments(final boolean ignoreAttachments) {
+        this.ignoreAttachments = ignoreAttachments;
+    }
+
+    public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
+        this.needsToRetryFailedMessages = needsToRetryFailedMessages;
+    }
+
+    public boolean hasCaughtUpWithOldMessages() {
+        return hasCaughtUpWithOldMessages;
+    }
+
+    public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
+        this.authenticationFailureListener = authenticationFailureListener;
+    }
+
+    public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
+        this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
+    }
+
+    public void receiveMessages(
+            Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+    ) throws IOException {
+        needsToRetryFailedMessages = true;
+        hasCaughtUpWithOldMessages = false;
+
+        // Use a Map here because java Set doesn't have a get method ...
+        Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
+
+        final var signalWebSocket = dependencies.getSignalWebSocket();
+        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
+                        signalWebSocket.getWebSocketState())
+                .subscribeOn(Schedulers.computation())
+                .observeOn(Schedulers.computation())
+                .distinctUntilChanged()
+                .subscribe(this::onWebSocketStateChange);
+        signalWebSocket.connect();
+
+        try {
+            receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
+        } finally {
+            hasCaughtUpWithOldMessages = false;
+            handleQueuedActions(queuedActions.keySet());
+            queuedActions.clear();
+            dependencies.getSignalWebSocket().disconnect();
+            webSocketStateDisposable.dispose();
+        }
+    }
+
+    private void receiveMessagesInternal(
+            Duration timeout,
+            boolean returnOnTimeout,
+            Manager.ReceiveMessageHandler handler,
+            final Map<HandleAction, HandleAction> queuedActions
+    ) throws IOException {
+        final var signalWebSocket = dependencies.getSignalWebSocket();
+
+        var backOffCounter = 0;
+
+        while (!Thread.interrupted()) {
+            if (needsToRetryFailedMessages) {
+                retryFailedReceivedMessages(handler);
+                needsToRetryFailedMessages = false;
+            }
+            SignalServiceEnvelope envelope;
+            final CachedMessage[] cachedMessage = {null};
+            final var nowMillis = System.currentTimeMillis();
+            if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
+                account.setLastReceiveTimestamp(nowMillis);
+            }
+            logger.debug("Checking for new message from server");
+            try {
+                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+                    final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
+                            .resolveRecipient(envelope1.getSourceAddress()) : null;
+                    logger.trace("Storing new message from {}", recipientId);
+                    // store message on disk, before acknowledging receipt to the server
+                    cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+                });
+                backOffCounter = 0;
+
+                if (result.isPresent()) {
+                    envelope = result.get();
+                    logger.debug("New message received from server");
+                } else {
+                    logger.debug("Received indicator that server queue is empty");
+                    handleQueuedActions(queuedActions.keySet());
+                    queuedActions.clear();
+
+                    hasCaughtUpWithOldMessages = true;
+                    caughtUpWithOldMessagesListener.call();
+
+                    // Continue to wait another timeout for new messages
+                    continue;
+                }
+            } catch (AssertionError e) {
+                if (e.getCause() instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                    break;
+                } else {
+                    throw e;
+                }
+            } catch (IOException e) {
+                logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
+                if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
+                    final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
+                    backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
+                    logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
+                    try {
+                        Thread.sleep(sleepMilliseconds);
+                    } catch (InterruptedException interruptedException) {
+                        return;
+                    }
+                    hasCaughtUpWithOldMessages = false;
+                    signalWebSocket.connect();
+                    continue;
+                }
+                throw e;
+            } catch (TimeoutException e) {
+                backOffCounter = 0;
+                if (returnOnTimeout) return;
+                continue;
+            }
+
+            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
+            for (final var h : result.first()) {
+                final var existingAction = queuedActions.get(h);
+                if (existingAction == null) {
+                    queuedActions.put(h, h);
+                } else {
+                    existingAction.mergeOther(h);
+                }
+            }
+            final var exception = result.second();
+
+            if (hasCaughtUpWithOldMessages) {
+                handleQueuedActions(queuedActions.keySet());
+                queuedActions.clear();
+            }
+            if (cachedMessage[0] != null) {
+                if (exception instanceof UntrustedIdentityException) {
+                    logger.debug("Keeping message with untrusted identity in message cache");
+                    final var address = ((UntrustedIdentityException) exception).getSender();
+                    final var recipientId = account.getRecipientStore().resolveRecipient(address);
+                    if (!envelope.hasSourceUuid()) {
+                        try {
+                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
+                        } catch (IOException ioException) {
+                            logger.warn("Failed to move cached message to recipient folder: {}",
+                                    ioException.getMessage());
+                        }
+                    }
+                } else {
+                    cachedMessage[0].delete();
+                }
+            }
+        }
+    }
+
+    private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
+        Set<HandleAction> queuedActions = new HashSet<>();
+        for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
+            var actions = retryFailedReceivedMessage(handler, cachedMessage);
+            if (actions != null) {
+                queuedActions.addAll(actions);
+            }
+        }
+        handleQueuedActions(queuedActions);
+    }
+
+    private List<HandleAction> retryFailedReceivedMessage(
+            final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+    ) {
+        var envelope = cachedMessage.loadEnvelope();
+        if (envelope == null) {
+            cachedMessage.delete();
+            return null;
+        }
+
+        final var result = context.getIncomingMessageHandler()
+                .handleRetryEnvelope(envelope, ignoreAttachments, handler);
+        final var actions = result.first();
+        final var exception = result.second();
+
+        if (exception instanceof UntrustedIdentityException) {
+            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
+                // Envelope is more than a month old, cleaning up.
+                cachedMessage.delete();
+                return null;
+            }
+            if (!envelope.hasSourceUuid()) {
+                final var identifier = ((UntrustedIdentityException) exception).getSender();
+                final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
+                try {
+                    account.getMessageCache().replaceSender(cachedMessage, recipientId);
+                } catch (IOException ioException) {
+                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
+                }
+            }
+            return null;
+        }
+
+        // If successful and for all other errors that are not recoverable, delete the cached message
+        cachedMessage.delete();
+        return actions;
+    }
+
+    private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
+        logger.debug("Handling message actions");
+        var interrupted = false;
+        for (var action : queuedActions) {
+            logger.debug("Executing action {}", action.getClass().getSimpleName());
+            try {
+                action.execute(context);
+            } catch (Throwable e) {
+                if ((e instanceof AssertionError || e instanceof RuntimeException)
+                        && e.getCause() instanceof InterruptedException) {
+                    interrupted = true;
+                    continue;
+                }
+                logger.warn("Message action failed.", e);
+            }
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void onWebSocketStateChange(final WebSocketConnectionState s) {
+        if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
+            account.setRegistered(false);
+            authenticationFailureListener.call();
+        }
+    }
+
+    public interface Callable {
+
+        void call();
+    }
+}