]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
713ca5d744aed1af492316610fa75f0abfe99429
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId;
16 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
17 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
18 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
19
20 import java.io.IOException;
21 import java.time.Duration;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.TimeoutException;
29
30 import io.reactivex.rxjava3.core.Observable;
31 import io.reactivex.rxjava3.schedulers.Schedulers;
32
33 public class ReceiveHelper {
34
35 private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
36 private static final int MAX_BACKOFF_COUNTER = 9;
37
38 private final SignalAccount account;
39 private final SignalDependencies dependencies;
40 private final Context context;
41
42 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
43 private boolean needsToRetryFailedMessages = false;
44 private boolean hasCaughtUpWithOldMessages = false;
45 private boolean isWaitingForMessage = false;
46 private boolean shouldStop = false;
47 private Callable authenticationFailureListener;
48 private Callable caughtUpWithOldMessagesListener;
49
50 public ReceiveHelper(final Context context) {
51 this.account = context.getAccount();
52 this.dependencies = context.getDependencies();
53 this.context = context;
54 }
55
56 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
57 this.receiveConfig = receiveConfig;
58 dependencies.setAllowStories(!receiveConfig.ignoreStories());
59 }
60
61 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
62 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
63 }
64
65 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
66 this.authenticationFailureListener = authenticationFailureListener;
67 }
68
69 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
70 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
71 }
72
73 public boolean requestStopReceiveMessages() {
74 this.shouldStop = true;
75 return isWaitingForMessage;
76 }
77
78 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
79 while (!shouldStop) {
80 try {
81 receiveMessages(Duration.ofMinutes(1), false, null, handler);
82 break;
83 } catch (IOException e) {
84 logger.warn("Receiving messages failed, retrying", e);
85 }
86 }
87 }
88
89 public void receiveMessages(
90 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
91 ) throws IOException {
92 needsToRetryFailedMessages = true;
93 hasCaughtUpWithOldMessages = false;
94
95 // Use a Map here because java Set doesn't have a get method ...
96 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
97
98 final var signalWebSocket = dependencies.getSignalWebSocket();
99 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
100 signalWebSocket.getWebSocketState())
101 .subscribeOn(Schedulers.computation())
102 .observeOn(Schedulers.computation())
103 .distinctUntilChanged()
104 .subscribe(this::onWebSocketStateChange);
105 signalWebSocket.connect();
106
107 try {
108 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
109 } finally {
110 hasCaughtUpWithOldMessages = false;
111 handleQueuedActions(queuedActions.keySet());
112 queuedActions.clear();
113 signalWebSocket.disconnect();
114 webSocketStateDisposable.dispose();
115 shouldStop = false;
116 }
117 }
118
119 private void receiveMessagesInternal(
120 final SignalWebSocket signalWebSocket,
121 Duration timeout,
122 boolean returnOnTimeout,
123 Integer maxMessages,
124 Manager.ReceiveMessageHandler handler,
125 final Map<HandleAction, HandleAction> queuedActions
126 ) throws IOException {
127 int remainingMessages = maxMessages == null ? -1 : maxMessages;
128 var backOffCounter = 0;
129 isWaitingForMessage = false;
130
131 while (!shouldStop && remainingMessages != 0) {
132 if (needsToRetryFailedMessages) {
133 retryFailedReceivedMessages(handler);
134 needsToRetryFailedMessages = false;
135 }
136 SignalServiceEnvelope envelope;
137 final CachedMessage[] cachedMessage = {null};
138 final var nowMillis = System.currentTimeMillis();
139 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
140 account.setLastReceiveTimestamp(nowMillis);
141 }
142 logger.debug("Checking for new message from server");
143 try {
144 isWaitingForMessage = true;
145 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
146 logger.debug("Retrieved {} envelopes!", batch.size());
147 isWaitingForMessage = false;
148 for (final var it : batch) {
149 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
150 it.getServerDeliveredTimestamp());
151 final var recipientId = envelope1.getSourceServiceId()
152 .map(ServiceId::parseOrNull)
153 .map(s -> account.getRecipientResolver().resolveRecipient(s))
154 .orElse(null);
155 logger.trace("Storing new message from {}", recipientId);
156 // store message on disk, before acknowledging receipt to the server
157 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
158 try {
159 signalWebSocket.sendAck(it);
160 } catch (IOException e) {
161 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
162 }
163 }
164 });
165 isWaitingForMessage = false;
166 backOffCounter = 0;
167
168 if (queueNotEmpty) {
169 if (remainingMessages > 0) {
170 remainingMessages -= 1;
171 }
172 envelope = cachedMessage[0].loadEnvelope();
173 logger.debug("New message received from server");
174 } else {
175 logger.debug("Received indicator that server queue is empty");
176 handleQueuedActions(queuedActions.keySet());
177 queuedActions.clear();
178
179 hasCaughtUpWithOldMessages = true;
180 caughtUpWithOldMessagesListener.call();
181
182 // Continue to wait another timeout for new messages
183 continue;
184 }
185 } catch (AssertionError e) {
186 if (e.getCause() instanceof InterruptedException) {
187 break;
188 } else {
189 throw e;
190 }
191 } catch (IOException e) {
192 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
193 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
194 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
195 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
196 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
197 try {
198 Thread.sleep(sleepMilliseconds);
199 } catch (InterruptedException interruptedException) {
200 return;
201 }
202 hasCaughtUpWithOldMessages = false;
203 signalWebSocket.connect();
204 continue;
205 }
206 throw e;
207 } catch (TimeoutException e) {
208 backOffCounter = 0;
209 if (returnOnTimeout) return;
210 continue;
211 } catch (Exception e) {
212 logger.error("Unknown error when receiving messages", e);
213 continue;
214 }
215
216 try {
217 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
218 for (final var h : result.first()) {
219 final var existingAction = queuedActions.get(h);
220 if (existingAction == null) {
221 queuedActions.put(h, h);
222 } else {
223 existingAction.mergeOther(h);
224 }
225 }
226 final var exception = result.second();
227
228 if (hasCaughtUpWithOldMessages) {
229 handleQueuedActions(queuedActions.keySet());
230 queuedActions.clear();
231 }
232 if (cachedMessage[0] != null) {
233 if (exception instanceof UntrustedIdentityException) {
234 logger.debug("Keeping message with untrusted identity in message cache");
235 final var address = ((UntrustedIdentityException) exception).getSender();
236 if (envelope.getSourceServiceId().isEmpty() && address.uuid().isPresent()) {
237 final var recipientId = account.getRecipientResolver()
238 .resolveRecipient(ACI.from(address.uuid().get()));
239 try {
240 cachedMessage[0] = account.getMessageCache()
241 .replaceSender(cachedMessage[0], recipientId);
242 } catch (IOException ioException) {
243 logger.warn("Failed to move cached message to recipient folder: {}",
244 ioException.getMessage(),
245 ioException);
246 }
247 }
248 } else {
249 cachedMessage[0].delete();
250 }
251 }
252 } catch (Exception e) {
253 logger.error("Unknown error when handling messages", e);
254 }
255 }
256 }
257
258 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
259 Set<HandleAction> queuedActions = new HashSet<>();
260 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
261 var actions = retryFailedReceivedMessage(handler, cachedMessage);
262 if (actions != null) {
263 queuedActions.addAll(actions);
264 }
265 }
266 handleQueuedActions(queuedActions);
267 }
268
269 private List<HandleAction> retryFailedReceivedMessage(
270 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
271 ) {
272 var envelope = cachedMessage.loadEnvelope();
273 if (envelope == null) {
274 cachedMessage.delete();
275 return null;
276 }
277
278 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
279 final var actions = result.first();
280 final var exception = result.second();
281
282 if (exception instanceof UntrustedIdentityException) {
283 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
284 // Envelope is more than a month old, cleaning up.
285 cachedMessage.delete();
286 return null;
287 }
288 if (envelope.getSourceServiceId().isEmpty()) {
289 final var identifier = ((UntrustedIdentityException) exception).getSender();
290 final var recipientId = account.getRecipientResolver()
291 .resolveRecipient(new RecipientAddress(identifier));
292 try {
293 account.getMessageCache().replaceSender(cachedMessage, recipientId);
294 } catch (IOException ioException) {
295 logger.warn("Failed to move cached message to recipient folder: {}",
296 ioException.getMessage(),
297 ioException);
298 }
299 }
300 return null;
301 }
302
303 // If successful and for all other errors that are not recoverable, delete the cached message
304 cachedMessage.delete();
305 return actions;
306 }
307
308 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
309 logger.debug("Handling message actions");
310 for (var action : queuedActions) {
311 logger.debug("Executing action {}", action.getClass().getSimpleName());
312 try {
313 action.execute(context);
314 } catch (Throwable e) {
315 logger.warn("Message action failed.", e);
316 }
317 }
318 }
319
320 private void onWebSocketStateChange(final WebSocketConnectionState s) {
321 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
322 account.setRegistered(false);
323 authenticationFailureListener.call();
324 }
325 }
326
327 public interface Callable {
328
329 void call();
330 }
331 }