nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/SignalWebSocketHealthMonitor.java
Don't handle blocked or forbidden messages
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / SignalWebSocketHealthMonitor.java
1 package org.asamk.signal.manager;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.whispersystems.libsignal.util.guava.Preconditions;
6 import org.whispersystems.signalservice.api.SignalWebSocket;
7 import org.whispersystems.signalservice.api.util.SleepTimer;
8 import org.whispersystems.signalservice.api.websocket.HealthMonitor;
9 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
10 import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
11
12 import java.util.Arrays;
13 import java.util.concurrent.TimeUnit;
14
15 import io.reactivex.rxjava3.schedulers.Schedulers;
16
17 /**
18 * Monitors the health of the identified and unidentified WebSockets. If either one appears to be
19 * unhealthy, will trigger restarting both.
20 * <p>
21 * The monitor is also responsible for sending heartbeats/keep-alive messages to prevent
22 * timeouts.
23 */
24 public final class SignalWebSocketHealthMonitor implements HealthMonitor {
25
26 private final static Logger logger = LoggerFactory.getLogger(SignalWebSocketHealthMonitor.class);
27
28 private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS);
29 private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = KEEP_ALIVE_SEND_CADENCE * 3;
30
31 private SignalWebSocket signalWebSocket;
32 private final SleepTimer sleepTimer;
33
34 private volatile KeepAliveSender keepAliveSender;
35
36 private final HealthState identified = new HealthState();
37 private final HealthState unidentified = new HealthState();
38
39 public SignalWebSocketHealthMonitor(SleepTimer sleepTimer) {
40 this.sleepTimer = sleepTimer;
41 }
42
43 public void monitor(SignalWebSocket signalWebSocket) {
44 Preconditions.checkNotNull(signalWebSocket);
45 Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once");
46
47 this.signalWebSocket = signalWebSocket;
48
49 //noinspection ResultOfMethodCallIgnored
50 signalWebSocket.getWebSocketState()
51 .subscribeOn(Schedulers.computation())
52 .observeOn(Schedulers.computation())
53 .distinctUntilChanged()
54 .subscribe(s -> onStateChange(s, identified));
55
56 //noinspection ResultOfMethodCallIgnored
57 signalWebSocket.getUnidentifiedWebSocketState()
58 .subscribeOn(Schedulers.computation())
59 .observeOn(Schedulers.computation())
60 .distinctUntilChanged()
61 .subscribe(s -> onStateChange(s, unidentified));
62 }
63
64 private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) {
65 switch (connectionState) {
66 case CONNECTED:
67 logger.debug("WebSocket is now connected");
68 break;
69 case AUTHENTICATION_FAILED:
70 logger.debug("WebSocket authentication failed");
71 break;
72 case FAILED:
73 logger.debug("WebSocket connection failed");
74 break;
75 }
76
77 healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
78
79 if (keepAliveSender == null && isKeepAliveNecessary()) {
80 keepAliveSender = new KeepAliveSender();
81 keepAliveSender.start();
82 } else if (keepAliveSender != null && !isKeepAliveNecessary()) {
83 keepAliveSender.shutdown();
84 keepAliveSender = null;
85 }
86 }
87
88 @Override
89 public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
90 if (isIdentifiedWebSocket) {
91 identified.lastKeepAliveReceived = System.currentTimeMillis();
92 } else {
93 unidentified.lastKeepAliveReceived = System.currentTimeMillis();
94 }
95 }
96
97 @Override
98 public void onMessageError(int status, boolean isIdentifiedWebSocket) {
99 if (status == 409) {
100 HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified);
101 if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) {
102 logger.warn("Received too many mismatch device errors, forcing new websockets.");
103 signalWebSocket.forceNewWebSockets();
104 }
105 }
106 }
107
108 private boolean isKeepAliveNecessary() {
109 return identified.needsKeepAlive || unidentified.needsKeepAlive;
110 }
111
112 private static class HealthState {
113
114 private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1));
115
116 private volatile boolean needsKeepAlive;
117 private volatile long lastKeepAliveReceived;
118 }
119
120 /**
121 * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
122 * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated.
123 */
124 private class KeepAliveSender extends Thread {
125
126 private volatile boolean shouldKeepRunning = true;
127
128 public void run() {
129 identified.lastKeepAliveReceived = System.currentTimeMillis();
130 unidentified.lastKeepAliveReceived = System.currentTimeMillis();
131
132 while (shouldKeepRunning && isKeepAliveNecessary()) {
133 try {
134 sleepTimer.sleep(KEEP_ALIVE_SEND_CADENCE);
135
136 if (shouldKeepRunning && isKeepAliveNecessary()) {
137 long keepAliveRequiredSinceTime = System.currentTimeMillis()
138 - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE;
139
140 if (identified.lastKeepAliveReceived < keepAliveRequiredSinceTime
141 || unidentified.lastKeepAliveReceived < keepAliveRequiredSinceTime) {
142 logger.warn("Missed keep alives, identified last: "
143 + identified.lastKeepAliveReceived
144 + " unidentified last: "
145 + unidentified.lastKeepAliveReceived
146 + " needed by: "
147 + keepAliveRequiredSinceTime);
148 signalWebSocket.forceNewWebSockets();
149 } else {
150 signalWebSocket.sendKeepAlive();
151 }
152 }
153 } catch (Throwable e) {
154 logger.warn("Error occured in KeepAliveSender, ignoring ...", e);
155 }
156 }
157 }
158
159 public void shutdown() {
160 shouldKeepRunning = false;
161 }
162 }
163
164 private final static class HttpErrorTracker {
165
166 private final long[] timestamps;
167 private final long errorTimeRange;
168
169 public HttpErrorTracker(int samples, long errorTimeRange) {
170 this.timestamps = new long[samples];
171 this.errorTimeRange = errorTimeRange;
172 }
173
174 public synchronized boolean addSample(long now) {
175 long errorsMustBeAfter = now - errorTimeRange;
176 int count = 1;
177 int minIndex = 0;
178
179 for (int i = 0; i < timestamps.length; i++) {
180 if (timestamps[i] < errorsMustBeAfter) {
181 timestamps[i] = 0;
182 } else if (timestamps[i] != 0) {
183 count++;
184 }
185
186 if (timestamps[i] < timestamps[minIndex]) {
187 minIndex = i;
188 }
189 }
190
191 timestamps[minIndex] = now;
192
193 if (count >= timestamps.length) {
194 Arrays.fill(timestamps, 0);
195 return true;
196 }
197 return false;
198 }
199 }
200 }