nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Implement username links
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
16 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
17 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.TimeoutException;
28
29 import io.reactivex.rxjava3.core.Observable;
30 import io.reactivex.rxjava3.schedulers.Schedulers;
31
32 public class ReceiveHelper {
33
34 private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
35 private static final int MAX_BACKOFF_COUNTER = 9;
36
37 private final SignalAccount account;
38 private final SignalDependencies dependencies;
39 private final Context context;
40
41 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
42 private boolean needsToRetryFailedMessages = false;
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
49 public ReceiveHelper(final Context context) {
50 this.account = context.getAccount();
51 this.dependencies = context.getDependencies();
52 this.context = context;
53 }
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
61 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
62 }
63
64 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
65 this.authenticationFailureListener = authenticationFailureListener;
66 }
67
68 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
69 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
70 }
71
72 public boolean requestStopReceiveMessages() {
73 this.shouldStop = true;
74 return isWaitingForMessage;
75 }
76
77 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
78 while (!shouldStop) {
79 try {
80 receiveMessages(Duration.ofMinutes(1), false, null, handler);
81 break;
82 } catch (IOException e) {
83 logger.warn("Receiving messages failed, retrying", e);
84 }
85 }
86 }
87
88 public void receiveMessages(
89 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
90 ) throws IOException {
91 needsToRetryFailedMessages = true;
92 hasCaughtUpWithOldMessages = false;
93
94 // Use a Map here because java Set doesn't have a get method ...
95 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
96
97 final var signalWebSocket = dependencies.getSignalWebSocket();
98 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
99 signalWebSocket.getWebSocketState())
100 .subscribeOn(Schedulers.computation())
101 .observeOn(Schedulers.computation())
102 .distinctUntilChanged()
103 .subscribe(this::onWebSocketStateChange);
104 signalWebSocket.connect();
105
106 try {
107 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
108 } finally {
109 hasCaughtUpWithOldMessages = false;
110 handleQueuedActions(queuedActions.keySet());
111 queuedActions.clear();
112 signalWebSocket.disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 final SignalWebSocket signalWebSocket,
120 Duration timeout,
121 boolean returnOnTimeout,
122 Integer maxMessages,
123 Manager.ReceiveMessageHandler handler,
124 final Map<HandleAction, HandleAction> queuedActions
125 ) throws IOException {
126 int remainingMessages = maxMessages == null ? -1 : maxMessages;
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop && remainingMessages != 0) {
131 if (needsToRetryFailedMessages) {
132 retryFailedReceivedMessages(handler);
133 needsToRetryFailedMessages = false;
134 }
135 SignalServiceEnvelope envelope;
136 final CachedMessage[] cachedMessage = {null};
137 final var nowMillis = System.currentTimeMillis();
138 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
139 account.setLastReceiveTimestamp(nowMillis);
140 }
141 logger.debug("Checking for new message from server");
142 try {
143 isWaitingForMessage = true;
144 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
145 logger.debug("Retrieved {} envelopes!", batch.size());
146 isWaitingForMessage = false;
147 for (final var it : batch) {
148 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
149 it.getServerDeliveredTimestamp());
150 final var recipientId = envelope1.hasSourceServiceId() ? account.getRecipientResolver()
151 .resolveRecipient(envelope1.getSourceAddress()) : null;
152 logger.trace("Storing new message from {}", recipientId);
153 // store message on disk, before acknowledging receipt to the server
154 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
155 try {
156 signalWebSocket.sendAck(it);
157 } catch (IOException e) {
158 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
159 }
160 }
161 });
162 isWaitingForMessage = false;
163 backOffCounter = 0;
164
165 if (queueNotEmpty) {
166 if (remainingMessages > 0) {
167 remainingMessages -= 1;
168 }
169 envelope = cachedMessage[0].loadEnvelope();
170 logger.debug("New message received from server");
171 } else {
172 logger.debug("Received indicator that server queue is empty");
173 handleQueuedActions(queuedActions.keySet());
174 queuedActions.clear();
175
176 hasCaughtUpWithOldMessages = true;
177 caughtUpWithOldMessagesListener.call();
178
179 // Continue to wait another timeout for new messages
180 continue;
181 }
182 } catch (AssertionError e) {
183 if (e.getCause() instanceof InterruptedException) {
184 break;
185 } else {
186 throw e;
187 }
188 } catch (IOException e) {
189 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
190 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
191 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
192 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
193 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
194 try {
195 Thread.sleep(sleepMilliseconds);
196 } catch (InterruptedException interruptedException) {
197 return;
198 }
199 hasCaughtUpWithOldMessages = false;
200 signalWebSocket.connect();
201 continue;
202 }
203 throw e;
204 } catch (TimeoutException e) {
205 backOffCounter = 0;
206 if (returnOnTimeout) return;
207 continue;
208 } catch (Exception e) {
209 logger.error("Unknown error when receiving messages", e);
210 continue;
211 }
212
213 try {
214 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
215 for (final var h : result.first()) {
216 final var existingAction = queuedActions.get(h);
217 if (existingAction == null) {
218 queuedActions.put(h, h);
219 } else {
220 existingAction.mergeOther(h);
221 }
222 }
223 final var exception = result.second();
224
225 if (hasCaughtUpWithOldMessages) {
226 handleQueuedActions(queuedActions.keySet());
227 queuedActions.clear();
228 }
229 if (cachedMessage[0] != null) {
230 if (exception instanceof UntrustedIdentityException) {
231 logger.debug("Keeping message with untrusted identity in message cache");
232 final var address = ((UntrustedIdentityException) exception).getSender();
233 if (!envelope.hasSourceServiceId() && address.uuid().isPresent()) {
234 final var recipientId = account.getRecipientResolver()
235 .resolveRecipient(ACI.from(address.uuid().get()));
236 try {
237 cachedMessage[0] = account.getMessageCache()
238 .replaceSender(cachedMessage[0], recipientId);
239 } catch (IOException ioException) {
240 logger.warn("Failed to move cached message to recipient folder: {}",
241 ioException.getMessage(),
242 ioException);
243 }
244 }
245 } else {
246 cachedMessage[0].delete();
247 }
248 }
249 } catch (Exception e) {
250 logger.error("Unknown error when handling messages", e);
251 }
252 }
253 }
254
255 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
256 Set<HandleAction> queuedActions = new HashSet<>();
257 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
258 var actions = retryFailedReceivedMessage(handler, cachedMessage);
259 if (actions != null) {
260 queuedActions.addAll(actions);
261 }
262 }
263 handleQueuedActions(queuedActions);
264 }
265
266 private List<HandleAction> retryFailedReceivedMessage(
267 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
268 ) {
269 var envelope = cachedMessage.loadEnvelope();
270 if (envelope == null) {
271 cachedMessage.delete();
272 return null;
273 }
274
275 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
276 final var actions = result.first();
277 final var exception = result.second();
278
279 if (exception instanceof UntrustedIdentityException) {
280 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
281 // Envelope is more than a month old, cleaning up.
282 cachedMessage.delete();
283 return null;
284 }
285 if (!envelope.hasSourceServiceId()) {
286 final var identifier = ((UntrustedIdentityException) exception).getSender();
287 final var recipientId = account.getRecipientResolver()
288 .resolveRecipient(new RecipientAddress(identifier));
289 try {
290 account.getMessageCache().replaceSender(cachedMessage, recipientId);
291 } catch (IOException ioException) {
292 logger.warn("Failed to move cached message to recipient folder: {}",
293 ioException.getMessage(),
294 ioException);
295 }
296 }
297 return null;
298 }
299
300 // If successful and for all other errors that are not recoverable, delete the cached message
301 cachedMessage.delete();
302 return actions;
303 }
304
305 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
306 logger.debug("Handling message actions");
307 for (var action : queuedActions) {
308 logger.debug("Executing action {}", action.getClass().getSimpleName());
309 try {
310 action.execute(context);
311 } catch (Throwable e) {
312 logger.warn("Message action failed.", e);
313 }
314 }
315 }
316
317 private void onWebSocketStateChange(final WebSocketConnectionState s) {
318 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
319 account.setRegistered(false);
320 authenticationFailureListener.call();
321 }
322 }
323
324 public interface Callable {
325
326 void call();
327 }
328 }