1 package org
.asamk
.signal
.manager
;
3 import org
.slf4j
.Logger
;
4 import org
.slf4j
.LoggerFactory
;
5 import org
.whispersystems
.libsignal
.util
.guava
.Preconditions
;
6 import org
.whispersystems
.signalservice
.api
.SignalWebSocket
;
7 import org
.whispersystems
.signalservice
.api
.util
.SleepTimer
;
8 import org
.whispersystems
.signalservice
.api
.websocket
.HealthMonitor
;
9 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
10 import org
.whispersystems
.signalservice
.internal
.websocket
.WebSocketConnection
;
12 import java
.util
.Arrays
;
13 import java
.util
.concurrent
.TimeUnit
;
15 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
/**
 * Monitors the health of the identified and unidentified WebSockets. If either one appears to be
 * unhealthy, the monitor triggers a restart of both.
 * <p>
 * The monitor is also responsible for sending heartbeat/keep-alive messages to prevent
 * connection timeouts.
 */
24 public final class SignalWebSocketHealthMonitor
implements HealthMonitor
{
26 private final static Logger logger
= LoggerFactory
.getLogger(SignalWebSocketHealthMonitor
.class);
28 private static final long KEEP_ALIVE_SEND_CADENCE
= TimeUnit
.SECONDS
.toMillis(WebSocketConnection
.KEEPALIVE_TIMEOUT_SECONDS
);
29 private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE
= KEEP_ALIVE_SEND_CADENCE
* 3;
31 private SignalWebSocket signalWebSocket
;
32 private final SleepTimer sleepTimer
;
34 private volatile KeepAliveSender keepAliveSender
;
36 private final HealthState identified
= new HealthState();
37 private final HealthState unidentified
= new HealthState();
/**
 * Creates a monitor. Nothing is observed until {@code monitor} is called with the web socket
 * to watch.
 *
 * @param sleepTimer timer the keep-alive thread uses to wait between keep-alive rounds; must
 *                   not be {@code null}
 * @throws NullPointerException if {@code sleepTimer} is {@code null}
 */
public SignalWebSocketHealthMonitor(SleepTimer sleepTimer) {
    // Fail fast, as monitor() does for its argument: a null timer would otherwise only
    // surface later, when the keep-alive thread first tries to sleep.
    Preconditions.checkNotNull(sleepTimer);
    this.sleepTimer = sleepTimer;
}
43 public void monitor(SignalWebSocket signalWebSocket
) {
44 Preconditions
.checkNotNull(signalWebSocket
);
45 Preconditions
.checkArgument(this.signalWebSocket
== null, "monitor can only be called once");
47 this.signalWebSocket
= signalWebSocket
;
49 //noinspection ResultOfMethodCallIgnored
50 signalWebSocket
.getWebSocketState()
51 .subscribeOn(Schedulers
.computation())
52 .observeOn(Schedulers
.computation())
53 .distinctUntilChanged()
54 .subscribe(s
-> onStateChange(s
, identified
));
56 //noinspection ResultOfMethodCallIgnored
57 signalWebSocket
.getUnidentifiedWebSocketState()
58 .subscribeOn(Schedulers
.computation())
59 .observeOn(Schedulers
.computation())
60 .distinctUntilChanged()
61 .subscribe(s
-> onStateChange(s
, unidentified
));
/**
 * Handles a connection-state change for one of the two sockets: logs the new state, records
 * whether that socket needs keep-alives, and starts or stops the keep-alive thread to match.
 * The method is synchronized, so state changes of the two sockets are processed one at a time.
 *
 * NOTE(review): the case labels in front of the first and third log statement, and any break
 * statements, are not visible in this view of the file - confirm against the full source.
 *
 * @param connectionState the state the socket has just entered
 * @param healthState     the bookkeeping object of the socket that changed state
 */
64 private synchronized void onStateChange(WebSocketConnectionState connectionState
, HealthState healthState
) {
65 switch (connectionState
) {
67 logger
.debug("WebSocket is now connected");
69 case AUTHENTICATION_FAILED
:
70 logger
.debug("WebSocket authentication failed");
73 logger
.debug("WebSocket connection failed");
// Only a socket in the CONNECTED state needs keep-alives.
77 healthState
.needsKeepAlive
= connectionState
== WebSocketConnectionState
.CONNECTED
;
// Start a sender when none exists and at least one socket needs keep-alives ...
79 if (keepAliveSender
== null && isKeepAliveNecessary()) {
80 keepAliveSender
= new KeepAliveSender();
81 keepAliveSender
.start();
// ... and stop and drop the existing sender once neither socket needs them.
82 } else if (keepAliveSender
!= null && !isKeepAliveNecessary()) {
83 keepAliveSender
.shutdown();
84 keepAliveSender
= null;
/**
 * Records the current wall-clock time as the moment the last keep-alive response was received.
 *
 * NOTE(review): the line between the two assignments is not visible in this view; presumably
 * the second assignment is the else-branch, so that only the socket named by
 * {@code isIdentifiedWebSocket} is updated - confirm against the full source.
 *
 * @param sentTimestamp         not read by the visible code
 * @param isIdentifiedWebSocket whether the response arrived on the identified socket
 */
89 public void onKeepAliveResponse(long sentTimestamp
, boolean isIdentifiedWebSocket
) {
90 if (isIdentifiedWebSocket
) {
91 identified
.lastKeepAliveReceived
= System
.currentTimeMillis();
93 unidentified
.lastKeepAliveReceived
= System
.currentTimeMillis();
/**
 * Feeds an error into the mismatch-error tracker of the affected socket and, when the tracker
 * reports that too many errors occurred, logs a warning and forces both web sockets to be
 * recreated.
 *
 * NOTE(review): {@code status} is not read by the lines visible here; a line directly after
 * the method header is missing from this view and may filter on it - confirm against the
 * full source.
 *
 * @param status                status value reported with the error
 * @param isIdentifiedWebSocket whether the error occurred on the identified socket
 */
98 public void onMessageError(int status
, boolean isIdentifiedWebSocket
) {
// Pick the bookkeeping object of the socket the error occurred on.
100 HealthState healthState
= (isIdentifiedWebSocket ? identified
: unidentified
);
// addSample returns true once the tracker considers the error rate too high.
101 if (healthState
.mismatchErrorTracker
.addSample(System
.currentTimeMillis())) {
102 logger
.warn("Received too many mismatch device errors, forcing new websockets.");
103 signalWebSocket
.forceNewWebSockets();
108 private boolean isKeepAliveNecessary() {
109 return identified
.needsKeepAlive
|| unidentified
.needsKeepAlive
;
112 private static class HealthState
{
114 private final HttpErrorTracker mismatchErrorTracker
= new HttpErrorTracker(5, TimeUnit
.MINUTES
.toMillis(1));
116 private volatile boolean needsKeepAlive
;
117 private volatile long lastKeepAliveReceived
;
/**
 * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
 * either WebSocket goes three keep-alive intervals without receiving a return heartbeat, both
 * are forced to be recreated.
 */
124 private class KeepAliveSender
extends Thread
{
// Cleared by shutdown() to ask the loop below to stop; volatile so that the write is visible
// to the running thread.
126 private volatile boolean shouldKeepRunning
= true;
// NOTE(review): the header of the method enclosing the statements below (presumably run())
// is not visible in this view of the file.
// Reset both receipt timestamps to "now" so the first check measures from thread start.
129 identified
.lastKeepAliveReceived
= System
.currentTimeMillis();
130 unidentified
.lastKeepAliveReceived
= System
.currentTimeMillis();
132 while (shouldKeepRunning
&& isKeepAliveNecessary()) {
// Wait one keep-alive interval.
134 sleepTimer
.sleep(KEEP_ALIVE_SEND_CADENCE
);
// Re-check after sleeping: shutdown() may have been called, or both sockets may have stopped
// needing keep-alives, in the meantime.
136 if (shouldKeepRunning
&& isKeepAliveNecessary()) {
// A last-received timestamp older than this one counts as a missed keep-alive.
137 long keepAliveRequiredSinceTime
= System
.currentTimeMillis()
138 - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE
;
// If either socket missed its keep-alives, log the details and recreate both sockets.
140 if (identified
.lastKeepAliveReceived
< keepAliveRequiredSinceTime
141 || unidentified
.lastKeepAliveReceived
< keepAliveRequiredSinceTime
) {
142 logger
.warn("Missed keep alives, identified last: "
143 + identified
.lastKeepAliveReceived
144 + " unidentified last: "
145 + unidentified
.lastKeepAliveReceived
147 + keepAliveRequiredSinceTime
);
148 signalWebSocket
.forceNewWebSockets();
// NOTE(review): whether this send is unconditional or the else-branch of the check above
// cannot be determined from the lines visible here.
150 signalWebSocket
.sendKeepAlive();
// Any failure is logged and otherwise ignored.
153 } catch (Throwable e
) {
154 logger
.warn("Error occured in KeepAliveSender, ignoring ...", e
);
// Requests the loop to stop; takes effect the next time shouldKeepRunning is checked.
159 public void shutdown() {
160 shouldKeepRunning
= false;
164 private final static class HttpErrorTracker
{
166 private final long[] timestamps
;
167 private final long errorTimeRange
;
169 public HttpErrorTracker(int samples
, long errorTimeRange
) {
170 this.timestamps
= new long[samples
];
171 this.errorTimeRange
= errorTimeRange
;
174 public synchronized boolean addSample(long now
) {
175 long errorsMustBeAfter
= now
- errorTimeRange
;
179 for (int i
= 0; i
< timestamps
.length
; i
++) {
180 if (timestamps
[i
] < errorsMustBeAfter
) {
182 } else if (timestamps
[i
] != 0) {
186 if (timestamps
[i
] < timestamps
[minIndex
]) {
191 timestamps
[minIndex
] = now
;
193 if (count
>= timestamps
.length
) {
194 Arrays
.fill(timestamps
, 0);