]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
2c5dbe69b1ebac51a10b2438c5e36075bfc1f192
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
16 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
17
18 import java.io.IOException;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.TimeoutException;
27
28 import io.reactivex.rxjava3.core.Observable;
29 import io.reactivex.rxjava3.schedulers.Schedulers;
30
31 public class ReceiveHelper {
32
33 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
34 private final static int MAX_BACKOFF_COUNTER = 9;
35
36 private final SignalAccount account;
37 private final SignalDependencies dependencies;
38 private final Context context;
39
40 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
41 private boolean needsToRetryFailedMessages = false;
42 private boolean hasCaughtUpWithOldMessages = false;
43 private boolean isWaitingForMessage = false;
44 private boolean shouldStop = false;
45 private Callable authenticationFailureListener;
46 private Callable caughtUpWithOldMessagesListener;
47
48 public ReceiveHelper(final Context context) {
49 this.account = context.getAccount();
50 this.dependencies = context.getDependencies();
51 this.context = context;
52 }
53
54 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
55 this.receiveConfig = receiveConfig;
56 dependencies.setAllowStories(!receiveConfig.ignoreStories());
57 }
58
59 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
60 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
61 }
62
63 public boolean hasCaughtUpWithOldMessages() {
64 return hasCaughtUpWithOldMessages;
65 }
66
67 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
68 this.authenticationFailureListener = authenticationFailureListener;
69 }
70
71 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
72 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
73 }
74
75 public boolean requestStopReceiveMessages() {
76 this.shouldStop = true;
77 return isWaitingForMessage;
78 }
79
80 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
81 while (!shouldStop) {
82 try {
83 receiveMessages(Duration.ofMinutes(1), false, null, handler);
84 break;
85 } catch (IOException e) {
86 logger.warn("Receiving messages failed, retrying", e);
87 }
88 }
89 }
90
91 public void receiveMessages(
92 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
93 ) throws IOException {
94 needsToRetryFailedMessages = true;
95 hasCaughtUpWithOldMessages = false;
96
97 // Use a Map here because java Set doesn't have a get method ...
98 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
99
100 final var signalWebSocket = dependencies.getSignalWebSocket();
101 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
102 signalWebSocket.getWebSocketState())
103 .subscribeOn(Schedulers.computation())
104 .observeOn(Schedulers.computation())
105 .distinctUntilChanged()
106 .subscribe(this::onWebSocketStateChange);
107 signalWebSocket.connect();
108
109 try {
110 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
111 } finally {
112 hasCaughtUpWithOldMessages = false;
113 handleQueuedActions(queuedActions.keySet());
114 queuedActions.clear();
115 signalWebSocket.disconnect();
116 webSocketStateDisposable.dispose();
117 shouldStop = false;
118 }
119 }
120
121 private void receiveMessagesInternal(
122 final SignalWebSocket signalWebSocket,
123 Duration timeout,
124 boolean returnOnTimeout,
125 Integer maxMessages,
126 Manager.ReceiveMessageHandler handler,
127 final Map<HandleAction, HandleAction> queuedActions
128 ) throws IOException {
129 int remainingMessages = maxMessages == null ? -1 : maxMessages;
130 var backOffCounter = 0;
131 isWaitingForMessage = false;
132
133 while (!shouldStop && remainingMessages != 0) {
134 if (needsToRetryFailedMessages) {
135 retryFailedReceivedMessages(handler);
136 needsToRetryFailedMessages = false;
137 }
138 SignalServiceEnvelope envelope;
139 final CachedMessage[] cachedMessage = {null};
140 final var nowMillis = System.currentTimeMillis();
141 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
142 account.setLastReceiveTimestamp(nowMillis);
143 }
144 logger.debug("Checking for new message from server");
145 try {
146 isWaitingForMessage = true;
147 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
148 logger.debug("Retrieved {} envelopes!", batch.size());
149 isWaitingForMessage = false;
150 for (final var it : batch) {
151 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
152 it.getServerDeliveredTimestamp());
153 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
154 .resolveRecipient(envelope1.getSourceAddress()) : null;
155 logger.trace("Storing new message from {}", recipientId);
156 // store message on disk, before acknowledging receipt to the server
157 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
158 }
159 return true;
160 });
161 isWaitingForMessage = false;
162 backOffCounter = 0;
163
164 if (queueNotEmpty) {
165 if (remainingMessages > 0) {
166 remainingMessages -= 1;
167 }
168 envelope = cachedMessage[0].loadEnvelope();
169 logger.debug("New message received from server");
170 } else {
171 logger.debug("Received indicator that server queue is empty");
172 handleQueuedActions(queuedActions.keySet());
173 queuedActions.clear();
174
175 hasCaughtUpWithOldMessages = true;
176 caughtUpWithOldMessagesListener.call();
177
178 // Continue to wait another timeout for new messages
179 continue;
180 }
181 } catch (AssertionError e) {
182 if (e.getCause() instanceof InterruptedException) {
183 break;
184 } else {
185 throw e;
186 }
187 } catch (IOException e) {
188 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
189 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
190 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
191 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
192 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
193 try {
194 Thread.sleep(sleepMilliseconds);
195 } catch (InterruptedException interruptedException) {
196 return;
197 }
198 hasCaughtUpWithOldMessages = false;
199 signalWebSocket.connect();
200 continue;
201 }
202 throw e;
203 } catch (TimeoutException e) {
204 backOffCounter = 0;
205 if (returnOnTimeout) return;
206 continue;
207 }
208
209 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
210 for (final var h : result.first()) {
211 final var existingAction = queuedActions.get(h);
212 if (existingAction == null) {
213 queuedActions.put(h, h);
214 } else {
215 existingAction.mergeOther(h);
216 }
217 }
218 final var exception = result.second();
219
220 if (hasCaughtUpWithOldMessages) {
221 handleQueuedActions(queuedActions.keySet());
222 queuedActions.clear();
223 }
224 if (cachedMessage[0] != null) {
225 if (exception instanceof UntrustedIdentityException) {
226 logger.debug("Keeping message with untrusted identity in message cache");
227 final var address = ((UntrustedIdentityException) exception).getSender();
228 final var recipientId = account.getRecipientResolver().resolveRecipient(address.getServiceId());
229 if (!envelope.hasSourceUuid()) {
230 try {
231 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
232 } catch (IOException ioException) {
233 logger.warn("Failed to move cached message to recipient folder: {}",
234 ioException.getMessage());
235 }
236 }
237 } else {
238 cachedMessage[0].delete();
239 }
240 }
241 }
242 }
243
244 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
245 Set<HandleAction> queuedActions = new HashSet<>();
246 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
247 var actions = retryFailedReceivedMessage(handler, cachedMessage);
248 if (actions != null) {
249 queuedActions.addAll(actions);
250 }
251 }
252 handleQueuedActions(queuedActions);
253 }
254
255 private List<HandleAction> retryFailedReceivedMessage(
256 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
257 ) {
258 var envelope = cachedMessage.loadEnvelope();
259 if (envelope == null) {
260 cachedMessage.delete();
261 return null;
262 }
263
264 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
265 final var actions = result.first();
266 final var exception = result.second();
267
268 if (exception instanceof UntrustedIdentityException) {
269 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
270 // Envelope is more than a month old, cleaning up.
271 cachedMessage.delete();
272 return null;
273 }
274 if (!envelope.hasSourceUuid()) {
275 final var identifier = ((UntrustedIdentityException) exception).getSender();
276 final var recipientId = account.getRecipientResolver()
277 .resolveRecipient(new RecipientAddress(identifier));
278 try {
279 account.getMessageCache().replaceSender(cachedMessage, recipientId);
280 } catch (IOException ioException) {
281 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
282 }
283 }
284 return null;
285 }
286
287 // If successful and for all other errors that are not recoverable, delete the cached message
288 cachedMessage.delete();
289 return actions;
290 }
291
292 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
293 logger.debug("Handling message actions");
294 for (var action : queuedActions) {
295 logger.debug("Executing action {}", action.getClass().getSimpleName());
296 try {
297 action.execute(context);
298 } catch (Throwable e) {
299 logger.warn("Message action failed.", e);
300 }
301 }
302 }
303
304 private void onWebSocketStateChange(final WebSocketConnectionState s) {
305 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
306 account.setRegistered(false);
307 authenticationFailureListener.call();
308 }
309 }
310
311 public interface Callable {
312
313 void call();
314 }
315 }