import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.whispersystems.signalservice.api.SignalWebSocket;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.push.ServiceId;
import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class ReceiveHelper {
}
/**
 * Runs one receive session: opens the authenticated web socket, delegates the actual
 * receiving to {@code receiveMessagesInternal}, then handles the queued actions and
 * tears the connection down again.
 *
 * @param timeout         forwarded unchanged to {@code receiveMessagesInternal}
 * @param returnOnTimeout forwarded unchanged to {@code receiveMessagesInternal}
 * @param maxMessages     forwarded unchanged to {@code receiveMessagesInternal}
 * @param handler         forwarded unchanged to {@code receiveMessagesInternal}
 * @throws IOException declared here; the throwing call is not visible in this excerpt
 *                     (presumably propagated from {@code receiveMessagesInternal} - confirm)
 */
public void receiveMessages(
- Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
+ Duration timeout,
+ boolean returnOnTimeout,
+ Integer maxMessages,
+ Manager.ReceiveMessageHandler handler
) throws IOException {
// Mark the account as needing a retry of failed messages and reset the caught-up flag
// before the session starts.
account.setNeedsToRetryFailedMessages(true);
hasCaughtUpWithOldMessages = false;
// Use a Map here because java Set doesn't have a get method ...
Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
// Observe connection-state changes on the computation scheduler. distinctUntilChanged()
// suppresses consecutive duplicate states before onWebSocketStateChange is invoked.
// The old code merged two state streams (unidentified + identified); the new code reads
// the single state stream of the authenticated socket.
- final var signalWebSocket = dependencies.getSignalWebSocket();
- final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
- signalWebSocket.getWebSocketState())
+ final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
+ final var webSocketStateDisposable = signalWebSocket.getState()
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(this::onWebSocketStateChange);
signalWebSocket.connect();
// The keep-alive token registered under "receive" is removed with the same key in the
// cleanup section below.
+ signalWebSocket.registerKeepAliveToken("receive");
try {
receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
// NOTE(review): the statements below are the session cleanup; presumably they sit in a
// finally block whose header is not visible in this excerpt - confirm.
hasCaughtUpWithOldMessages = false;
// Process whatever actions were queued during the session, then empty the queue.
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
+ signalWebSocket.removeKeepAliveToken("receive");
signalWebSocket.disconnect();
// Stop observing connection-state changes and reset the stop flag.
webSocketStateDisposable.dispose();
shouldStop = false;
}
private void receiveMessagesInternal(
- final SignalWebSocket signalWebSocket,
+ final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
Duration timeout,
boolean returnOnTimeout,
Integer maxMessages,
}
private List<HandleAction> retryFailedReceivedMessage(
- final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+ final Manager.ReceiveMessageHandler handler,
+ final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {