nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Update libsignal-service
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
9 import org.asamk.signal.manager.storage.SignalAccount;
10 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
11 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.whispersystems.signalservice.api.SignalWebSocket;
15 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
16 import org.whispersystems.signalservice.api.push.ServiceId;
17 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
18 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
19 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
20
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.TimeoutException;
30
31 import io.reactivex.rxjava3.core.Observable;
32 import io.reactivex.rxjava3.schedulers.Schedulers;
33
34 public class ReceiveHelper {
35
36 private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
37 private static final int MAX_BACKOFF_COUNTER = 9;
38
39 private final SignalAccount account;
40 private final SignalDependencies dependencies;
41 private final Context context;
42
43 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
44 private boolean hasCaughtUpWithOldMessages = false;
45 private boolean isWaitingForMessage = false;
46 private boolean shouldStop = false;
47 private Callable authenticationFailureListener;
48 private Callable caughtUpWithOldMessagesListener;
49
50 public ReceiveHelper(final Context context) {
51 this.account = context.getAccount();
52 this.dependencies = context.getDependencies();
53 this.context = context;
54 }
55
56 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
57 this.receiveConfig = receiveConfig;
58 dependencies.setAllowStories(!receiveConfig.ignoreStories());
59 }
60
61 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
62 this.authenticationFailureListener = authenticationFailureListener;
63 }
64
65 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
66 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
67 }
68
69 public boolean requestStopReceiveMessages() {
70 this.shouldStop = true;
71 return isWaitingForMessage;
72 }
73
74 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
75 while (!shouldStop) {
76 try {
77 receiveMessages(Duration.ofMinutes(1), false, null, handler);
78 break;
79 } catch (IOException e) {
80 logger.warn("Receiving messages failed, retrying", e);
81 }
82 }
83 }
84
85 public void receiveMessages(
86 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
87 ) throws IOException {
88 account.setNeedsToRetryFailedMessages(true);
89 hasCaughtUpWithOldMessages = false;
90
91 // Use a Map here because java Set doesn't have a get method ...
92 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
93
94 final var signalWebSocket = dependencies.getSignalWebSocket();
95 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
96 signalWebSocket.getWebSocketState())
97 .subscribeOn(Schedulers.computation())
98 .observeOn(Schedulers.computation())
99 .distinctUntilChanged()
100 .subscribe(this::onWebSocketStateChange);
101 signalWebSocket.connect();
102
103 try {
104 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
105 } finally {
106 hasCaughtUpWithOldMessages = false;
107 handleQueuedActions(queuedActions.keySet());
108 queuedActions.clear();
109 signalWebSocket.disconnect();
110 webSocketStateDisposable.dispose();
111 shouldStop = false;
112 }
113 }
114
115 private void receiveMessagesInternal(
116 final SignalWebSocket signalWebSocket,
117 Duration timeout,
118 boolean returnOnTimeout,
119 Integer maxMessages,
120 Manager.ReceiveMessageHandler handler,
121 final Map<HandleAction, HandleAction> queuedActions
122 ) throws IOException {
123 int remainingMessages = maxMessages == null ? -1 : maxMessages;
124 var backOffCounter = 0;
125 isWaitingForMessage = false;
126
127 while (!shouldStop && remainingMessages != 0) {
128 if (account.getNeedsToRetryFailedMessages()) {
129 retryFailedReceivedMessages(handler);
130 }
131 SignalServiceEnvelope envelope;
132 final CachedMessage[] cachedMessage = {null};
133 final var nowMillis = System.currentTimeMillis();
134 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
135 account.setLastReceiveTimestamp(nowMillis);
136 }
137 logger.debug("Checking for new message from server");
138 try {
139 isWaitingForMessage = true;
140 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
141 logger.debug("Retrieved {} envelopes!", batch.size());
142 isWaitingForMessage = false;
143 for (final var it : batch) {
144 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
145 it.getServerDeliveredTimestamp());
146 final var recipientId = envelope1.getSourceServiceId()
147 .map(ServiceId::parseOrNull)
148 .map(s -> account.getRecipientResolver().resolveRecipient(s))
149 .orElse(null);
150 logger.trace("Storing new message from {}", recipientId);
151 // store message on disk, before acknowledging receipt to the server
152 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
153 try {
154 signalWebSocket.sendAck(it);
155 } catch (IOException e) {
156 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
157 }
158 }
159 });
160 isWaitingForMessage = false;
161 backOffCounter = 0;
162
163 if (queueNotEmpty) {
164 if (remainingMessages > 0) {
165 remainingMessages -= 1;
166 }
167 envelope = cachedMessage[0].loadEnvelope();
168 logger.debug("New message received from server");
169 } else {
170 logger.debug("Received indicator that server queue is empty");
171 handleQueuedActions(queuedActions.keySet());
172 queuedActions.clear();
173
174 context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
175 hasCaughtUpWithOldMessages = true;
176 caughtUpWithOldMessagesListener.call();
177
178 // Continue to wait another timeout for new messages
179 continue;
180 }
181 } catch (AssertionError e) {
182 if (e.getCause() instanceof InterruptedException) {
183 break;
184 } else {
185 throw e;
186 }
187 } catch (IOException e) {
188 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
189 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
190 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
191 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
192 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
193 try {
194 Thread.sleep(sleepMilliseconds);
195 } catch (InterruptedException interruptedException) {
196 return;
197 }
198 hasCaughtUpWithOldMessages = false;
199 signalWebSocket.connect();
200 continue;
201 }
202 throw e;
203 } catch (TimeoutException e) {
204 backOffCounter = 0;
205 if (returnOnTimeout) return;
206 continue;
207 } catch (Exception e) {
208 logger.error("Unknown error when receiving messages", e);
209 continue;
210 }
211
212 try {
213 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
214 for (final var h : result.first()) {
215 final var existingAction = queuedActions.get(h);
216 if (existingAction == null) {
217 queuedActions.put(h, h);
218 } else {
219 existingAction.mergeOther(h);
220 }
221 }
222 final var exception = result.second();
223
224 if (hasCaughtUpWithOldMessages) {
225 handleQueuedActions(queuedActions.keySet());
226 queuedActions.clear();
227 }
228 if (cachedMessage[0] != null) {
229 if (exception instanceof UntrustedIdentityException) {
230 logger.debug("Keeping message with untrusted identity in message cache");
231 final var address = ((UntrustedIdentityException) exception).getSender();
232 if (envelope.getSourceServiceId().isEmpty() && address.uuid().isPresent()) {
233 final var recipientId = account.getRecipientResolver()
234 .resolveRecipient(ACI.from(address.uuid().get()));
235 try {
236 cachedMessage[0] = account.getMessageCache()
237 .replaceSender(cachedMessage[0], recipientId);
238 } catch (IOException ioException) {
239 logger.warn("Failed to move cached message to recipient folder: {}",
240 ioException.getMessage(),
241 ioException);
242 }
243 }
244 } else {
245 cachedMessage[0].delete();
246 }
247 }
248 } catch (Exception e) {
249 logger.error("Unknown error when handling messages", e);
250 }
251 }
252 }
253
254 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
255 Set<HandleAction> queuedActions = new HashSet<>();
256 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
257 var actions = retryFailedReceivedMessage(handler, cachedMessage);
258 if (actions != null) {
259 queuedActions.addAll(actions);
260 }
261 }
262 handleQueuedActions(queuedActions);
263 account.setNeedsToRetryFailedMessages(false);
264 }
265
266 private List<HandleAction> retryFailedReceivedMessage(
267 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
268 ) {
269 var envelope = cachedMessage.loadEnvelope();
270 if (envelope == null) {
271 cachedMessage.delete();
272 return null;
273 }
274
275 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
276 final var actions = result.first();
277 final var exception = result.second();
278
279 if (exception instanceof UntrustedIdentityException) {
280 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
281 // Envelope is more than two weeks old, cleaning up.
282 cachedMessage.delete();
283 return null;
284 }
285 if (envelope.getSourceServiceId().isEmpty()) {
286 final var identifier = ((UntrustedIdentityException) exception).getSender();
287 final var recipientId = account.getRecipientResolver()
288 .resolveRecipient(new RecipientAddress(identifier));
289 try {
290 account.getMessageCache().replaceSender(cachedMessage, recipientId);
291 } catch (IOException ioException) {
292 logger.warn("Failed to move cached message to recipient folder: {}",
293 ioException.getMessage(),
294 ioException);
295 }
296 }
297 return null;
298 }
299
300 // If successful and for all other errors that are not recoverable, delete the cached message
301 cachedMessage.delete();
302 return actions;
303 }
304
305 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
306 logger.debug("Handling message actions");
307 for (var action : queuedActions) {
308 logger.debug("Executing action {}", action.getClass().getSimpleName());
309 try {
310 action.execute(context);
311 } catch (Throwable e) {
312 logger.warn("Message action failed.", e);
313 }
314 }
315 }
316
317 private void onWebSocketStateChange(final WebSocketConnectionState s) {
318 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
319 account.setRegistered(false);
320 authenticationFailureListener.call();
321 }
322 }
323
324 public interface Callable {
325
326 void call();
327 }
328 }