nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Update libsignal-service
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
index baa0d583b8f4caccb7adab1dd58fc19cf233d3ef..71f690816f52b15efe19d0982442d21145d4cdcb 100644 (file)
@@ -11,10 +11,10 @@ import org.asamk.signal.manager.storage.messageCache.CachedMessage;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.whispersystems.signalservice.api.SignalWebSocket;
 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
 import org.whispersystems.signalservice.api.push.ServiceId;
 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
-import io.reactivex.rxjava3.core.Observable;
 import io.reactivex.rxjava3.schedulers.Schedulers;
 
 public class ReceiveHelper {
@@ -83,7 +82,10 @@ public class ReceiveHelper {
     }
 
     public void receiveMessages(
-            Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
+            Duration timeout,
+            boolean returnOnTimeout,
+            Integer maxMessages,
+            Manager.ReceiveMessageHandler handler
     ) throws IOException {
         account.setNeedsToRetryFailedMessages(true);
         hasCaughtUpWithOldMessages = false;
@@ -91,14 +93,14 @@ public class ReceiveHelper {
         // Use a Map here because java Set doesn't have a get method ...
         Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
 
-        final var signalWebSocket = dependencies.getSignalWebSocket();
-        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
-                        signalWebSocket.getWebSocketState())
+        final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
+        final var webSocketStateDisposable = signalWebSocket.getState()
                 .subscribeOn(Schedulers.computation())
                 .observeOn(Schedulers.computation())
                 .distinctUntilChanged()
                 .subscribe(this::onWebSocketStateChange);
         signalWebSocket.connect();
+        signalWebSocket.registerKeepAliveToken("receive");
 
         try {
             receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
@@ -106,6 +108,7 @@ public class ReceiveHelper {
             hasCaughtUpWithOldMessages = false;
             handleQueuedActions(queuedActions.keySet());
             queuedActions.clear();
+            signalWebSocket.removeKeepAliveToken("receive");
             signalWebSocket.disconnect();
             webSocketStateDisposable.dispose();
             shouldStop = false;
@@ -113,7 +116,7 @@ public class ReceiveHelper {
     }
 
     private void receiveMessagesInternal(
-            final SignalWebSocket signalWebSocket,
+            final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
             Duration timeout,
             boolean returnOnTimeout,
             Integer maxMessages,
@@ -264,7 +267,8 @@ public class ReceiveHelper {
     }
 
     private List<HandleAction> retryFailedReceivedMessage(
-            final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+            final Manager.ReceiveMessageHandler handler,
+            final CachedMessage cachedMessage
     ) {
         var envelope = cachedMessage.loadEnvelope();
         if (envelope == null) {