nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Clean old prekeys only after server message queue is empty
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
9 import org.asamk.signal.manager.storage.SignalAccount;
10 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
11 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.whispersystems.signalservice.api.SignalWebSocket;
15 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
16 import org.whispersystems.signalservice.api.push.ServiceId;
17 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
18 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
19 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
20
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.TimeoutException;
30
31 import io.reactivex.rxjava3.core.Observable;
32 import io.reactivex.rxjava3.schedulers.Schedulers;
33
34 public class ReceiveHelper {
35
36 private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
37 private static final int MAX_BACKOFF_COUNTER = 9;
38
39 private final SignalAccount account;
40 private final SignalDependencies dependencies;
41 private final Context context;
42
43 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
44 private boolean needsToRetryFailedMessages = false;
45 private boolean hasCaughtUpWithOldMessages = false;
46 private boolean isWaitingForMessage = false;
47 private boolean shouldStop = false;
48 private Callable authenticationFailureListener;
49 private Callable caughtUpWithOldMessagesListener;
50
51 public ReceiveHelper(final Context context) {
52 this.account = context.getAccount();
53 this.dependencies = context.getDependencies();
54 this.context = context;
55 }
56
57 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
58 this.receiveConfig = receiveConfig;
59 dependencies.setAllowStories(!receiveConfig.ignoreStories());
60 }
61
62 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
63 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
64 }
65
66 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
67 this.authenticationFailureListener = authenticationFailureListener;
68 }
69
70 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
71 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
72 }
73
74 public boolean requestStopReceiveMessages() {
75 this.shouldStop = true;
76 return isWaitingForMessage;
77 }
78
79 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
80 while (!shouldStop) {
81 try {
82 receiveMessages(Duration.ofMinutes(1), false, null, handler);
83 break;
84 } catch (IOException e) {
85 logger.warn("Receiving messages failed, retrying", e);
86 }
87 }
88 }
89
90 public void receiveMessages(
91 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
92 ) throws IOException {
93 needsToRetryFailedMessages = true;
94 hasCaughtUpWithOldMessages = false;
95
96 // Use a Map here because java Set doesn't have a get method ...
97 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
98
99 final var signalWebSocket = dependencies.getSignalWebSocket();
100 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
101 signalWebSocket.getWebSocketState())
102 .subscribeOn(Schedulers.computation())
103 .observeOn(Schedulers.computation())
104 .distinctUntilChanged()
105 .subscribe(this::onWebSocketStateChange);
106 signalWebSocket.connect();
107
108 try {
109 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
110 } finally {
111 hasCaughtUpWithOldMessages = false;
112 handleQueuedActions(queuedActions.keySet());
113 queuedActions.clear();
114 signalWebSocket.disconnect();
115 webSocketStateDisposable.dispose();
116 shouldStop = false;
117 }
118 }
119
120 private void receiveMessagesInternal(
121 final SignalWebSocket signalWebSocket,
122 Duration timeout,
123 boolean returnOnTimeout,
124 Integer maxMessages,
125 Manager.ReceiveMessageHandler handler,
126 final Map<HandleAction, HandleAction> queuedActions
127 ) throws IOException {
128 int remainingMessages = maxMessages == null ? -1 : maxMessages;
129 var backOffCounter = 0;
130 isWaitingForMessage = false;
131
132 while (!shouldStop && remainingMessages != 0) {
133 if (needsToRetryFailedMessages) {
134 retryFailedReceivedMessages(handler);
135 needsToRetryFailedMessages = false;
136 }
137 SignalServiceEnvelope envelope;
138 final CachedMessage[] cachedMessage = {null};
139 final var nowMillis = System.currentTimeMillis();
140 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
141 account.setLastReceiveTimestamp(nowMillis);
142 }
143 logger.debug("Checking for new message from server");
144 try {
145 isWaitingForMessage = true;
146 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
147 logger.debug("Retrieved {} envelopes!", batch.size());
148 isWaitingForMessage = false;
149 for (final var it : batch) {
150 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
151 it.getServerDeliveredTimestamp());
152 final var recipientId = envelope1.getSourceServiceId()
153 .map(ServiceId::parseOrNull)
154 .map(s -> account.getRecipientResolver().resolveRecipient(s))
155 .orElse(null);
156 logger.trace("Storing new message from {}", recipientId);
157 // store message on disk, before acknowledging receipt to the server
158 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
159 try {
160 signalWebSocket.sendAck(it);
161 } catch (IOException e) {
162 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
163 }
164 }
165 });
166 isWaitingForMessage = false;
167 backOffCounter = 0;
168
169 if (queueNotEmpty) {
170 if (remainingMessages > 0) {
171 remainingMessages -= 1;
172 }
173 envelope = cachedMessage[0].loadEnvelope();
174 logger.debug("New message received from server");
175 } else {
176 logger.debug("Received indicator that server queue is empty");
177 handleQueuedActions(queuedActions.keySet());
178 queuedActions.clear();
179
180 context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
181 hasCaughtUpWithOldMessages = true;
182 caughtUpWithOldMessagesListener.call();
183
184 // Continue to wait another timeout for new messages
185 continue;
186 }
187 } catch (AssertionError e) {
188 if (e.getCause() instanceof InterruptedException) {
189 break;
190 } else {
191 throw e;
192 }
193 } catch (IOException e) {
194 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
195 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
196 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
197 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
198 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
199 try {
200 Thread.sleep(sleepMilliseconds);
201 } catch (InterruptedException interruptedException) {
202 return;
203 }
204 hasCaughtUpWithOldMessages = false;
205 signalWebSocket.connect();
206 continue;
207 }
208 throw e;
209 } catch (TimeoutException e) {
210 backOffCounter = 0;
211 if (returnOnTimeout) return;
212 continue;
213 } catch (Exception e) {
214 logger.error("Unknown error when receiving messages", e);
215 continue;
216 }
217
218 try {
219 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
220 for (final var h : result.first()) {
221 final var existingAction = queuedActions.get(h);
222 if (existingAction == null) {
223 queuedActions.put(h, h);
224 } else {
225 existingAction.mergeOther(h);
226 }
227 }
228 final var exception = result.second();
229
230 if (hasCaughtUpWithOldMessages) {
231 handleQueuedActions(queuedActions.keySet());
232 queuedActions.clear();
233 }
234 if (cachedMessage[0] != null) {
235 if (exception instanceof UntrustedIdentityException) {
236 logger.debug("Keeping message with untrusted identity in message cache");
237 final var address = ((UntrustedIdentityException) exception).getSender();
238 if (envelope.getSourceServiceId().isEmpty() && address.uuid().isPresent()) {
239 final var recipientId = account.getRecipientResolver()
240 .resolveRecipient(ACI.from(address.uuid().get()));
241 try {
242 cachedMessage[0] = account.getMessageCache()
243 .replaceSender(cachedMessage[0], recipientId);
244 } catch (IOException ioException) {
245 logger.warn("Failed to move cached message to recipient folder: {}",
246 ioException.getMessage(),
247 ioException);
248 }
249 }
250 } else {
251 cachedMessage[0].delete();
252 }
253 }
254 } catch (Exception e) {
255 logger.error("Unknown error when handling messages", e);
256 }
257 }
258 }
259
260 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
261 Set<HandleAction> queuedActions = new HashSet<>();
262 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
263 var actions = retryFailedReceivedMessage(handler, cachedMessage);
264 if (actions != null) {
265 queuedActions.addAll(actions);
266 }
267 }
268 handleQueuedActions(queuedActions);
269 }
270
271 private List<HandleAction> retryFailedReceivedMessage(
272 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
273 ) {
274 var envelope = cachedMessage.loadEnvelope();
275 if (envelope == null) {
276 cachedMessage.delete();
277 return null;
278 }
279
280 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
281 final var actions = result.first();
282 final var exception = result.second();
283
284 if (exception instanceof UntrustedIdentityException) {
285 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
286 // Envelope is more than a month old, cleaning up.
287 cachedMessage.delete();
288 return null;
289 }
290 if (envelope.getSourceServiceId().isEmpty()) {
291 final var identifier = ((UntrustedIdentityException) exception).getSender();
292 final var recipientId = account.getRecipientResolver()
293 .resolveRecipient(new RecipientAddress(identifier));
294 try {
295 account.getMessageCache().replaceSender(cachedMessage, recipientId);
296 } catch (IOException ioException) {
297 logger.warn("Failed to move cached message to recipient folder: {}",
298 ioException.getMessage(),
299 ioException);
300 }
301 }
302 return null;
303 }
304
305 // If successful and for all other errors that are not recoverable, delete the cached message
306 cachedMessage.delete();
307 return actions;
308 }
309
310 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
311 logger.debug("Handling message actions");
312 for (var action : queuedActions) {
313 logger.debug("Executing action {}", action.getClass().getSimpleName());
314 try {
315 action.execute(context);
316 } catch (Throwable e) {
317 logger.warn("Message action failed.", e);
318 }
319 }
320 }
321
322 private void onWebSocketStateChange(final WebSocketConnectionState s) {
323 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
324 account.setRegistered(false);
325 authenticationFailureListener.call();
326 }
327 }
328
329 public interface Callable {
330
331 void call();
332 }
333 }