]> nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Update libsignal-service
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
index 3d4ca38d24361b18c60fbd3de3bcfc527b194e26..71f690816f52b15efe19d0982442d21145d4cdcb 100644 (file)
@@ -5,14 +5,16 @@ import org.asamk.signal.manager.actions.HandleAction;
 import org.asamk.signal.manager.api.ReceiveConfig;
 import org.asamk.signal.manager.api.UntrustedIdentityException;
 import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.whispersystems.signalservice.api.SignalWebSocket;
 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
 import org.whispersystems.signalservice.api.push.ServiceId;
+import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 
@@ -26,20 +28,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.schedulers.Schedulers;
 
 public class ReceiveHelper {
 
-    private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
-    private final static int MAX_BACKOFF_COUNTER = 9;
+    private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
+    private static final int MAX_BACKOFF_COUNTER = 9;
 
     private final SignalAccount account;
     private final SignalDependencies dependencies;
     private final Context context;
 
     private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
-    private boolean needsToRetryFailedMessages = false;
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean isWaitingForMessage = false;
     private boolean shouldStop = false;
@@ -57,10 +57,6 @@ public class ReceiveHelper {
         dependencies.setAllowStories(!receiveConfig.ignoreStories());
     }
 
-    public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
-        this.needsToRetryFailedMessages = needsToRetryFailedMessages;
-    }
-
     public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
         this.authenticationFailureListener = authenticationFailureListener;
     }
@@ -86,22 +82,25 @@ public class ReceiveHelper {
     }
 
     public void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
+            Duration timeout,
+            boolean returnOnTimeout,
+            Integer maxMessages,
+            Manager.ReceiveMessageHandler handler
     ) throws IOException {
-        needsToRetryFailedMessages = true;
+        account.setNeedsToRetryFailedMessages(true);
         hasCaughtUpWithOldMessages = false;
 
         // Use a Map here because java Set doesn't have a get method ...
         Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
 
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
+        final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
+        final var webSocketStateDisposable = signalWebSocket.getState()
                 .subscribeOn(Schedulers.computation())
                 .observeOn(Schedulers.computation())
                 .distinctUntilChanged()
                 .subscribe(this::onWebSocketStateChange);
         signalWebSocket.connect();
+        signalWebSocket.registerKeepAliveToken("receive");
 
         try {
             receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
@@ -109,6 +108,7 @@ public class ReceiveHelper {
             hasCaughtUpWithOldMessages = false;
             handleQueuedActions(queuedActions.keySet());
             queuedActions.clear();
+            signalWebSocket.removeKeepAliveToken("receive");
             signalWebSocket.disconnect();
             webSocketStateDisposable.dispose();
             shouldStop = false;
@@ -116,7 +116,7 @@ public class ReceiveHelper {
     }
 
     private void receiveMessagesInternal(
-            final SignalWebSocket signalWebSocket,
+            final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
             Duration timeout,
             boolean returnOnTimeout,
             Integer maxMessages,
@@ -128,9 +128,8 @@ public class ReceiveHelper {
         isWaitingForMessage = false;
 
         while (!shouldStop && remainingMessages != 0) {
-            if (needsToRetryFailedMessages) {
+            if (account.getNeedsToRetryFailedMessages()) {
                 retryFailedReceivedMessages(handler);
-                needsToRetryFailedMessages = false;
             }
             SignalServiceEnvelope envelope;
             final CachedMessage[] cachedMessage = {null};
@@ -147,8 +146,10 @@ public class ReceiveHelper {
                     for (final var it : batch) {
                         SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
                                 it.getServerDeliveredTimestamp());
-                        final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
-                                .resolveRecipient(envelope1.getSourceAddress()) : null;
+                        final var recipientId = envelope1.getSourceServiceId()
+                                .map(ServiceId::parseOrNull)
+                                .map(s -> account.getRecipientResolver().resolveRecipient(s))
+                                .orElse(null);
                         logger.trace("Storing new message from {}", recipientId);
                         // store message on disk, before acknowledging receipt to the server
                         cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
@@ -173,6 +174,7 @@ public class ReceiveHelper {
                     handleQueuedActions(queuedActions.keySet());
                     queuedActions.clear();
 
+                    context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
                     hasCaughtUpWithOldMessages = true;
                     caughtUpWithOldMessagesListener.call();
 
@@ -205,40 +207,49 @@ public class ReceiveHelper {
                 backOffCounter = 0;
                 if (returnOnTimeout) return;
                 continue;
+            } catch (Exception e) {
+                logger.error("Unknown error when receiving messages", e);
+                continue;
             }
 
-            final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
-            for (final var h : result.first()) {
-                final var existingAction = queuedActions.get(h);
-                if (existingAction == null) {
-                    queuedActions.put(h, h);
-                } else {
-                    existingAction.mergeOther(h);
+            try {
+                final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
+                for (final var h : result.first()) {
+                    final var existingAction = queuedActions.get(h);
+                    if (existingAction == null) {
+                        queuedActions.put(h, h);
+                    } else {
+                        existingAction.mergeOther(h);
+                    }
                 }
-            }
-            final var exception = result.second();
+                final var exception = result.second();
 
-            if (hasCaughtUpWithOldMessages) {
-                handleQueuedActions(queuedActions.keySet());
-                queuedActions.clear();
-            }
-            if (cachedMessage[0] != null) {
-                if (exception instanceof UntrustedIdentityException) {
-                    logger.debug("Keeping message with untrusted identity in message cache");
-                    final var address = ((UntrustedIdentityException) exception).getSender();
-                    if (!envelope.hasSourceUuid() && address.uuid().isPresent()) {
-                        final var recipientId = account.getRecipientResolver()
-                                .resolveRecipient(ServiceId.from(address.uuid().get()));
-                        try {
-                            cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
-                        } catch (IOException ioException) {
-                            logger.warn("Failed to move cached message to recipient folder: {}",
-                                    ioException.getMessage());
+                if (hasCaughtUpWithOldMessages) {
+                    handleQueuedActions(queuedActions.keySet());
+                    queuedActions.clear();
+                }
+                if (cachedMessage[0] != null) {
+                    if (exception instanceof UntrustedIdentityException) {
+                        logger.debug("Keeping message with untrusted identity in message cache");
+                        final var address = ((UntrustedIdentityException) exception).getSender();
+                        if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
+                            final var recipientId = account.getRecipientResolver()
+                                    .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
+                            try {
+                                cachedMessage[0] = account.getMessageCache()
+                                        .replaceSender(cachedMessage[0], recipientId);
+                            } catch (IOException ioException) {
+                                logger.warn("Failed to move cached message to recipient folder: {}",
+                                        ioException.getMessage(),
+                                        ioException);
+                            }
                         }
+                    } else {
+                        cachedMessage[0].delete();
                     }
-                } else {
-                    cachedMessage[0].delete();
                 }
+            } catch (Exception e) {
+                logger.error("Unknown error when handling messages", e);
             }
         }
     }
@@ -252,10 +263,12 @@ public class ReceiveHelper {
             }
         }
         handleQueuedActions(queuedActions);
+        account.setNeedsToRetryFailedMessages(false);
     }
 
     private List<HandleAction> retryFailedReceivedMessage(
-            final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+            final Manager.ReceiveMessageHandler handler,
+            final CachedMessage cachedMessage
     ) {
         var envelope = cachedMessage.loadEnvelope();
         if (envelope == null) {
@@ -268,19 +281,21 @@ public class ReceiveHelper {
         final var exception = result.second();
 
         if (exception instanceof UntrustedIdentityException) {
-            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
-                // Envelope is more than a month old, cleaning up.
+            if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
+                // Envelope is more than two weeks old, cleaning up.
                 cachedMessage.delete();
                 return null;
             }
-            if (!envelope.hasSourceUuid()) {
+            if (envelope.getSourceServiceId().isEmpty()) {
                 final var identifier = ((UntrustedIdentityException) exception).getSender();
                 final var recipientId = account.getRecipientResolver()
                         .resolveRecipient(new RecipientAddress(identifier));
                 try {
                     account.getMessageCache().replaceSender(cachedMessage, recipientId);
                 } catch (IOException ioException) {
-                    logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
+                    logger.warn("Failed to move cached message to recipient folder: {}",
+                            ioException.getMessage(),
+                            ioException);
                 }
             }
             return null;