nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
d3c1ef5629e135a6cb27597f095497368ce00d09
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
13 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
14 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
15
16 import java.io.IOException;
17 import java.time.Duration;
18 import java.util.Collection;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.TimeoutException;
25
26 import io.reactivex.rxjava3.core.Observable;
27 import io.reactivex.rxjava3.schedulers.Schedulers;
28
29 public class ReceiveHelper {
30
31 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
32 private final static int MAX_BACKOFF_COUNTER = 9;
33
34 private final SignalAccount account;
35 private final SignalDependencies dependencies;
36 private final Context context;
37
38 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false);
39 private boolean needsToRetryFailedMessages = false;
40 private boolean hasCaughtUpWithOldMessages = false;
41 private boolean isWaitingForMessage = false;
42 private boolean shouldStop = false;
43 private Callable authenticationFailureListener;
44 private Callable caughtUpWithOldMessagesListener;
45
46 public ReceiveHelper(final Context context) {
47 this.account = context.getAccount();
48 this.dependencies = context.getDependencies();
49 this.context = context;
50 }
51
52 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
53 this.receiveConfig = receiveConfig;
54 }
55
56 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
57 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
58 }
59
60 public boolean hasCaughtUpWithOldMessages() {
61 return hasCaughtUpWithOldMessages;
62 }
63
64 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
65 this.authenticationFailureListener = authenticationFailureListener;
66 }
67
68 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
69 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
70 }
71
72 public boolean requestStopReceiveMessages() {
73 this.shouldStop = true;
74 return isWaitingForMessage;
75 }
76
77 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
78 while (!shouldStop) {
79 try {
80 receiveMessages(Duration.ofMinutes(1), false, handler);
81 break;
82 } catch (IOException e) {
83 logger.warn("Receiving messages failed, retrying", e);
84 }
85 }
86 }
87
88 public void receiveMessages(
89 Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
90 ) throws IOException {
91 needsToRetryFailedMessages = true;
92 hasCaughtUpWithOldMessages = false;
93
94 // Use a Map here because java Set doesn't have a get method ...
95 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
96
97 final var signalWebSocket = dependencies.getSignalWebSocket();
98 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
99 signalWebSocket.getWebSocketState())
100 .subscribeOn(Schedulers.computation())
101 .observeOn(Schedulers.computation())
102 .distinctUntilChanged()
103 .subscribe(this::onWebSocketStateChange);
104 signalWebSocket.connect();
105
106 try {
107 receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
108 } finally {
109 hasCaughtUpWithOldMessages = false;
110 handleQueuedActions(queuedActions.keySet());
111 queuedActions.clear();
112 dependencies.getSignalWebSocket().disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 Duration timeout,
120 boolean returnOnTimeout,
121 Manager.ReceiveMessageHandler handler,
122 final Map<HandleAction, HandleAction> queuedActions
123 ) throws IOException {
124 final var signalWebSocket = dependencies.getSignalWebSocket();
125
126 var backOffCounter = 0;
127 isWaitingForMessage = false;
128
129 while (!shouldStop) {
130 if (needsToRetryFailedMessages) {
131 retryFailedReceivedMessages(handler);
132 needsToRetryFailedMessages = false;
133 }
134 SignalServiceEnvelope envelope;
135 final CachedMessage[] cachedMessage = {null};
136 final var nowMillis = System.currentTimeMillis();
137 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
138 account.setLastReceiveTimestamp(nowMillis);
139 }
140 logger.debug("Checking for new message from server");
141 try {
142 isWaitingForMessage = true;
143 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
144 isWaitingForMessage = false;
145 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
146 .resolveRecipient(envelope1.getSourceAddress()) : null;
147 logger.trace("Storing new message from {}", recipientId);
148 // store message on disk, before acknowledging receipt to the server
149 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
150 });
151 isWaitingForMessage = false;
152 backOffCounter = 0;
153
154 if (result.isPresent()) {
155 envelope = result.get();
156 logger.debug("New message received from server");
157 } else {
158 logger.debug("Received indicator that server queue is empty");
159 handleQueuedActions(queuedActions.keySet());
160 queuedActions.clear();
161
162 hasCaughtUpWithOldMessages = true;
163 caughtUpWithOldMessagesListener.call();
164
165 // Continue to wait another timeout for new messages
166 continue;
167 }
168 } catch (AssertionError e) {
169 if (e.getCause() instanceof InterruptedException) {
170 break;
171 } else {
172 throw e;
173 }
174 } catch (IOException e) {
175 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
176 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
177 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
178 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
179 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
180 try {
181 Thread.sleep(sleepMilliseconds);
182 } catch (InterruptedException interruptedException) {
183 return;
184 }
185 hasCaughtUpWithOldMessages = false;
186 signalWebSocket.connect();
187 continue;
188 }
189 throw e;
190 } catch (TimeoutException e) {
191 backOffCounter = 0;
192 if (returnOnTimeout) return;
193 continue;
194 }
195
196 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
197 for (final var h : result.first()) {
198 final var existingAction = queuedActions.get(h);
199 if (existingAction == null) {
200 queuedActions.put(h, h);
201 } else {
202 existingAction.mergeOther(h);
203 }
204 }
205 final var exception = result.second();
206
207 if (hasCaughtUpWithOldMessages) {
208 handleQueuedActions(queuedActions.keySet());
209 queuedActions.clear();
210 }
211 if (cachedMessage[0] != null) {
212 if (exception instanceof UntrustedIdentityException) {
213 logger.debug("Keeping message with untrusted identity in message cache");
214 final var address = ((UntrustedIdentityException) exception).getSender();
215 final var recipientId = account.getRecipientResolver().resolveRecipient(address);
216 if (!envelope.hasSourceUuid()) {
217 try {
218 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
219 } catch (IOException ioException) {
220 logger.warn("Failed to move cached message to recipient folder: {}",
221 ioException.getMessage());
222 }
223 }
224 } else {
225 cachedMessage[0].delete();
226 }
227 }
228 }
229 }
230
231 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
232 Set<HandleAction> queuedActions = new HashSet<>();
233 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
234 var actions = retryFailedReceivedMessage(handler, cachedMessage);
235 if (actions != null) {
236 queuedActions.addAll(actions);
237 }
238 }
239 handleQueuedActions(queuedActions);
240 }
241
242 private List<HandleAction> retryFailedReceivedMessage(
243 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
244 ) {
245 var envelope = cachedMessage.loadEnvelope();
246 if (envelope == null) {
247 cachedMessage.delete();
248 return null;
249 }
250
251 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
252 final var actions = result.first();
253 final var exception = result.second();
254
255 if (exception instanceof UntrustedIdentityException) {
256 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
257 // Envelope is more than a month old, cleaning up.
258 cachedMessage.delete();
259 return null;
260 }
261 if (!envelope.hasSourceUuid()) {
262 final var identifier = ((UntrustedIdentityException) exception).getSender();
263 final var recipientId = account.getRecipientResolver().resolveRecipient(identifier);
264 try {
265 account.getMessageCache().replaceSender(cachedMessage, recipientId);
266 } catch (IOException ioException) {
267 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
268 }
269 }
270 return null;
271 }
272
273 // If successful and for all other errors that are not recoverable, delete the cached message
274 cachedMessage.delete();
275 return actions;
276 }
277
278 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
279 logger.debug("Handling message actions");
280 for (var action : queuedActions) {
281 logger.debug("Executing action {}", action.getClass().getSimpleName());
282 try {
283 action.execute(context);
284 } catch (Throwable e) {
285 logger.warn("Message action failed.", e);
286 }
287 }
288 }
289
290 private void onWebSocketStateChange(final WebSocketConnectionState s) {
291 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
292 account.setRegistered(false);
293 authenticationFailureListener.call();
294 }
295 }
296
297 public interface Callable {
298
299 void call();
300 }
301 }