]> nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Update libsignal-service
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
index d27fe121787ddcaa91da7391b674b3d54c46fc87..71f690816f52b15efe19d0982442d21145d4cdcb 100644 (file)
@@ -1,15 +1,20 @@
 package org.asamk.signal.manager.helper;
 
 import org.asamk.signal.manager.Manager;
-import org.asamk.signal.manager.SignalDependencies;
 import org.asamk.signal.manager.actions.HandleAction;
 import org.asamk.signal.manager.api.ReceiveConfig;
 import org.asamk.signal.manager.api.UntrustedIdentityException;
+import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
+import org.whispersystems.signalservice.api.push.ServiceId;
+import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 
@@ -23,20 +28,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.schedulers.Schedulers;
 
 public class ReceiveHelper {
 
-    private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
-    private final static int MAX_BACKOFF_COUNTER = 9;
+    private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
+    private static final int MAX_BACKOFF_COUNTER = 9;
 
     private final SignalAccount account;
     private final SignalDependencies dependencies;
     private final Context context;
 
-    private ReceiveConfig receiveConfig = new ReceiveConfig(false);
-    private boolean needsToRetryFailedMessages = false;
+    private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean isWaitingForMessage = false;
     private boolean shouldStop = false;
@@ -51,14 +54,7 @@ public class ReceiveHelper {
 
     public void setReceiveConfig(final ReceiveConfig receiveConfig) {
         this.receiveConfig = receiveConfig;
-    }
-
-    public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
-        this.needsToRetryFailedMessages = needsToRetryFailedMessages;
-    }
-
-    public boolean hasCaughtUpWithOldMessages() {
-        return hasCaughtUpWithOldMessages;
+        dependencies.setAllowStories(!receiveConfig.ignoreStories());
     }
 
     public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
@@ -77,7 +73,7 @@ public class ReceiveHelper {
     public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
         while (!shouldStop) {
             try {
-                receiveMessages(Duration.ofMinutes(1), false, handler);
+                receiveMessages(Duration.ofMinutes(1), false, null, handler);
                 break;
             } catch (IOException e) {
                 logger.warn("Receiving messages failed, retrying", e);
@@ -86,50 +82,54 @@ public class ReceiveHelper {
     }
 
     public void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+            Duration timeout,
+            boolean returnOnTimeout,
+            Integer maxMessages,
+            Manager.ReceiveMessageHandler handler
     ) throws IOException {
-        needsToRetryFailedMessages = true;
+        account.setNeedsToRetryFailedMessages(true);
         hasCaughtUpWithOldMessages = false;
 
         // Use a Map here because java Set doesn't have a get method ...
         Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
 
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
+        final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
+        final var webSocketStateDisposable = signalWebSocket.getState()
                 .subscribeOn(Schedulers.computation())
                 .observeOn(Schedulers.computation())
                 .distinctUntilChanged()
                 .subscribe(this::onWebSocketStateChange);
         signalWebSocket.connect();
+        signalWebSocket.registerKeepAliveToken("receive");
 
         try {
-            receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
+            receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
         } finally {
             hasCaughtUpWithOldMessages = false;
             handleQueuedActions(queuedActions.keySet());
             queuedActions.clear();
-            dependencies.getSignalWebSocket().disconnect();
+            signalWebSocket.removeKeepAliveToken("receive");
+            signalWebSocket.disconnect();
             webSocketStateDisposable.dispose();
             shouldStop = false;
         }
     }
 
     private void receiveMessagesInternal(
+            final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
             Duration timeout,
             boolean returnOnTimeout,
+            Integer maxMessages,
             Manager.ReceiveMessageHandler handler,
             final Map<HandleAction, HandleAction> queuedActions
     ) throws IOException {
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-
+        int remainingMessages = maxMessages == null ? -1 : maxMessages;
         var backOffCounter = 0;
         isWaitingForMessage = false;
 
-        while (!shouldStop) {
-            if (needsToRetryFailedMessages) {
+        while (!shouldStop && remainingMessages != 0) {
+            if (account.getNeedsToRetryFailedMessages()) {
                 retryFailedReceivedMessages(handler);
-                needsToRetryFailedMessages = false;
             }
             SignalServiceEnvelope envelope;
             final CachedMessage[] cachedMessage = {null};
@@ -140,25 +140,41 @@ public class ReceiveHelper {
             logger.debug("Checking for new message from server");
             try {
                 isWaitingForMessage = true;
-                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+                var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
+                    logger.debug("Retrieved {} envelopes!", batch.size());
                     isWaitingForMessage = false;
-                    final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
-                            .resolveRecipient(envelope1.getSourceAddress()) : null;
-                    logger.trace("Storing new message from {}", recipientId);
-                    // store message on disk, before acknowledging receipt to the server
-                    cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+                    for (final var it : batch) {
+                        SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
+                                it.getServerDeliveredTimestamp());
+                        final var recipientId = envelope1.getSourceServiceId()
+                                .map(ServiceId::parseOrNull)
+                                .map(s -> account.getRecipientResolver().resolveRecipient(s))
+                                .orElse(null);
+                        logger.trace("Storing new message from {}", recipientId);
+                        // store message on disk, before acknowledging receipt to the server
+                        cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+                        try {
+                            signalWebSocket.sendAck(it);
+                        } catch (IOException e) {
+                            logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
+                        }
+                    }
                 });
                 isWaitingForMessage = false;
                 backOffCounter = 0;
 
-                if (result.isPresent()) {
-                    envelope = result.get();
+                if (queueNotEmpty) {
+                    if (remainingMessages > 0) {
+                        remainingMessages -= 1;
+                    }
+                    envelope = cachedMessage[0].loadEnvelope();
                     logger.debug("New message received from server");
                 } else {
                     logger.debug("Received indicator that server queue is empty");
                     handleQueuedActions(queuedActions.keySet());
                     queuedActions.clear();
 
+                    context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
                     hasCaughtUpWithOldMessages = true;
                     caughtUpWithOldMessagesListener.call();
 
@@ -191,39 +207,49 @@ public class ReceiveHelper {
                 backOffCounter = 0;
                 if (returnOnTimeout) return;
                 continue;
+            } catch (Exception e) {
+                logger.error("Unknown error when receiving messages", e);
+                continue;
             }
 
-            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
-            for (final var h : result.first()) {
-                final var existingAction = queuedActions.get(h);
-                if (existingAction == null) {
-                    queuedActions.put(h, h);
-                } else {
-                    existingAction.mergeOther(h);
+            try {
+                final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
+                for (final var h : result.first()) {
+                    final var existingAction = queuedActions.get(h);
+                    if (existingAction == null) {
+                        queuedActions.put(h, h);
+                    } else {
+                        existingAction.mergeOther(h);
+                    }
                 }
-            }
-            final var exception = result.second();
+                final var exception = result.second();
 
-            if (hasCaughtUpWithOldMessages) {
-                handleQueuedActions(queuedActions.keySet());
-                queuedActions.clear();
-            }
-            if (cachedMessage[0] != null) {
-                if (exception instanceof UntrustedIdentityException) {
-                    logger.debug("Keeping message with untrusted identity in message cache");
-                    final var address = ((UntrustedIdentityException) exception).getSender();
-                    final var recipientId = account.getRecipientResolver().resolveRecipient(address);
-                    if (!envelope.hasSourceUuid()) {
-                        try {
-                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
-                        } catch (IOException ioException) {
-                            logger.warn("Failed to move cached message to recipient folder: {}",
-                                    ioException.getMessage());
+                if (hasCaughtUpWithOldMessages) {
+                    handleQueuedActions(queuedActions.keySet());
+                    queuedActions.clear();
+                }
+                if (cachedMessage[0] != null) {
+                    if (exception instanceof UntrustedIdentityException) {
+                        logger.debug("Keeping message with untrusted identity in message cache");
+                        final var address = ((UntrustedIdentityException) exception).getSender();
+                        if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
+                            final var recipientId = account.getRecipientResolver()
+                                    .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
+                            try {
+                                cachedMessage[0] = account.getMessageCache()
+                                        .replaceSender(cachedMessage[0], recipientId);
+                            } catch (IOException ioException) {
+                                logger.warn("Failed to move cached message to recipient folder: {}",
+                                        ioException.getMessage(),
+                                        ioException);
+                            }
                         }
+                    } else {
+                        cachedMessage[0].delete();
                     }
-                } else {
-                    cachedMessage[0].delete();
                 }
+            } catch (Exception e) {
+                logger.error("Unknown error when handling messages", e);
             }
         }
     }
@@ -237,10 +263,12 @@ public class ReceiveHelper {
             }
         }
         handleQueuedActions(queuedActions);
+        account.setNeedsToRetryFailedMessages(false);
     }
 
     private List<HandleAction> retryFailedReceivedMessage(
-            final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+            final Manager.ReceiveMessageHandler handler,
+            final CachedMessage cachedMessage
     ) {
         var envelope = cachedMessage.loadEnvelope();
         if (envelope == null) {
@@ -253,18 +281,21 @@ public class ReceiveHelper {
         final var exception = result.second();
 
         if (exception instanceof UntrustedIdentityException) {
-            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
-                // Envelope is more than a month old, cleaning up.
+            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
+                // Envelope is more than two weeks old, cleaning up.
                 cachedMessage.delete();
                 return null;
             }
-            if (!envelope.hasSourceUuid()) {
+            if (envelope.getSourceServiceId().isEmpty()) {
                 final var identifier = ((UntrustedIdentityException) exception).getSender();
-                final var recipientId = account.getRecipientResolver().resolveRecipient(identifier);
+                final var recipientId = account.getRecipientResolver()
+                        .resolveRecipient(new RecipientAddress(identifier));
                 try {
                     account.getMessageCache().replaceSender(cachedMessage, recipientId);
                 } catch (IOException ioException) {
-                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
+                    logger.warn("Failed to move cached message to recipient folder: {}",
+                            ioException.getMessage(),
+                            ioException);
                 }
             }
             return null;