1 package org
.asamk
.signal
.manager
;
3 import org
.slf4j
.Logger
;
4 import org
.slf4j
.LoggerFactory
;
5 import org
.whispersystems
.libsignal
.util
.guava
.Preconditions
;
6 import org
.whispersystems
.signalservice
.api
.SignalWebSocket
;
7 import org
.whispersystems
.signalservice
.api
.util
.SleepTimer
;
8 import org
.whispersystems
.signalservice
.api
.websocket
.HealthMonitor
;
9 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
10 import org
.whispersystems
.signalservice
.internal
.websocket
.WebSocketConnection
;
12 import java
.util
.Arrays
;
13 import java
.util
.concurrent
.TimeUnit
;
15 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
18 * Monitors the health of the identified and unidentified WebSockets. If either one appears to be
19 * unhealthy, will trigger restarting both.
21 * The monitor is also responsible for sending heartbeats/keep-alive messages to prevent
24 final class SignalWebSocketHealthMonitor
implements HealthMonitor
{
26 private final static Logger logger
= LoggerFactory
.getLogger(SignalWebSocketHealthMonitor
.class);
28 private static final long KEEP_ALIVE_SEND_CADENCE
= TimeUnit
.SECONDS
.toMillis(WebSocketConnection
.KEEPALIVE_TIMEOUT_SECONDS
);
29 private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE
= KEEP_ALIVE_SEND_CADENCE
* 3;
31 private SignalWebSocket signalWebSocket
;
32 private final SleepTimer sleepTimer
;
34 private volatile KeepAliveSender keepAliveSender
;
36 private final HealthState identified
= new HealthState();
37 private final HealthState unidentified
= new HealthState();
39 public SignalWebSocketHealthMonitor(SleepTimer sleepTimer
) {
40 this.sleepTimer
= sleepTimer
;
43 public void monitor(SignalWebSocket signalWebSocket
) {
44 Preconditions
.checkNotNull(signalWebSocket
);
45 Preconditions
.checkArgument(this.signalWebSocket
== null, "monitor can only be called once");
47 this.signalWebSocket
= signalWebSocket
;
49 //noinspection ResultOfMethodCallIgnored
50 signalWebSocket
.getWebSocketState()
51 .subscribeOn(Schedulers
.computation())
52 .observeOn(Schedulers
.computation())
53 .distinctUntilChanged()
54 .subscribe(s
-> onStateChange(s
, identified
));
56 //noinspection ResultOfMethodCallIgnored
57 signalWebSocket
.getUnidentifiedWebSocketState()
58 .subscribeOn(Schedulers
.computation())
59 .observeOn(Schedulers
.computation())
60 .distinctUntilChanged()
61 .subscribe(s
-> onStateChange(s
, unidentified
));
64 private synchronized void onStateChange(WebSocketConnectionState connectionState
, HealthState healthState
) {
65 switch (connectionState
) {
66 case CONNECTED
-> logger
.debug("WebSocket is now connected");
67 case AUTHENTICATION_FAILED
-> logger
.debug("WebSocket authentication failed");
68 case FAILED
-> logger
.debug("WebSocket connection failed");
71 healthState
.needsKeepAlive
= connectionState
== WebSocketConnectionState
.CONNECTED
;
73 if (keepAliveSender
== null && isKeepAliveNecessary()) {
74 keepAliveSender
= new KeepAliveSender();
75 keepAliveSender
.start();
76 } else if (keepAliveSender
!= null && !isKeepAliveNecessary()) {
77 keepAliveSender
.shutdown();
78 keepAliveSender
= null;
83 public void onKeepAliveResponse(long sentTimestamp
, boolean isIdentifiedWebSocket
) {
84 if (isIdentifiedWebSocket
) {
85 identified
.lastKeepAliveReceived
= System
.currentTimeMillis();
87 unidentified
.lastKeepAliveReceived
= System
.currentTimeMillis();
92 public void onMessageError(int status
, boolean isIdentifiedWebSocket
) {
94 HealthState healthState
= (isIdentifiedWebSocket ? identified
: unidentified
);
95 if (healthState
.mismatchErrorTracker
.addSample(System
.currentTimeMillis())) {
96 logger
.warn("Received too many mismatch device errors, forcing new websockets.");
97 signalWebSocket
.forceNewWebSockets();
98 signalWebSocket
.connect();
103 private boolean isKeepAliveNecessary() {
104 return identified
.needsKeepAlive
|| unidentified
.needsKeepAlive
;
107 private static class HealthState
{
109 private final HttpErrorTracker mismatchErrorTracker
= new HttpErrorTracker(5, TimeUnit
.MINUTES
.toMillis(1));
111 private volatile boolean needsKeepAlive
;
112 private volatile long lastKeepAliveReceived
;
116 * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
117 * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated.
119 private class KeepAliveSender
extends Thread
{
121 private volatile boolean shouldKeepRunning
= true;
124 identified
.lastKeepAliveReceived
= System
.currentTimeMillis();
125 unidentified
.lastKeepAliveReceived
= System
.currentTimeMillis();
127 while (shouldKeepRunning
&& isKeepAliveNecessary()) {
129 sleepTimer
.sleep(KEEP_ALIVE_SEND_CADENCE
);
131 if (shouldKeepRunning
&& isKeepAliveNecessary()) {
132 long keepAliveRequiredSinceTime
= System
.currentTimeMillis()
133 - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE
;
135 if (identified
.lastKeepAliveReceived
< keepAliveRequiredSinceTime
136 || unidentified
.lastKeepAliveReceived
< keepAliveRequiredSinceTime
) {
137 logger
.warn("Missed keep alives, identified last: "
138 + identified
.lastKeepAliveReceived
139 + " unidentified last: "
140 + unidentified
.lastKeepAliveReceived
142 + keepAliveRequiredSinceTime
);
143 signalWebSocket
.forceNewWebSockets();
144 signalWebSocket
.connect();
146 signalWebSocket
.sendKeepAlive();
149 } catch (Throwable e
) {
150 logger
.warn("Error occured in KeepAliveSender, ignoring ...", e
);
155 public void shutdown() {
156 shouldKeepRunning
= false;
160 private final static class HttpErrorTracker
{
162 private final long[] timestamps
;
163 private final long errorTimeRange
;
165 public HttpErrorTracker(int samples
, long errorTimeRange
) {
166 this.timestamps
= new long[samples
];
167 this.errorTimeRange
= errorTimeRange
;
170 public synchronized boolean addSample(long now
) {
171 long errorsMustBeAfter
= now
- errorTimeRange
;
175 for (int i
= 0; i
< timestamps
.length
; i
++) {
176 if (timestamps
[i
] < errorsMustBeAfter
) {
178 } else if (timestamps
[i
] != 0) {
182 if (timestamps
[i
] < timestamps
[minIndex
]) {
187 timestamps
[minIndex
] = now
;
189 if (count
>= timestamps
.length
) {
190 Arrays
.fill(timestamps
, 0);