nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
Extract ReceiveHelper
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / ManagerImpl.java
index 1af99ff155510576d496dd84538705c6469c240f..1f731ee897717dbc75c050a5afa2460859db2afc 100644 (file)
@@ -16,7 +16,6 @@
  */
 package org.asamk.signal.manager;
 
-import org.asamk.signal.manager.actions.HandleAction;
 import org.asamk.signal.manager.api.Configuration;
 import org.asamk.signal.manager.api.Device;
 import org.asamk.signal.manager.api.Group;
@@ -43,7 +42,6 @@ import org.asamk.signal.manager.helper.Context;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.groups.GroupInfo;
 import org.asamk.signal.manager.storage.identities.IdentityInfo;
-import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.asamk.signal.manager.storage.recipients.Contact;
 import org.asamk.signal.manager.storage.recipients.Profile;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
@@ -59,7 +57,6 @@ import org.whispersystems.libsignal.ecc.ECPublicKey;
 import org.whispersystems.libsignal.util.guava.Optional;
 import org.whispersystems.signalservice.api.SignalSessionLock;
 import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
-import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
 import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage;
 import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
 import org.whispersystems.signalservice.api.push.ACI;
@@ -67,8 +64,6 @@ import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedE
 import org.whispersystems.signalservice.api.util.DeviceNameUtil;
 import org.whispersystems.signalservice.api.util.InvalidNumberException;
 import org.whispersystems.signalservice.api.util.PhoneNumberFormatter;
-import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
-import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 import org.whispersystems.signalservice.internal.util.DynamicCredentialsProvider;
 import org.whispersystems.signalservice.internal.util.Hex;
 import org.whispersystems.signalservice.internal.util.Util;
@@ -81,7 +76,6 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -91,14 +85,10 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.schedulers.Schedulers;
-
 import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
 
 public class ManagerImpl implements Manager {
@@ -113,15 +103,11 @@ public class ManagerImpl implements Manager {
 
     private final Context context;
 
-    private boolean hasCaughtUpWithOldMessages = false;
-    private boolean ignoreAttachments = false;
-
     private Thread receiveThread;
     private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private final List<Runnable> closedListeners = new ArrayList<>();
     private boolean isReceivingSynchronous;
-    private boolean needsToRetryFailedMessages = false;
 
     ManagerImpl(
             SignalAccount account,
@@ -155,6 +141,18 @@ public class ManagerImpl implements Manager {
         final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath());
 
         this.context = new Context(account, dependencies, avatarStore, attachmentStore, stickerPackStore);
+        this.context.getReceiveHelper().setAuthenticationFailureListener(() -> {
+            try {
+                close();
+            } catch (IOException e) {
+                logger.warn("Failed to close account after authentication failure", e);
+            }
+        });
+        this.context.getReceiveHelper().setCaughtUpWithOldMessagesListener(() -> {
+            synchronized (this) {
+                this.notifyAll();
+            }
+        });
     }
 
     @Override
@@ -257,7 +255,7 @@ public class ManagerImpl implements Manager {
     @Override
     public void updateConfiguration(
             Configuration configuration
-    ) throws IOException, NotMasterDeviceException {
+    ) throws NotMasterDeviceException {
         if (!account.isMasterDevice()) {
             throw new NotMasterDeviceException();
         }
@@ -762,7 +760,7 @@ public class ManagerImpl implements Manager {
     @Override
     public void setGroupBlocked(
             final GroupId groupId, final boolean blocked
-    ) throws GroupNotFoundException, IOException, NotMasterDeviceException {
+    ) throws GroupNotFoundException, NotMasterDeviceException {
         if (!account.isMasterDevice()) {
             throw new NotMasterDeviceException();
         }
@@ -832,54 +830,6 @@ public class ManagerImpl implements Manager {
         }
     }
 
-    private void retryFailedReceivedMessages(ReceiveMessageHandler handler) {
-        Set<HandleAction> queuedActions = new HashSet<>();
-        for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
-            var actions = retryFailedReceivedMessage(handler, cachedMessage);
-            if (actions != null) {
-                queuedActions.addAll(actions);
-            }
-        }
-        handleQueuedActions(queuedActions);
-    }
-
-    private List<HandleAction> retryFailedReceivedMessage(
-            final ReceiveMessageHandler handler, final CachedMessage cachedMessage
-    ) {
-        var envelope = cachedMessage.loadEnvelope();
-        if (envelope == null) {
-            cachedMessage.delete();
-            return null;
-        }
-
-        final var result = context.getIncomingMessageHandler()
-                .handleRetryEnvelope(envelope, ignoreAttachments, handler);
-        final var actions = result.first();
-        final var exception = result.second();
-
-        if (exception instanceof UntrustedIdentityException) {
-            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
-                // Envelope is more than a month old, cleaning up.
-                cachedMessage.delete();
-                return null;
-            }
-            if (!envelope.hasSourceUuid()) {
-                final var identifier = ((UntrustedIdentityException) exception).getSender();
-                final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
-                try {
-                    account.getMessageCache().replaceSender(cachedMessage, recipientId);
-                } catch (IOException ioException) {
-                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
-                }
-            }
-            return null;
-        }
-
-        // If successful and for all other errors that are not recoverable, delete the cached message
-        cachedMessage.delete();
-        return actions;
-    }
-
     @Override
     public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
         if (isReceivingSynchronous) {
@@ -903,7 +853,7 @@ public class ManagerImpl implements Manager {
             logger.debug("Starting receiving messages");
             while (!Thread.interrupted()) {
                 try {
-                    receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> {
+                    context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> {
                         synchronized (messageHandlers) {
                             Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
                                 try {
@@ -920,7 +870,6 @@ public class ManagerImpl implements Manager {
                 }
             }
             logger.debug("Finished receiving messages");
-            hasCaughtUpWithOldMessages = false;
             synchronized (messageHandlers) {
                 receiveThread = null;
 
@@ -988,180 +937,21 @@ public class ManagerImpl implements Manager {
         isReceivingSynchronous = true;
         receiveThread = Thread.currentThread();
         try {
-            receiveMessagesInternal(timeout, returnOnTimeout, handler);
+            context.getReceiveHelper().receiveMessages(timeout, returnOnTimeout, handler);
         } finally {
             receiveThread = null;
-            hasCaughtUpWithOldMessages = false;
             isReceivingSynchronous = false;
         }
     }
 
-    private void receiveMessagesInternal(
-            Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
-    ) throws IOException {
-        needsToRetryFailedMessages = true;
-
-        // Use a Map here because java Set doesn't have a get method ...
-        Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
-
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
-                .subscribeOn(Schedulers.computation())
-                .observeOn(Schedulers.computation())
-                .distinctUntilChanged()
-                .subscribe(this::onWebSocketStateChange);
-        signalWebSocket.connect();
-
-        hasCaughtUpWithOldMessages = false;
-        var backOffCounter = 0;
-        final var MAX_BACKOFF_COUNTER = 9;
-
-        while (!Thread.interrupted()) {
-            if (needsToRetryFailedMessages) {
-                retryFailedReceivedMessages(handler);
-                needsToRetryFailedMessages = false;
-            }
-            SignalServiceEnvelope envelope;
-            final CachedMessage[] cachedMessage = {null};
-            final var nowMillis = System.currentTimeMillis();
-            if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
-                account.setLastReceiveTimestamp(nowMillis);
-            }
-            logger.debug("Checking for new message from server");
-            try {
-                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
-                    final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
-                            .resolveRecipient(envelope1.getSourceAddress()) : null;
-                    // store message on disk, before acknowledging receipt to the server
-                    cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
-                });
-                backOffCounter = 0;
-
-                if (result.isPresent()) {
-                    envelope = result.get();
-                    logger.debug("New message received from server");
-                } else {
-                    logger.debug("Received indicator that server queue is empty");
-                    handleQueuedActions(queuedActions.keySet());
-                    queuedActions.clear();
-
-                    hasCaughtUpWithOldMessages = true;
-                    synchronized (this) {
-                        this.notifyAll();
-                    }
-
-                    // Continue to wait another timeout for new messages
-                    continue;
-                }
-            } catch (AssertionError e) {
-                if (e.getCause() instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
-                    break;
-                } else {
-                    throw e;
-                }
-            } catch (IOException e) {
-                logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
-                if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
-                    final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
-                    backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
-                    logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
-                    try {
-                        Thread.sleep(sleepMilliseconds);
-                    } catch (InterruptedException interruptedException) {
-                        return;
-                    }
-                    hasCaughtUpWithOldMessages = false;
-                    signalWebSocket.connect();
-                    continue;
-                }
-                throw e;
-            } catch (TimeoutException e) {
-                backOffCounter = 0;
-                if (returnOnTimeout) return;
-                continue;
-            }
-
-            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
-            for (final var h : result.first()) {
-                final var existingAction = queuedActions.get(h);
-                if (existingAction == null) {
-                    queuedActions.put(h, h);
-                } else {
-                    existingAction.mergeOther(h);
-                }
-            }
-            final var exception = result.second();
-
-            if (hasCaughtUpWithOldMessages) {
-                handleQueuedActions(queuedActions.keySet());
-                queuedActions.clear();
-            }
-            if (cachedMessage[0] != null) {
-                if (exception instanceof UntrustedIdentityException) {
-                    logger.debug("Keeping message with untrusted identity in message cache");
-                    final var address = ((UntrustedIdentityException) exception).getSender();
-                    final var recipientId = account.getRecipientStore().resolveRecipient(address);
-                    if (!envelope.hasSourceUuid()) {
-                        try {
-                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
-                        } catch (IOException ioException) {
-                            logger.warn("Failed to move cached message to recipient folder: {}",
-                                    ioException.getMessage());
-                        }
-                    }
-                } else {
-                    cachedMessage[0].delete();
-                }
-            }
-        }
-        handleQueuedActions(queuedActions.keySet());
-        queuedActions.clear();
-        dependencies.getSignalWebSocket().disconnect();
-        webSocketStateDisposable.dispose();
-    }
-
-    private void onWebSocketStateChange(final WebSocketConnectionState s) {
-        if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
-            account.setRegistered(false);
-            try {
-                close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
     @Override
     public void setIgnoreAttachments(final boolean ignoreAttachments) {
-        this.ignoreAttachments = ignoreAttachments;
+        context.getReceiveHelper().setIgnoreAttachments(ignoreAttachments);
     }
 
     @Override
     public boolean hasCaughtUpWithOldMessages() {
-        return hasCaughtUpWithOldMessages;
-    }
-
-    private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
-        logger.debug("Handling message actions");
-        var interrupted = false;
-        for (var action : queuedActions) {
-            logger.debug("Executing action {}", action.getClass().getSimpleName());
-            try {
-                action.execute(context);
-            } catch (Throwable e) {
-                if ((e instanceof AssertionError || e instanceof RuntimeException)
-                        && e.getCause() instanceof InterruptedException) {
-                    interrupted = true;
-                    continue;
-                }
-                logger.warn("Message action failed.", e);
-            }
-        }
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
+        return context.getReceiveHelper().hasCaughtUpWithOldMessages();
     }
 
     @Override
@@ -1268,7 +1058,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerified(recipientId, fingerprint);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1291,7 +1081,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1314,7 +1104,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }
@@ -1334,7 +1124,7 @@ public class ManagerImpl implements Manager {
         }
         final var updated = context.getIdentityHelper().trustIdentityAllKeys(recipientId);
         if (updated && this.isReceiving()) {
-            needsToRetryFailedMessages = true;
+            context.getReceiveHelper().setNeedsToRetryFailedMessages(true);
         }
         return updated;
     }