]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
631e271560ddee01f78a6b20351d182a427e142c
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.UntrustedIdentityException;
6 import org.asamk.signal.manager.actions.HandleAction;
7 import org.asamk.signal.manager.storage.SignalAccount;
8 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
12 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
13 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
14
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.Collection;
18 import java.util.HashMap;
19 import java.util.HashSet;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.TimeoutException;
24
25 import io.reactivex.rxjava3.core.Observable;
26 import io.reactivex.rxjava3.schedulers.Schedulers;
27
28 public class ReceiveHelper {
29
30 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
31 private final static int MAX_BACKOFF_COUNTER = 9;
32
33 private final SignalAccount account;
34 private final SignalDependencies dependencies;
35 private final Context context;
36
37 private boolean ignoreAttachments = false;
38 private boolean needsToRetryFailedMessages = false;
39 private boolean hasCaughtUpWithOldMessages = false;
40 private Callable authenticationFailureListener;
41 private Callable caughtUpWithOldMessagesListener;
42
43 public ReceiveHelper(final Context context) {
44 this.account = context.getAccount();
45 this.dependencies = context.getDependencies();
46 this.context = context;
47 }
48
49 public void setIgnoreAttachments(final boolean ignoreAttachments) {
50 this.ignoreAttachments = ignoreAttachments;
51 }
52
53 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
54 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
55 }
56
57 public boolean hasCaughtUpWithOldMessages() {
58 return hasCaughtUpWithOldMessages;
59 }
60
61 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
62 this.authenticationFailureListener = authenticationFailureListener;
63 }
64
65 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
66 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
67 }
68
69 public void receiveMessages(
70 Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
71 ) throws IOException {
72 needsToRetryFailedMessages = true;
73 hasCaughtUpWithOldMessages = false;
74
75 // Use a Map here because java Set doesn't have a get method ...
76 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
77
78 final var signalWebSocket = dependencies.getSignalWebSocket();
79 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
80 signalWebSocket.getWebSocketState())
81 .subscribeOn(Schedulers.computation())
82 .observeOn(Schedulers.computation())
83 .distinctUntilChanged()
84 .subscribe(this::onWebSocketStateChange);
85 signalWebSocket.connect();
86
87 try {
88 receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
89 } finally {
90 hasCaughtUpWithOldMessages = false;
91 handleQueuedActions(queuedActions.keySet());
92 queuedActions.clear();
93 dependencies.getSignalWebSocket().disconnect();
94 webSocketStateDisposable.dispose();
95 }
96 }
97
98 private void receiveMessagesInternal(
99 Duration timeout,
100 boolean returnOnTimeout,
101 Manager.ReceiveMessageHandler handler,
102 final Map<HandleAction, HandleAction> queuedActions
103 ) throws IOException {
104 final var signalWebSocket = dependencies.getSignalWebSocket();
105
106 var backOffCounter = 0;
107
108 while (!Thread.interrupted()) {
109 if (needsToRetryFailedMessages) {
110 retryFailedReceivedMessages(handler);
111 needsToRetryFailedMessages = false;
112 }
113 SignalServiceEnvelope envelope;
114 final CachedMessage[] cachedMessage = {null};
115 final var nowMillis = System.currentTimeMillis();
116 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
117 account.setLastReceiveTimestamp(nowMillis);
118 }
119 logger.debug("Checking for new message from server");
120 try {
121 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
122 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
123 .resolveRecipient(envelope1.getSourceAddress()) : null;
124 logger.trace("Storing new message from {}", recipientId);
125 // store message on disk, before acknowledging receipt to the server
126 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
127 });
128 backOffCounter = 0;
129
130 if (result.isPresent()) {
131 envelope = result.get();
132 logger.debug("New message received from server");
133 } else {
134 logger.debug("Received indicator that server queue is empty");
135 handleQueuedActions(queuedActions.keySet());
136 queuedActions.clear();
137
138 hasCaughtUpWithOldMessages = true;
139 caughtUpWithOldMessagesListener.call();
140
141 // Continue to wait another timeout for new messages
142 continue;
143 }
144 } catch (AssertionError e) {
145 if (e.getCause() instanceof InterruptedException) {
146 Thread.currentThread().interrupt();
147 break;
148 } else {
149 throw e;
150 }
151 } catch (IOException e) {
152 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
153 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
154 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
155 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
156 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
157 try {
158 Thread.sleep(sleepMilliseconds);
159 } catch (InterruptedException interruptedException) {
160 return;
161 }
162 hasCaughtUpWithOldMessages = false;
163 signalWebSocket.connect();
164 continue;
165 }
166 throw e;
167 } catch (TimeoutException e) {
168 backOffCounter = 0;
169 if (returnOnTimeout) return;
170 continue;
171 }
172
173 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
174 for (final var h : result.first()) {
175 final var existingAction = queuedActions.get(h);
176 if (existingAction == null) {
177 queuedActions.put(h, h);
178 } else {
179 existingAction.mergeOther(h);
180 }
181 }
182 final var exception = result.second();
183
184 if (hasCaughtUpWithOldMessages) {
185 handleQueuedActions(queuedActions.keySet());
186 queuedActions.clear();
187 }
188 if (cachedMessage[0] != null) {
189 if (exception instanceof UntrustedIdentityException) {
190 logger.debug("Keeping message with untrusted identity in message cache");
191 final var address = ((UntrustedIdentityException) exception).getSender();
192 final var recipientId = account.getRecipientStore().resolveRecipient(address);
193 if (!envelope.hasSourceUuid()) {
194 try {
195 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
196 } catch (IOException ioException) {
197 logger.warn("Failed to move cached message to recipient folder: {}",
198 ioException.getMessage());
199 }
200 }
201 } else {
202 cachedMessage[0].delete();
203 }
204 }
205 }
206 }
207
208 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
209 Set<HandleAction> queuedActions = new HashSet<>();
210 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
211 var actions = retryFailedReceivedMessage(handler, cachedMessage);
212 if (actions != null) {
213 queuedActions.addAll(actions);
214 }
215 }
216 handleQueuedActions(queuedActions);
217 }
218
219 private List<HandleAction> retryFailedReceivedMessage(
220 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
221 ) {
222 var envelope = cachedMessage.loadEnvelope();
223 if (envelope == null) {
224 cachedMessage.delete();
225 return null;
226 }
227
228 final var result = context.getIncomingMessageHandler()
229 .handleRetryEnvelope(envelope, ignoreAttachments, handler);
230 final var actions = result.first();
231 final var exception = result.second();
232
233 if (exception instanceof UntrustedIdentityException) {
234 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
235 // Envelope is more than a month old, cleaning up.
236 cachedMessage.delete();
237 return null;
238 }
239 if (!envelope.hasSourceUuid()) {
240 final var identifier = ((UntrustedIdentityException) exception).getSender();
241 final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
242 try {
243 account.getMessageCache().replaceSender(cachedMessage, recipientId);
244 } catch (IOException ioException) {
245 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
246 }
247 }
248 return null;
249 }
250
251 // If successful and for all other errors that are not recoverable, delete the cached message
252 cachedMessage.delete();
253 return actions;
254 }
255
256 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
257 logger.debug("Handling message actions");
258 var interrupted = false;
259 for (var action : queuedActions) {
260 logger.debug("Executing action {}", action.getClass().getSimpleName());
261 try {
262 action.execute(context);
263 } catch (Throwable e) {
264 if ((e instanceof AssertionError || e instanceof RuntimeException)
265 && e.getCause() instanceof InterruptedException) {
266 interrupted = true;
267 continue;
268 }
269 logger.warn("Message action failed.", e);
270 }
271 }
272 if (interrupted) {
273 Thread.currentThread().interrupt();
274 }
275 }
276
277 private void onWebSocketStateChange(final WebSocketConnectionState s) {
278 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
279 account.setRegistered(false);
280 authenticationFailureListener.call();
281 }
282 }
283
284 public interface Callable {
285
286 void call();
287 }
288 }