nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/internal/SignalWebSocketHealthMonitor.java
b5b855acd83099419711aa27fa361f6730be572a
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / internal / SignalWebSocketHealthMonitor.java
1 package org.asamk.signal.manager.internal;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5 import org.whispersystems.signalservice.api.util.Preconditions;
6 import org.whispersystems.signalservice.api.util.SleepTimer;
7 import org.whispersystems.signalservice.api.websocket.HealthMonitor;
8 import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
9 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
10 import org.whispersystems.signalservice.internal.websocket.OkHttpWebSocketConnection;
11
12 import java.util.concurrent.Executor;
13 import java.util.concurrent.Executors;
14 import java.util.concurrent.TimeUnit;
15
16 import io.reactivex.rxjava3.schedulers.Schedulers;
17 import kotlin.Unit;
18
19 final class SignalWebSocketHealthMonitor implements HealthMonitor {
20
21 private static final Logger logger = LoggerFactory.getLogger(SignalWebSocketHealthMonitor.class);
22
23 /**
24 * This is the amount of time in between sent keep alives. Must be greater than [KEEP_ALIVE_TIMEOUT]
25 */
26 private static final long KEEP_ALIVE_SEND_CADENCE = TimeUnit.SECONDS.toMillis(OkHttpWebSocketConnection.KEEPALIVE_FREQUENCY_SECONDS);
27
28 /**
29 * This is the amount of time we will wait for a response to the keep alive before we consider the websockets dead.
30 * It is required that this value be less than [KEEP_ALIVE_SEND_CADENCE]
31 */
32 private static final long KEEP_ALIVE_TIMEOUT = TimeUnit.SECONDS.toMillis(20);
33
34 private final Executor executor = Executors.newSingleThreadExecutor();
35 private final SleepTimer sleepTimer;
36 private SignalWebSocket webSocket = null;
37 private volatile KeepAliveSender keepAliveSender = null;
38 private boolean needsKeepAlive = false;
39 private long lastKeepAliveReceived = 0;
40
41 public SignalWebSocketHealthMonitor(SleepTimer sleepTimer) {
42 this.sleepTimer = sleepTimer;
43 }
44
45 void monitor(SignalWebSocket webSocket) {
46 Preconditions.checkNotNull(webSocket);
47 Preconditions.checkArgument(this.webSocket == null, "monitor can only be called once");
48
49 executor.execute(() -> {
50
51 this.webSocket = webSocket;
52
53 webSocket.getState()
54 .subscribeOn(Schedulers.computation())
55 .observeOn(Schedulers.computation())
56 .distinctUntilChanged()
57 .subscribe(this::onStateChanged);
58
59 webSocket.setKeepAliveChangedListener(this::updateKeepAliveSenderStatus);
60 });
61 }
62
63 private void onStateChanged(WebSocketConnectionState connectionState) {
64 executor.execute(() -> {
65 needsKeepAlive = connectionState == WebSocketConnectionState.CONNECTED;
66
67 updateKeepAliveSenderStatus();
68 });
69 }
70
71 @Override
72 public void onKeepAliveResponse(long sentTimestamp, boolean isIdentifiedWebSocket) {
73 final var keepAliveTime = System.currentTimeMillis();
74 executor.execute(() -> lastKeepAliveReceived = keepAliveTime);
75 }
76
77 @Override
78 public void onMessageError(int status, boolean isIdentifiedWebSocket) {
79 }
80
81 private Unit updateKeepAliveSenderStatus() {
82 if (keepAliveSender == null && sendKeepAlives()) {
83 keepAliveSender = new KeepAliveSender();
84 keepAliveSender.start();
85 } else if (keepAliveSender != null && !sendKeepAlives()) {
86 keepAliveSender.shutdown();
87 keepAliveSender = null;
88 }
89 return Unit.INSTANCE;
90 }
91
92 private boolean sendKeepAlives() {
93 return needsKeepAlive && webSocket != null && webSocket.getShouldSendKeepAlives();
94 }
95
96 /**
97 * Sends periodic heartbeats/keep-alives over the WebSocket to prevent connection timeouts. If
98 * the WebSocket fails to get a return heartbeat after [KEEP_ALIVE_TIMEOUT] seconds, it is forced to be recreated.
99 */
100 private final class KeepAliveSender extends Thread {
101
102 private volatile boolean shouldKeepRunning = true;
103
104 @Override
105 public void run() {
106 logger.debug("[KeepAliveSender({})] started", this.threadId());
107 lastKeepAliveReceived = System.currentTimeMillis();
108
109 var keepAliveSendTime = System.currentTimeMillis();
110 while (shouldKeepRunning && sendKeepAlives()) {
111 try {
112 final var nextKeepAliveSendTime = keepAliveSendTime + KEEP_ALIVE_SEND_CADENCE;
113 sleepUntil(nextKeepAliveSendTime);
114
115 if (shouldKeepRunning && sendKeepAlives()) {
116 keepAliveSendTime = System.currentTimeMillis();
117 webSocket.sendKeepAlive();
118 }
119
120 final var responseRequiredTime = keepAliveSendTime + KEEP_ALIVE_TIMEOUT;
121 sleepUntil(responseRequiredTime);
122
123 if (shouldKeepRunning && sendKeepAlives()) {
124 if (lastKeepAliveReceived < keepAliveSendTime) {
125 logger.debug("Missed keep alive, last: {} needed by: {}",
126 lastKeepAliveReceived,
127 responseRequiredTime);
128 webSocket.forceNewWebSocket();
129 }
130 }
131 } catch (Throwable e) {
132 logger.warn("Keep alive sender failed", e);
133 }
134 }
135 logger.debug("[KeepAliveSender({})] ended", threadId());
136 }
137
138 void sleepUntil(long timeMillis) {
139 while (System.currentTimeMillis() < timeMillis) {
140 final var waitTime = timeMillis - System.currentTimeMillis();
141 if (waitTime > 0) {
142 try {
143 sleepTimer.sleep(waitTime);
144 } catch (InterruptedException e) {
145 logger.warn("WebSocket health monitor interrupted", e);
146 }
147 }
148 }
149 }
150
151 void shutdown() {
152 shouldKeepRunning = false;
153 }
154 }
155 }
156