.distinctUntilChanged()
.subscribe(this::onWebSocketStateChange);
signalWebSocket.connect();
+ signalWebSocket.registerKeepAliveToken("receive");
try {
receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
+ signalWebSocket.removeKeepAliveToken("receive");
signalWebSocket.disconnect();
webSocketStateDisposable.dispose();
shouldStop = false;