nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Update libsignal-service
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
index 8dfc7deb569476b11bf119a85cbebae0021700da..71f690816f52b15efe19d0982442d21145d4cdcb 100644 (file)
@@ -1,14 +1,20 @@
 package org.asamk.signal.manager.helper;
 
 import org.asamk.signal.manager.Manager;
-import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.ReceiveConfig;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
+import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
+import org.whispersystems.signalservice.api.push.ServiceId;
+import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 
@@ -22,21 +28,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.schedulers.Schedulers;
 
 public class ReceiveHelper {
 
-    private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
-    private final static int MAX_BACKOFF_COUNTER = 9;
+    private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
+    private static final int MAX_BACKOFF_COUNTER = 9;
 
     private final SignalAccount account;
     private final SignalDependencies dependencies;
     private final Context context;
 
-    private boolean ignoreAttachments = false;
-    private boolean needsToRetryFailedMessages = false;
+    private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
     private boolean hasCaughtUpWithOldMessages = false;
+    private boolean isWaitingForMessage = false;
+    private boolean shouldStop = false;
     private Callable authenticationFailureListener;
     private Callable caughtUpWithOldMessagesListener;
 
@@ -46,16 +52,9 @@ public class ReceiveHelper {
         this.context = context;
     }
 
-    public void setIgnoreAttachments(final boolean ignoreAttachments) {
-        this.ignoreAttachments = ignoreAttachments;
-    }
-
-    public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
-        this.needsToRetryFailedMessages = needsToRetryFailedMessages;
-    }
-
-    public boolean hasCaughtUpWithOldMessages() {
-        return hasCaughtUpWithOldMessages;
+    public void setReceiveConfig(final ReceiveConfig receiveConfig) { // Replaces the config that is handed to handleEnvelope/handleRetryEnvelope.
+        this.receiveConfig = receiveConfig;
+        dependencies.setAllowStories(!receiveConfig.ignoreStories()); // stories are allowed exactly when the config does not ignore them
     }
 
     public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
@@ -66,49 +65,71 @@ public class ReceiveHelper {
         this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
     }
 
+    public boolean requestStopReceiveMessages() { // Requests that the receive loops in this class stop; returns whether a read is currently pending.
+        this.shouldStop = true; // checked by the while conditions of both receive loops; reset to false when receiveMessages() finishes
+        return isWaitingForMessage; // true only between the start of readMessageBatch and the delivery of a batch (or the call returning)
+    } // NOTE(review): shouldStop and isWaitingForMessage are declared as plain (non-volatile) fields — confirm visibility if this is called from another thread
+
+    public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { // Runs receiveMessages() until it returns normally, starting over after every IOException.
+        while (!shouldStop) { // shouldStop is set by requestStopReceiveMessages()
+            try {
+                receiveMessages(Duration.ofMinutes(1), false, null, handler); // timeout of one minute, returnOnTimeout=false, maxMessages=null (no limit)
+                break; // a normal return ends the loop
+            } catch (IOException e) {
+                logger.warn("Receiving messages failed, retrying", e); // NOTE(review): the retry is immediate, no delay is visible here — confirm that is intended
+            }
+        }
+    }
+
     public void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+            Duration timeout, // passed on to receiveMessagesInternal, which uses timeout.toMillis() for each readMessageBatch call
+            boolean returnOnTimeout, // passed on unchanged to receiveMessagesInternal
+            Integer maxMessages, // null means no limit (receiveMessagesInternal maps it to -1)
+            Manager.ReceiveMessageHandler handler // passed to handleEnvelope for every received envelope
     ) throws IOException {
-        needsToRetryFailedMessages = true;
+        account.setNeedsToRetryFailedMessages(true); // makes receiveMessagesInternal call retryFailedReceivedMessages before reading
         hasCaughtUpWithOldMessages = false;
 
         // Use a Map here because java Set doesn't have a get method ...
         Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
 
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
+        final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket(); // the same instance is used for connect, read and disconnect below
+        final var webSocketStateDisposable = signalWebSocket.getState() // state changes are forwarded to onWebSocketStateChange
                 .subscribeOn(Schedulers.computation())
                 .observeOn(Schedulers.computation())
                 .distinctUntilChanged()
                 .subscribe(this::onWebSocketStateChange);
         signalWebSocket.connect();
+        signalWebSocket.registerKeepAliveToken("receive"); // paired with removeKeepAliveToken("receive") in the finally block
 
         try {
-            receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
+            receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions); // runs the read loop; cleanup happens in finally
         } finally {
             hasCaughtUpWithOldMessages = false;
             handleQueuedActions(queuedActions.keySet());
             queuedActions.clear();
-            dependencies.getSignalWebSocket().disconnect();
+            signalWebSocket.removeKeepAliveToken("receive"); // drop the token registered after connect()
+            signalWebSocket.disconnect();
             webSocketStateDisposable.dispose();
+            shouldStop = false; // clear a stop request; the receive loops only run while this flag is false
         }
     }
 
     private void receiveMessagesInternal(
+            final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
             Duration timeout,
             boolean returnOnTimeout,
+            Integer maxMessages,
             Manager.ReceiveMessageHandler handler,
             final Map<HandleAction, HandleAction> queuedActions
     ) throws IOException {
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-
+        int remainingMessages = maxMessages == null ? -1 : maxMessages;
         var backOffCounter = 0;
+        isWaitingForMessage = false;
 
-        while (!Thread.interrupted()) {
-            if (needsToRetryFailedMessages) {
+        while (!shouldStop && remainingMessages != 0) {
+            if (account.getNeedsToRetryFailedMessages()) {
                 retryFailedReceivedMessages(handler);
-                needsToRetryFailedMessages = false;
             }
             SignalServiceEnvelope envelope;
             final CachedMessage[] cachedMessage = {null};
@@ -118,23 +139,42 @@ public class ReceiveHelper {
             }
             logger.debug("Checking for new message from server");
             try {
-                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
-                    final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
-                            .resolveRecipient(envelope1.getSourceAddress()) : null;
-                    logger.trace("Storing new message from {}", recipientId);
-                    // store message on disk, before acknowledging receipt to the server
-                    cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+                isWaitingForMessage = true;
+                var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
+                    logger.debug("Retrieved {} envelopes!", batch.size());
+                    isWaitingForMessage = false;
+                    for (final var it : batch) {
+                        SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
+                                it.getServerDeliveredTimestamp());
+                        final var recipientId = envelope1.getSourceServiceId()
+                                .map(ServiceId::parseOrNull)
+                                .map(s -> account.getRecipientResolver().resolveRecipient(s))
+                                .orElse(null);
+                        logger.trace("Storing new message from {}", recipientId);
+                        // store message on disk, before acknowledging receipt to the server
+                        cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+                        try {
+                            signalWebSocket.sendAck(it);
+                        } catch (IOException e) {
+                            logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
+                        }
+                    }
                 });
+                isWaitingForMessage = false;
                 backOffCounter = 0;
 
-                if (result.isPresent()) {
-                    envelope = result.get();
+                if (queueNotEmpty) {
+                    if (remainingMessages > 0) {
+                        remainingMessages -= 1;
+                    }
+                    envelope = cachedMessage[0].loadEnvelope();
                     logger.debug("New message received from server");
                 } else {
                     logger.debug("Received indicator that server queue is empty");
                     handleQueuedActions(queuedActions.keySet());
                     queuedActions.clear();
 
+                    context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
                     hasCaughtUpWithOldMessages = true;
                     caughtUpWithOldMessagesListener.call();
 
@@ -143,7 +183,6 @@ public class ReceiveHelper {
                 }
             } catch (AssertionError e) {
                 if (e.getCause() instanceof InterruptedException) {
-                    Thread.currentThread().interrupt();
                     break;
                 } else {
                     throw e;
@@ -168,39 +207,49 @@ public class ReceiveHelper {
                 backOffCounter = 0;
                 if (returnOnTimeout) return;
                 continue;
+            } catch (Exception e) {
+                logger.error("Unknown error when receiving messages", e);
+                continue;
             }
 
-            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
-            for (final var h : result.first()) {
-                final var existingAction = queuedActions.get(h);
-                if (existingAction == null) {
-                    queuedActions.put(h, h);
-                } else {
-                    existingAction.mergeOther(h);
+            try {
+                final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
+                for (final var h : result.first()) {
+                    final var existingAction = queuedActions.get(h);
+                    if (existingAction == null) {
+                        queuedActions.put(h, h);
+                    } else {
+                        existingAction.mergeOther(h);
+                    }
                 }
-            }
-            final var exception = result.second();
+                final var exception = result.second();
 
-            if (hasCaughtUpWithOldMessages) {
-                handleQueuedActions(queuedActions.keySet());
-                queuedActions.clear();
-            }
-            if (cachedMessage[0] != null) {
-                if (exception instanceof UntrustedIdentityException) {
-                    logger.debug("Keeping message with untrusted identity in message cache");
-                    final var address = ((UntrustedIdentityException) exception).getSender();
-                    final var recipientId = account.getRecipientStore().resolveRecipient(address);
-                    if (!envelope.hasSourceUuid()) {
-                        try {
-                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
-                        } catch (IOException ioException) {
-                            logger.warn("Failed to move cached message to recipient folder: {}",
-                                    ioException.getMessage());
+                if (hasCaughtUpWithOldMessages) {
+                    handleQueuedActions(queuedActions.keySet());
+                    queuedActions.clear();
+                }
+                if (cachedMessage[0] != null) {
+                    if (exception instanceof UntrustedIdentityException) {
+                        logger.debug("Keeping message with untrusted identity in message cache");
+                        final var address = ((UntrustedIdentityException) exception).getSender();
+                        if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
+                            final var recipientId = account.getRecipientResolver()
+                                    .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
+                            try {
+                                cachedMessage[0] = account.getMessageCache()
+                                        .replaceSender(cachedMessage[0], recipientId);
+                            } catch (IOException ioException) {
+                                logger.warn("Failed to move cached message to recipient folder: {}",
+                                        ioException.getMessage(),
+                                        ioException);
+                            }
                         }
+                    } else {
+                        cachedMessage[0].delete();
                     }
-                } else {
-                    cachedMessage[0].delete();
                 }
+            } catch (Exception e) {
+                logger.error("Unknown error when handling messages", e);
             }
         }
     }
@@ -214,10 +263,12 @@ public class ReceiveHelper {
             }
         }
         handleQueuedActions(queuedActions);
+        account.setNeedsToRetryFailedMessages(false);
     }
 
     private List<HandleAction> retryFailedReceivedMessage(
-            final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+            final Manager.ReceiveMessageHandler handler,
+            final CachedMessage cachedMessage
     ) {
         var envelope = cachedMessage.loadEnvelope();
         if (envelope == null) {
@@ -225,24 +276,26 @@ public class ReceiveHelper {
             return null;
         }
 
-        final var result = context.getIncomingMessageHandler()
-                .handleRetryEnvelope(envelope, ignoreAttachments, handler);
+        final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
         final var actions = result.first();
         final var exception = result.second();
 
         if (exception instanceof UntrustedIdentityException) {
-            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
-                // Envelope is more than a month old, cleaning up.
+            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
+                // Envelope is more than two weeks old, cleaning up.
                 cachedMessage.delete();
                 return null;
             }
-            if (!envelope.hasSourceUuid()) {
+            if (envelope.getSourceServiceId().isEmpty()) {
                 final var identifier = ((UntrustedIdentityException) exception).getSender();
-                final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
+                final var recipientId = account.getRecipientResolver()
+                        .resolveRecipient(new RecipientAddress(identifier));
                 try {
                     account.getMessageCache().replaceSender(cachedMessage, recipientId);
                 } catch (IOException ioException) {
-                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
+                    logger.warn("Failed to move cached message to recipient folder: {}",
+                            ioException.getMessage(),
+                            ioException);
                 }
             }
             return null;
@@ -255,23 +308,14 @@ public class ReceiveHelper {
 
     private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
         logger.debug("Handling message actions");
-        var interrupted = false;
         for (var action : queuedActions) {
             logger.debug("Executing action {}", action.getClass().getSimpleName());
             try {
                 action.execute(context);
             } catch (Throwable e) {
-                if ((e instanceof AssertionError || e instanceof RuntimeException)
-                        && e.getCause() instanceof InterruptedException) {
-                    interrupted = true;
-                    continue;
-                }
                 logger.warn("Message action failed.", e);
             }
         }
-        if (interrupted) {
-            Thread.currentThread().interrupt();
-        }
     }
 
     private void onWebSocketStateChange(final WebSocketConnectionState s) {