]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
2f7834673a01ab732decc3cffff99978c9eee438
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
16 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
17 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.TimeoutException;
28
29 import io.reactivex.rxjava3.core.Observable;
30 import io.reactivex.rxjava3.schedulers.Schedulers;
31
32 public class ReceiveHelper {
33
34 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
35 private final static int MAX_BACKOFF_COUNTER = 9;
36
37 private final SignalAccount account;
38 private final SignalDependencies dependencies;
39 private final Context context;
40
41 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
42 private boolean needsToRetryFailedMessages = false;
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
49 public ReceiveHelper(final Context context) {
50 this.account = context.getAccount();
51 this.dependencies = context.getDependencies();
52 this.context = context;
53 }
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
61 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
62 }
63
64 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
65 this.authenticationFailureListener = authenticationFailureListener;
66 }
67
68 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
69 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
70 }
71
72 public boolean requestStopReceiveMessages() {
73 this.shouldStop = true;
74 return isWaitingForMessage;
75 }
76
77 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
78 while (!shouldStop) {
79 try {
80 receiveMessages(Duration.ofMinutes(1), false, null, handler);
81 break;
82 } catch (IOException e) {
83 logger.warn("Receiving messages failed, retrying", e);
84 }
85 }
86 }
87
88 public void receiveMessages(
89 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
90 ) throws IOException {
91 needsToRetryFailedMessages = true;
92 hasCaughtUpWithOldMessages = false;
93
94 // Use a Map here because java Set doesn't have a get method ...
95 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
96
97 final var signalWebSocket = dependencies.getSignalWebSocket();
98 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
99 signalWebSocket.getWebSocketState())
100 .subscribeOn(Schedulers.computation())
101 .observeOn(Schedulers.computation())
102 .distinctUntilChanged()
103 .subscribe(this::onWebSocketStateChange);
104 signalWebSocket.connect();
105
106 try {
107 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
108 } finally {
109 hasCaughtUpWithOldMessages = false;
110 handleQueuedActions(queuedActions.keySet());
111 queuedActions.clear();
112 signalWebSocket.disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 final SignalWebSocket signalWebSocket,
120 Duration timeout,
121 boolean returnOnTimeout,
122 Integer maxMessages,
123 Manager.ReceiveMessageHandler handler,
124 final Map<HandleAction, HandleAction> queuedActions
125 ) throws IOException {
126 int remainingMessages = maxMessages == null ? -1 : maxMessages;
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop && remainingMessages != 0) {
131 if (needsToRetryFailedMessages) {
132 retryFailedReceivedMessages(handler);
133 needsToRetryFailedMessages = false;
134 }
135 SignalServiceEnvelope envelope;
136 final CachedMessage[] cachedMessage = {null};
137 final var nowMillis = System.currentTimeMillis();
138 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
139 account.setLastReceiveTimestamp(nowMillis);
140 }
141 logger.debug("Checking for new message from server");
142 try {
143 isWaitingForMessage = true;
144 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
145 logger.debug("Retrieved {} envelopes!", batch.size());
146 isWaitingForMessage = false;
147 for (final var it : batch) {
148 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
149 it.getServerDeliveredTimestamp());
150 final var recipientId = envelope1.hasSourceServiceId() ? account.getRecipientResolver()
151 .resolveRecipient(envelope1.getSourceAddress()) : null;
152 logger.trace("Storing new message from {}", recipientId);
153 // store message on disk, before acknowledging receipt to the server
154 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
155 try {
156 signalWebSocket.sendAck(it);
157 } catch (IOException e) {
158 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
159 }
160 }
161 });
162 isWaitingForMessage = false;
163 backOffCounter = 0;
164
165 if (queueNotEmpty) {
166 if (remainingMessages > 0) {
167 remainingMessages -= 1;
168 }
169 envelope = cachedMessage[0].loadEnvelope();
170 logger.debug("New message received from server");
171 } else {
172 logger.debug("Received indicator that server queue is empty");
173 handleQueuedActions(queuedActions.keySet());
174 queuedActions.clear();
175
176 hasCaughtUpWithOldMessages = true;
177 caughtUpWithOldMessagesListener.call();
178
179 // Continue to wait another timeout for new messages
180 continue;
181 }
182 } catch (AssertionError e) {
183 if (e.getCause() instanceof InterruptedException) {
184 break;
185 } else {
186 throw e;
187 }
188 } catch (IOException e) {
189 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
190 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
191 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
192 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
193 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
194 try {
195 Thread.sleep(sleepMilliseconds);
196 } catch (InterruptedException interruptedException) {
197 return;
198 }
199 hasCaughtUpWithOldMessages = false;
200 signalWebSocket.connect();
201 continue;
202 }
203 throw e;
204 } catch (TimeoutException e) {
205 backOffCounter = 0;
206 if (returnOnTimeout) return;
207 continue;
208 }
209
210 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
211 for (final var h : result.first()) {
212 final var existingAction = queuedActions.get(h);
213 if (existingAction == null) {
214 queuedActions.put(h, h);
215 } else {
216 existingAction.mergeOther(h);
217 }
218 }
219 final var exception = result.second();
220
221 if (hasCaughtUpWithOldMessages) {
222 handleQueuedActions(queuedActions.keySet());
223 queuedActions.clear();
224 }
225 if (cachedMessage[0] != null) {
226 if (exception instanceof UntrustedIdentityException) {
227 logger.debug("Keeping message with untrusted identity in message cache");
228 final var address = ((UntrustedIdentityException) exception).getSender();
229 if (!envelope.hasSourceServiceId() && address.uuid().isPresent()) {
230 final var recipientId = account.getRecipientResolver()
231 .resolveRecipient(ACI.from(address.uuid().get()));
232 try {
233 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
234 } catch (IOException ioException) {
235 logger.warn("Failed to move cached message to recipient folder: {}",
236 ioException.getMessage(),
237 ioException);
238 }
239 }
240 } else {
241 cachedMessage[0].delete();
242 }
243 }
244 }
245 }
246
247 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
248 Set<HandleAction> queuedActions = new HashSet<>();
249 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
250 var actions = retryFailedReceivedMessage(handler, cachedMessage);
251 if (actions != null) {
252 queuedActions.addAll(actions);
253 }
254 }
255 handleQueuedActions(queuedActions);
256 }
257
258 private List<HandleAction> retryFailedReceivedMessage(
259 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
260 ) {
261 var envelope = cachedMessage.loadEnvelope();
262 if (envelope == null) {
263 cachedMessage.delete();
264 return null;
265 }
266
267 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
268 final var actions = result.first();
269 final var exception = result.second();
270
271 if (exception instanceof UntrustedIdentityException) {
272 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
273 // Envelope is more than a month old, cleaning up.
274 cachedMessage.delete();
275 return null;
276 }
277 if (!envelope.hasSourceServiceId()) {
278 final var identifier = ((UntrustedIdentityException) exception).getSender();
279 final var recipientId = account.getRecipientResolver()
280 .resolveRecipient(new RecipientAddress(identifier));
281 try {
282 account.getMessageCache().replaceSender(cachedMessage, recipientId);
283 } catch (IOException ioException) {
284 logger.warn("Failed to move cached message to recipient folder: {}",
285 ioException.getMessage(),
286 ioException);
287 }
288 }
289 return null;
290 }
291
292 // If successful and for all other errors that are not recoverable, delete the cached message
293 cachedMessage.delete();
294 return actions;
295 }
296
297 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
298 logger.debug("Handling message actions");
299 for (var action : queuedActions) {
300 logger.debug("Executing action {}", action.getClass().getSimpleName());
301 try {
302 action.execute(context);
303 } catch (Throwable e) {
304 logger.warn("Message action failed.", e);
305 }
306 }
307 }
308
309 private void onWebSocketStateChange(final WebSocketConnectionState s) {
310 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
311 account.setRegistered(false);
312 authenticationFailureListener.call();
313 }
314 }
315
316 public interface Callable {
317
318 void call();
319 }
320 }