]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
3d4ca38d24361b18c60fbd3de3bcfc527b194e26
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId;
16 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
17 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.TimeoutException;
28
29 import io.reactivex.rxjava3.core.Observable;
30 import io.reactivex.rxjava3.schedulers.Schedulers;
31
32 public class ReceiveHelper {
33
34 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
35 private final static int MAX_BACKOFF_COUNTER = 9;
36
37 private final SignalAccount account;
38 private final SignalDependencies dependencies;
39 private final Context context;
40
41 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
42 private boolean needsToRetryFailedMessages = false;
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
49 public ReceiveHelper(final Context context) {
50 this.account = context.getAccount();
51 this.dependencies = context.getDependencies();
52 this.context = context;
53 }
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
61 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
62 }
63
64 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
65 this.authenticationFailureListener = authenticationFailureListener;
66 }
67
68 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
69 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
70 }
71
72 public boolean requestStopReceiveMessages() {
73 this.shouldStop = true;
74 return isWaitingForMessage;
75 }
76
77 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
78 while (!shouldStop) {
79 try {
80 receiveMessages(Duration.ofMinutes(1), false, null, handler);
81 break;
82 } catch (IOException e) {
83 logger.warn("Receiving messages failed, retrying", e);
84 }
85 }
86 }
87
88 public void receiveMessages(
89 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
90 ) throws IOException {
91 needsToRetryFailedMessages = true;
92 hasCaughtUpWithOldMessages = false;
93
94 // Use a Map here because java Set doesn't have a get method ...
95 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
96
97 final var signalWebSocket = dependencies.getSignalWebSocket();
98 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
99 signalWebSocket.getWebSocketState())
100 .subscribeOn(Schedulers.computation())
101 .observeOn(Schedulers.computation())
102 .distinctUntilChanged()
103 .subscribe(this::onWebSocketStateChange);
104 signalWebSocket.connect();
105
106 try {
107 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
108 } finally {
109 hasCaughtUpWithOldMessages = false;
110 handleQueuedActions(queuedActions.keySet());
111 queuedActions.clear();
112 signalWebSocket.disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 final SignalWebSocket signalWebSocket,
120 Duration timeout,
121 boolean returnOnTimeout,
122 Integer maxMessages,
123 Manager.ReceiveMessageHandler handler,
124 final Map<HandleAction, HandleAction> queuedActions
125 ) throws IOException {
126 int remainingMessages = maxMessages == null ? -1 : maxMessages;
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop && remainingMessages != 0) {
131 if (needsToRetryFailedMessages) {
132 retryFailedReceivedMessages(handler);
133 needsToRetryFailedMessages = false;
134 }
135 SignalServiceEnvelope envelope;
136 final CachedMessage[] cachedMessage = {null};
137 final var nowMillis = System.currentTimeMillis();
138 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
139 account.setLastReceiveTimestamp(nowMillis);
140 }
141 logger.debug("Checking for new message from server");
142 try {
143 isWaitingForMessage = true;
144 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
145 logger.debug("Retrieved {} envelopes!", batch.size());
146 isWaitingForMessage = false;
147 for (final var it : batch) {
148 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
149 it.getServerDeliveredTimestamp());
150 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
151 .resolveRecipient(envelope1.getSourceAddress()) : null;
152 logger.trace("Storing new message from {}", recipientId);
153 // store message on disk, before acknowledging receipt to the server
154 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
155 try {
156 signalWebSocket.sendAck(it);
157 } catch (IOException e) {
158 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
159 }
160 }
161 });
162 isWaitingForMessage = false;
163 backOffCounter = 0;
164
165 if (queueNotEmpty) {
166 if (remainingMessages > 0) {
167 remainingMessages -= 1;
168 }
169 envelope = cachedMessage[0].loadEnvelope();
170 logger.debug("New message received from server");
171 } else {
172 logger.debug("Received indicator that server queue is empty");
173 handleQueuedActions(queuedActions.keySet());
174 queuedActions.clear();
175
176 hasCaughtUpWithOldMessages = true;
177 caughtUpWithOldMessagesListener.call();
178
179 // Continue to wait another timeout for new messages
180 continue;
181 }
182 } catch (AssertionError e) {
183 if (e.getCause() instanceof InterruptedException) {
184 break;
185 } else {
186 throw e;
187 }
188 } catch (IOException e) {
189 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
190 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
191 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
192 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
193 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
194 try {
195 Thread.sleep(sleepMilliseconds);
196 } catch (InterruptedException interruptedException) {
197 return;
198 }
199 hasCaughtUpWithOldMessages = false;
200 signalWebSocket.connect();
201 continue;
202 }
203 throw e;
204 } catch (TimeoutException e) {
205 backOffCounter = 0;
206 if (returnOnTimeout) return;
207 continue;
208 }
209
210 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
211 for (final var h : result.first()) {
212 final var existingAction = queuedActions.get(h);
213 if (existingAction == null) {
214 queuedActions.put(h, h);
215 } else {
216 existingAction.mergeOther(h);
217 }
218 }
219 final var exception = result.second();
220
221 if (hasCaughtUpWithOldMessages) {
222 handleQueuedActions(queuedActions.keySet());
223 queuedActions.clear();
224 }
225 if (cachedMessage[0] != null) {
226 if (exception instanceof UntrustedIdentityException) {
227 logger.debug("Keeping message with untrusted identity in message cache");
228 final var address = ((UntrustedIdentityException) exception).getSender();
229 if (!envelope.hasSourceUuid() && address.uuid().isPresent()) {
230 final var recipientId = account.getRecipientResolver()
231 .resolveRecipient(ServiceId.from(address.uuid().get()));
232 try {
233 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
234 } catch (IOException ioException) {
235 logger.warn("Failed to move cached message to recipient folder: {}",
236 ioException.getMessage());
237 }
238 }
239 } else {
240 cachedMessage[0].delete();
241 }
242 }
243 }
244 }
245
246 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
247 Set<HandleAction> queuedActions = new HashSet<>();
248 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
249 var actions = retryFailedReceivedMessage(handler, cachedMessage);
250 if (actions != null) {
251 queuedActions.addAll(actions);
252 }
253 }
254 handleQueuedActions(queuedActions);
255 }
256
257 private List<HandleAction> retryFailedReceivedMessage(
258 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
259 ) {
260 var envelope = cachedMessage.loadEnvelope();
261 if (envelope == null) {
262 cachedMessage.delete();
263 return null;
264 }
265
266 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
267 final var actions = result.first();
268 final var exception = result.second();
269
270 if (exception instanceof UntrustedIdentityException) {
271 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
272 // Envelope is more than a month old, cleaning up.
273 cachedMessage.delete();
274 return null;
275 }
276 if (!envelope.hasSourceUuid()) {
277 final var identifier = ((UntrustedIdentityException) exception).getSender();
278 final var recipientId = account.getRecipientResolver()
279 .resolveRecipient(new RecipientAddress(identifier));
280 try {
281 account.getMessageCache().replaceSender(cachedMessage, recipientId);
282 } catch (IOException ioException) {
283 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
284 }
285 }
286 return null;
287 }
288
289 // If successful and for all other errors that are not recoverable, delete the cached message
290 cachedMessage.delete();
291 return actions;
292 }
293
294 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
295 logger.debug("Handling message actions");
296 for (var action : queuedActions) {
297 logger.debug("Executing action {}", action.getClass().getSimpleName());
298 try {
299 action.execute(context);
300 } catch (Throwable e) {
301 logger.warn("Message action failed.", e);
302 }
303 }
304 }
305
306 private void onWebSocketStateChange(final WebSocketConnectionState s) {
307 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
308 account.setRegistered(false);
309 authenticationFailureListener.call();
310 }
311 }
312
313 public interface Callable {
314
315 void call();
316 }
317 }