nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/SignalWebSocketHealthMonitor.java
Rotate profile key after blocking a contact/group
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / SignalWebSocketHealthMonitor.java
1 package org.asamk.signal.manager;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.whispersystems.signalservice.api.SignalWebSocket;
6 import org.whispersystems.signalservice.api.util.Preconditions;
7 import org.whispersystems.signalservice.api.util.SleepTimer;
8 import org.whispersystems.signalservice.api.websocket.HealthMonitor;
9 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
10 import org.whispersystems.signalservice.internal.websocket.WebSocketConnection;
11
12 import java.util.Arrays;
13 import java.util.concurrent.TimeUnit;
14
15 import io.reactivex.rxjava3.schedulers.Schedulers;
16
17 /**
18 * Monitors the health of the identified and unidentified WebSockets. If either one appears to be
19 * unhealthy, will trigger restarting both.
20 * <p>
21 * The monitor is also responsible for sending heartbeats/keep-alive messages to prevent
22 * timeouts.
23 */
24 final class SignalWebSocketHealthMonitor implements HealthMonitor {
25
/** Logger used by the monitor and by its keep-alive thread. */
private static final Logger logger = LoggerFactory.getLogger(SignalWebSocketHealthMonitor.class);

/**
 * Time in milliseconds the keep-alive thread sleeps between rounds, derived from
 * {@code WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS}.
 */
private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(WebSocketConnection.KEEPALIVE_TIMEOUT_SECONDS);

/** Longest tolerated gap since the last keep-alive response: three send cadences. */
private static final long MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE = 3 * KEEP_ALIVE_SEND_CADENCE;

/** Timer the keep-alive thread uses to wait between rounds. */
private final SleepTimer sleepTimer;

/** Health bookkeeping for the identified WebSocket. */
private final HealthState identified = new HealthState();

/** Health bookkeeping for the unidentified WebSocket. */
private final HealthState unidentified = new HealthState();

/** The monitored socket; {@code null} until {@code monitor} has been called. */
private SignalWebSocket signalWebSocket;

/** The running keep-alive thread, or {@code null} while no keep-alives are needed. */
private volatile KeepAliveSender keepAliveSender;

/**
 * Creates a monitor that is not yet attached to any WebSocket.
 *
 * @param sleepTimer timer used to pause between keep-alive rounds
 */
public SignalWebSocketHealthMonitor(final SleepTimer sleepTimer) {
    this.sleepTimer = sleepTimer;
}
42
43 public void monitor(SignalWebSocket signalWebSocket) {
44 Preconditions.checkNotNull(signalWebSocket);
45 Preconditions.checkArgument(this.signalWebSocket == null, "monitor can only be called once");
46
47 this.signalWebSocket = signalWebSocket;
48
49 //noinspection ResultOfMethodCallIgnored
50 signalWebSocket.getWebSocketState()
51 .subscribeOn(Schedulers.computation())
52 .observeOn(Schedulers.computation())
53 .distinctUntilChanged()
54 .subscribe(s -> onStateChange(s, identified));
55
56 //noinspection ResultOfMethodCallIgnored
57 signalWebSocket.getUnidentifiedWebSocketState()
58 .subscribeOn(Schedulers.computation())
59 .observeOn(Schedulers.computation())
60 .distinctUntilChanged()
61 .subscribe(s -> onStateChange(s, unidentified));
62 }
63
64 private synchronized void onStateChange(WebSocketConnectionState connectionState, HealthState healthState) {
65 switch (connectionState) {
66 case CONNECTED -> logger.debug("WebSocket is now connected");
67 case AUTHENTICATION_FAILED -> logger.debug("WebSocket authentication failed");
68 case FAILED -> logger.debug("WebSocket connection failed");
69 }
70
71 healthState.needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
72
73 if (keepAliveSender == null && isKeepAliveNecessary()) {
74 keepAliveSender = new KeepAliveSender();
75 keepAliveSender.start();
76 } else if (keepAliveSender != null && !isKeepAliveNecessary()) {
77 keepAliveSender.shutdown();
78 keepAliveSender = null;
79 }
80 }
81
82 @Override
83 public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
84 if (isIdentifiedWebSocket) {
85 identified.lastKeepAliveReceived = System.currentTimeMillis();
86 } else {
87 unidentified.lastKeepAliveReceived = System.currentTimeMillis();
88 }
89 }
90
91 @Override
92 public void onMessageError(int status, boolean isIdentifiedWebSocket) {
93 if (status == 409) {
94 HealthState healthState = (isIdentifiedWebSocket ? identified : unidentified);
95 if (healthState.mismatchErrorTracker.addSample(System.currentTimeMillis())) {
96 logger.warn("Received too many mismatch device errors, forcing new websockets.");
97 signalWebSocket.forceNewWebSockets();
98 signalWebSocket.connect();
99 }
100 }
101 }
102
103 private boolean isKeepAliveNecessary() {
104 return identified.needsKeepAlive || unidentified.needsKeepAlive;
105 }
106
107 private static class HealthState {
108
109 private final HttpErrorTracker mismatchErrorTracker = new HttpErrorTracker(5, TimeUnit.MINUTES.toMillis(1));
110
111 private volatile boolean needsKeepAlive;
112 private volatile long lastKeepAliveReceived;
113 }
114
115 /**
116 * Sends periodic heartbeats/keep-alives over both WebSockets to prevent connection timeouts. If
117 * either WebSocket fails 3 times to get a return heartbeat both are forced to be recreated.
118 */
119 private class KeepAliveSender extends Thread {
120
121 private volatile boolean shouldKeepRunning = true;
122
123 public void run() {
124 identified.lastKeepAliveReceived = System.currentTimeMillis();
125 unidentified.lastKeepAliveReceived = System.currentTimeMillis();
126
127 while (shouldKeepRunning && isKeepAliveNecessary()) {
128 try {
129 sleepTimer.sleep(KEEP_ALIVE_SEND_CADENCE);
130
131 if (shouldKeepRunning && isKeepAliveNecessary()) {
132 long keepAliveRequiredSinceTime = System.currentTimeMillis()
133 - MAX_TIME_SINCE_SUCCESSFUL_KEEP_ALIVE;
134
135 if (identified.lastKeepAliveReceived < keepAliveRequiredSinceTime
136 || unidentified.lastKeepAliveReceived < keepAliveRequiredSinceTime) {
137 logger.warn("Missed keep alives, identified last: "
138 + identified.lastKeepAliveReceived
139 + " unidentified last: "
140 + unidentified.lastKeepAliveReceived
141 + " needed by: "
142 + keepAliveRequiredSinceTime);
143 signalWebSocket.forceNewWebSockets();
144 signalWebSocket.connect();
145 } else {
146 signalWebSocket.sendKeepAlive();
147 }
148 }
149 } catch (Throwable e) {
150 logger.warn("Error occured in KeepAliveSender, ignoring ...", e);
151 }
152 }
153 }
154
155 public void shutdown() {
156 shouldKeepRunning = false;
157 }
158 }
159
160 private final static class HttpErrorTracker {
161
162 private final long[] timestamps;
163 private final long errorTimeRange;
164
165 public HttpErrorTracker(int samples, long errorTimeRange) {
166 this.timestamps = new long[samples];
167 this.errorTimeRange = errorTimeRange;
168 }
169
170 public synchronized boolean addSample(long now) {
171 long errorsMustBeAfter = now - errorTimeRange;
172 int count = 1;
173 int minIndex = 0;
174
175 for (int i = 0; i < timestamps.length; i++) {
176 if (timestamps[i] < errorsMustBeAfter) {
177 timestamps[i] = 0;
178 } else if (timestamps[i] != 0) {
179 count++;
180 }
181
182 if (timestamps[i] < timestamps[minIndex]) {
183 minIndex = i;
184 }
185 }
186
187 timestamps[minIndex] = now;
188
189 if (count >= timestamps.length) {
190 Arrays.fill(timestamps, 0);
191 return true;
192 }
193 return false;
194 }
195 }
196 }