]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
7083caaeb73b4dbdbc8548eacce41376309a4866
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.storage.SignalAccount;
8 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
12 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
13 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
14
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.TimeoutException;
24
25 import io.reactivex.rxjava3.core.Observable;
26 import io.reactivex.rxjava3.schedulers.Schedulers;
27
28 public class ReceiveHelper {
29
30 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
31 private final static int MAX_BACKOFF_COUNTER = 9;
32
33 private final SignalAccount account;
34 private final SignalDependencies dependencies;
35 private final Context context;
36
37 private boolean ignoreAttachments = false;
38 private boolean needsToRetryFailedMessages = false;
39 private boolean hasCaughtUpWithOldMessages = false;
40 private boolean isWaitingForMessage = false;
41 private boolean shouldStop = false;
42 private Callable authenticationFailureListener;
43 private Callable caughtUpWithOldMessagesListener;
44
45 public ReceiveHelper(final Context context) {
46 this.account = context.getAccount();
47 this.dependencies = context.getDependencies();
48 this.context = context;
49 }
50
51 public void setIgnoreAttachments(final boolean ignoreAttachments) {
52 this.ignoreAttachments = ignoreAttachments;
53 }
54
55 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
56 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
57 }
58
59 public boolean hasCaughtUpWithOldMessages() {
60 return hasCaughtUpWithOldMessages;
61 }
62
63 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
64 this.authenticationFailureListener = authenticationFailureListener;
65 }
66
67 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
68 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
69 }
70
71 public boolean requestStopReceiveMessages() {
72 this.shouldStop = true;
73 return isWaitingForMessage;
74 }
75
76 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
77 while (!shouldStop) {
78 try {
79 receiveMessages(Duration.ofMinutes(1), false, handler);
80 break;
81 } catch (IOException e) {
82 logger.warn("Receiving messages failed, retrying", e);
83 }
84 }
85 }
86
87 public void receiveMessages(
88 Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
89 ) throws IOException {
90 needsToRetryFailedMessages = true;
91 hasCaughtUpWithOldMessages = false;
92
93 // Use a Map here because java Set doesn't have a get method ...
94 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
95
96 final var signalWebSocket = dependencies.getSignalWebSocket();
97 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
98 signalWebSocket.getWebSocketState())
99 .subscribeOn(Schedulers.computation())
100 .observeOn(Schedulers.computation())
101 .distinctUntilChanged()
102 .subscribe(this::onWebSocketStateChange);
103 signalWebSocket.connect();
104
105 try {
106 receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
107 } finally {
108 hasCaughtUpWithOldMessages = false;
109 handleQueuedActions(queuedActions.keySet());
110 queuedActions.clear();
111 dependencies.getSignalWebSocket().disconnect();
112 webSocketStateDisposable.dispose();
113 shouldStop = false;
114 }
115 }
116
117 private void receiveMessagesInternal(
118 Duration timeout,
119 boolean returnOnTimeout,
120 Manager.ReceiveMessageHandler handler,
121 final Map<HandleAction, HandleAction> queuedActions
122 ) throws IOException {
123 final var signalWebSocket = dependencies.getSignalWebSocket();
124
125 var backOffCounter = 0;
126 isWaitingForMessage = false;
127
128 while (!shouldStop) {
129 if (needsToRetryFailedMessages) {
130 retryFailedReceivedMessages(handler);
131 needsToRetryFailedMessages = false;
132 }
133 SignalServiceEnvelope envelope;
134 final CachedMessage[] cachedMessage = {null};
135 final var nowMillis = System.currentTimeMillis();
136 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
137 account.setLastReceiveTimestamp(nowMillis);
138 }
139 logger.debug("Checking for new message from server");
140 try {
141 isWaitingForMessage = true;
142 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
143 isWaitingForMessage = false;
144 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
145 .resolveRecipient(envelope1.getSourceAddress()) : null;
146 logger.trace("Storing new message from {}", recipientId);
147 // store message on disk, before acknowledging receipt to the server
148 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
149 });
150 isWaitingForMessage = false;
151 backOffCounter = 0;
152
153 if (result.isPresent()) {
154 envelope = result.get();
155 logger.debug("New message received from server");
156 } else {
157 logger.debug("Received indicator that server queue is empty");
158 handleQueuedActions(queuedActions.keySet());
159 queuedActions.clear();
160
161 hasCaughtUpWithOldMessages = true;
162 caughtUpWithOldMessagesListener.call();
163
164 // Continue to wait another timeout for new messages
165 continue;
166 }
167 } catch (AssertionError e) {
168 if (e.getCause() instanceof InterruptedException) {
169 break;
170 } else {
171 throw e;
172 }
173 } catch (IOException e) {
174 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
175 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
176 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
177 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
178 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
179 try {
180 Thread.sleep(sleepMilliseconds);
181 } catch (InterruptedException interruptedException) {
182 return;
183 }
184 hasCaughtUpWithOldMessages = false;
185 signalWebSocket.connect();
186 continue;
187 }
188 throw e;
189 } catch (TimeoutException e) {
190 backOffCounter = 0;
191 if (returnOnTimeout) return;
192 continue;
193 }
194
195 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
196 for (final var h : result.first()) {
197 final var existingAction = queuedActions.get(h);
198 if (existingAction == null) {
199 queuedActions.put(h, h);
200 } else {
201 existingAction.mergeOther(h);
202 }
203 }
204 final var exception = result.second();
205
206 if (hasCaughtUpWithOldMessages) {
207 handleQueuedActions(queuedActions.keySet());
208 queuedActions.clear();
209 }
210 if (cachedMessage[0] != null) {
211 if (exception instanceof UntrustedIdentityException) {
212 logger.debug("Keeping message with untrusted identity in message cache");
213 final var address = ((UntrustedIdentityException) exception).getSender();
214 final var recipientId = account.getRecipientResolver().resolveRecipient(address);
215 if (!envelope.hasSourceUuid()) {
216 try {
217 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
218 } catch (IOException ioException) {
219 logger.warn("Failed to move cached message to recipient folder: {}",
220 ioException.getMessage());
221 }
222 }
223 } else {
224 cachedMessage[0].delete();
225 }
226 }
227 }
228 }
229
230 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
231 Set<HandleAction> queuedActions = new HashSet<>();
232 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
233 var actions = retryFailedReceivedMessage(handler, cachedMessage);
234 if (actions != null) {
235 queuedActions.addAll(actions);
236 }
237 }
238 handleQueuedActions(queuedActions);
239 }
240
241 private List<HandleAction> retryFailedReceivedMessage(
242 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
243 ) {
244 var envelope = cachedMessage.loadEnvelope();
245 if (envelope == null) {
246 cachedMessage.delete();
247 return null;
248 }
249
250 final var result = context.getIncomingMessageHandler()
251 .handleRetryEnvelope(envelope, ignoreAttachments, handler);
252 final var actions = result.first();
253 final var exception = result.second();
254
255 if (exception instanceof UntrustedIdentityException) {
256 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
257 // Envelope is more than a month old, cleaning up.
258 cachedMessage.delete();
259 return null;
260 }
261 if (!envelope.hasSourceUuid()) {
262 final var identifier = ((UntrustedIdentityException) exception).getSender();
263 final var recipientId = account.getRecipientResolver().resolveRecipient(identifier);
264 try {
265 account.getMessageCache().replaceSender(cachedMessage, recipientId);
266 } catch (IOException ioException) {
267 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
268 }
269 }
270 return null;
271 }
272
273 // If successful and for all other errors that are not recoverable, delete the cached message
274 cachedMessage.delete();
275 return actions;
276 }
277
278 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
279 logger.debug("Handling message actions");
280 for (var action : queuedActions) {
281 logger.debug("Executing action {}", action.getClass().getSimpleName());
282 try {
283 action.execute(context);
284 } catch (Throwable e) {
285 logger.warn("Message action failed.", e);
286 }
287 }
288 }
289
290 private void onWebSocketStateChange(final WebSocketConnectionState s) {
291 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
292 account.setRegistered(false);
293 authenticationFailureListener.call();
294 }
295 }
296
297 public interface Callable {
298
299 void call();
300 }
301 }