1 package org
.asamk
.signal
.manager
.internal
;
3 import org
.slf4j
.Logger
;
4 import org
.slf4j
.LoggerFactory
;
5 import org
.whispersystems
.signalservice
.api
.util
.Preconditions
;
6 import org
.whispersystems
.signalservice
.api
.util
.SleepTimer
;
7 import org
.whispersystems
.signalservice
.api
.websocket
.HealthMonitor
;
8 import org
.whispersystems
.signalservice
.api
.websocket
.SignalWebSocket
;
9 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
10 import org
.whispersystems
.signalservice
.internal
.websocket
.OkHttpWebSocketConnection
;
12 import java
.util
.concurrent
.Executor
;
13 import java
.util
.concurrent
.Executors
;
14 import java
.util
.concurrent
.TimeUnit
;
16 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
19 final class SignalWebSocketHealthMonitor
implements HealthMonitor
{
21 private static final Logger logger
= LoggerFactory
.getLogger(SignalWebSocketHealthMonitor
.class);
24 * This is the amount of time in between sent keep alives. Must be greater than [KEEP_ALIVE_TIMEOUT]
26 private static final long KEEP_ALIVE_SEND_CADENCE
= TimeUnit
.SECONDS
.toMillis(OkHttpWebSocketConnection
.KEEPALIVE_FREQUENCY_SECONDS
);
29 * This is the amount of time we will wait for a response to the keep alive before we consider the websockets dead.
30 * It is required that this value be less than [KEEP_ALIVE_SEND_CADENCE]
32 private static final long KEEP_ALIVE_TIMEOUT
= TimeUnit
.SECONDS
.toMillis(20);
34 private final Executor executor
= Executors
.newSingleThreadExecutor();
35 private final SleepTimer sleepTimer
;
36 private SignalWebSocket webSocket
= null;
37 private volatile KeepAliveSender keepAliveSender
= null;
38 private boolean needsKeepAlive
= false;
39 private long lastKeepAliveReceived
= 0;
41 public SignalWebSocketHealthMonitor(SleepTimer sleepTimer
) {
42 this.sleepTimer
= sleepTimer
;
45 void monitor(SignalWebSocket webSocket
) {
46 Preconditions
.checkNotNull(webSocket
);
47 Preconditions
.checkArgument(this.webSocket
== null, "monitor can only be called once");
49 executor
.execute(() -> {
51 this.webSocket
= webSocket
;
54 .subscribeOn(Schedulers
.computation())
55 .observeOn(Schedulers
.computation())
56 .distinctUntilChanged()
57 .subscribe(this::onStateChanged
);
59 webSocket
.setKeepAliveChangedListener(this::updateKeepAliveSenderStatus
);
63 private void onStateChanged(WebSocketConnectionState connectionState
) {
64 executor
.execute(() -> {
65 needsKeepAlive
= connectionState
== WebSocketConnectionState
.CONNECTED
;
67 updateKeepAliveSenderStatus();
72 public void onKeepAliveResponse(long sentTimestamp
, boolean isIdentifiedWebSocket
) {
73 final var keepAliveTime
= System
.currentTimeMillis();
74 executor
.execute(() -> lastKeepAliveReceived
= keepAliveTime
);
78 public void onMessageError(int status
, boolean isIdentifiedWebSocket
) {
81 private Unit
updateKeepAliveSenderStatus() {
82 if (keepAliveSender
== null && sendKeepAlives()) {
83 keepAliveSender
= new KeepAliveSender();
84 keepAliveSender
.start();
85 } else if (keepAliveSender
!= null && !sendKeepAlives()) {
86 keepAliveSender
.shutdown();
87 keepAliveSender
= null;
92 private boolean sendKeepAlives() {
93 return needsKeepAlive
&& webSocket
!= null && webSocket
.shouldSendKeepAlives();
97 * Sends periodic heartbeats/keep-alives over the WebSocket to prevent connection timeouts. If
98 * the WebSocket fails to get a return heartbeat after [KEEP_ALIVE_TIMEOUT] seconds, it is forced to be recreated.
100 private final class KeepAliveSender
extends Thread
{
102 private volatile boolean shouldKeepRunning
= true;
106 logger
.debug("[KeepAliveSender({})] started", this.threadId());
107 lastKeepAliveReceived
= System
.currentTimeMillis();
109 var keepAliveSendTime
= System
.currentTimeMillis();
110 while (shouldKeepRunning
&& sendKeepAlives()) {
112 final var nextKeepAliveSendTime
= keepAliveSendTime
+ KEEP_ALIVE_SEND_CADENCE
;
113 sleepUntil(nextKeepAliveSendTime
);
115 if (shouldKeepRunning
&& sendKeepAlives()) {
116 keepAliveSendTime
= System
.currentTimeMillis();
117 webSocket
.sendKeepAlive();
120 final var responseRequiredTime
= keepAliveSendTime
+ KEEP_ALIVE_TIMEOUT
;
121 sleepUntil(responseRequiredTime
);
123 if (shouldKeepRunning
&& sendKeepAlives()) {
124 if (lastKeepAliveReceived
< keepAliveSendTime
) {
125 logger
.debug("Missed keep alive, last: {} needed by: {}",
126 lastKeepAliveReceived
,
127 responseRequiredTime
);
128 webSocket
.forceNewWebSocket();
131 } catch (Throwable e
) {
132 logger
.warn("Keep alive sender failed", e
);
135 logger
.debug("[KeepAliveSender({})] ended", threadId());
138 void sleepUntil(long timeMillis
) {
139 while (System
.currentTimeMillis() < timeMillis
) {
140 final var waitTime
= timeMillis
- System
.currentTimeMillis();
143 sleepTimer
.sleep(waitTime
);
144 } catch (InterruptedException e
) {
145 logger
.warn("WebSocket health monitor interrupted", e
);
152 shouldKeepRunning
= false;