nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Reduce use of unknown serviceIds
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId;
16 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
17 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.TimeoutException;
28
29 import io.reactivex.rxjava3.core.Observable;
30 import io.reactivex.rxjava3.schedulers.Schedulers;
31
32 public class ReceiveHelper {
33
34 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
35 private final static int MAX_BACKOFF_COUNTER = 9;
36
37 private final SignalAccount account;
38 private final SignalDependencies dependencies;
39 private final Context context;
40
41 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
42 private boolean needsToRetryFailedMessages = false;
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
49 public ReceiveHelper(final Context context) {
50 this.account = context.getAccount();
51 this.dependencies = context.getDependencies();
52 this.context = context;
53 }
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
61 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
62 }
63
64 public boolean hasCaughtUpWithOldMessages() {
65 return hasCaughtUpWithOldMessages;
66 }
67
68 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
69 this.authenticationFailureListener = authenticationFailureListener;
70 }
71
72 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
73 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
74 }
75
76 public boolean requestStopReceiveMessages() {
77 this.shouldStop = true;
78 return isWaitingForMessage;
79 }
80
81 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
82 while (!shouldStop) {
83 try {
84 receiveMessages(Duration.ofMinutes(1), false, null, handler);
85 break;
86 } catch (IOException e) {
87 logger.warn("Receiving messages failed, retrying", e);
88 }
89 }
90 }
91
92 public void receiveMessages(
93 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
94 ) throws IOException {
95 needsToRetryFailedMessages = true;
96 hasCaughtUpWithOldMessages = false;
97
98 // Use a Map here because java Set doesn't have a get method ...
99 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
100
101 final var signalWebSocket = dependencies.getSignalWebSocket();
102 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
103 signalWebSocket.getWebSocketState())
104 .subscribeOn(Schedulers.computation())
105 .observeOn(Schedulers.computation())
106 .distinctUntilChanged()
107 .subscribe(this::onWebSocketStateChange);
108 signalWebSocket.connect();
109
110 try {
111 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
112 } finally {
113 hasCaughtUpWithOldMessages = false;
114 handleQueuedActions(queuedActions.keySet());
115 queuedActions.clear();
116 signalWebSocket.disconnect();
117 webSocketStateDisposable.dispose();
118 shouldStop = false;
119 }
120 }
121
122 private void receiveMessagesInternal(
123 final SignalWebSocket signalWebSocket,
124 Duration timeout,
125 boolean returnOnTimeout,
126 Integer maxMessages,
127 Manager.ReceiveMessageHandler handler,
128 final Map<HandleAction, HandleAction> queuedActions
129 ) throws IOException {
130 int remainingMessages = maxMessages == null ? -1 : maxMessages;
131 var backOffCounter = 0;
132 isWaitingForMessage = false;
133
134 while (!shouldStop && remainingMessages != 0) {
135 if (needsToRetryFailedMessages) {
136 retryFailedReceivedMessages(handler);
137 needsToRetryFailedMessages = false;
138 }
139 SignalServiceEnvelope envelope;
140 final CachedMessage[] cachedMessage = {null};
141 final var nowMillis = System.currentTimeMillis();
142 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
143 account.setLastReceiveTimestamp(nowMillis);
144 }
145 logger.debug("Checking for new message from server");
146 try {
147 isWaitingForMessage = true;
148 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
149 logger.debug("Retrieved {} envelopes!", batch.size());
150 isWaitingForMessage = false;
151 for (final var it : batch) {
152 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
153 it.getServerDeliveredTimestamp());
154 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
155 .resolveRecipient(envelope1.getSourceAddress()) : null;
156 logger.trace("Storing new message from {}", recipientId);
157 // store message on disk, before acknowledging receipt to the server
158 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
159 }
160 return true;
161 });
162 isWaitingForMessage = false;
163 backOffCounter = 0;
164
165 if (queueNotEmpty) {
166 if (remainingMessages > 0) {
167 remainingMessages -= 1;
168 }
169 envelope = cachedMessage[0].loadEnvelope();
170 logger.debug("New message received from server");
171 } else {
172 logger.debug("Received indicator that server queue is empty");
173 handleQueuedActions(queuedActions.keySet());
174 queuedActions.clear();
175
176 hasCaughtUpWithOldMessages = true;
177 caughtUpWithOldMessagesListener.call();
178
179 // Continue to wait another timeout for new messages
180 continue;
181 }
182 } catch (AssertionError e) {
183 if (e.getCause() instanceof InterruptedException) {
184 break;
185 } else {
186 throw e;
187 }
188 } catch (IOException e) {
189 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
190 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
191 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
192 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
193 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
194 try {
195 Thread.sleep(sleepMilliseconds);
196 } catch (InterruptedException interruptedException) {
197 return;
198 }
199 hasCaughtUpWithOldMessages = false;
200 signalWebSocket.connect();
201 continue;
202 }
203 throw e;
204 } catch (TimeoutException e) {
205 backOffCounter = 0;
206 if (returnOnTimeout) return;
207 continue;
208 }
209
210 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
211 for (final var h : result.first()) {
212 final var existingAction = queuedActions.get(h);
213 if (existingAction == null) {
214 queuedActions.put(h, h);
215 } else {
216 existingAction.mergeOther(h);
217 }
218 }
219 final var exception = result.second();
220
221 if (hasCaughtUpWithOldMessages) {
222 handleQueuedActions(queuedActions.keySet());
223 queuedActions.clear();
224 }
225 if (cachedMessage[0] != null) {
226 if (exception instanceof UntrustedIdentityException) {
227 logger.debug("Keeping message with untrusted identity in message cache");
228 final var address = ((UntrustedIdentityException) exception).getSender();
229 if (!envelope.hasSourceUuid() && address.uuid().isPresent()) {
230 final var recipientId = account.getRecipientResolver()
231 .resolveRecipient(ServiceId.from(address.uuid().get()));
232 try {
233 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
234 } catch (IOException ioException) {
235 logger.warn("Failed to move cached message to recipient folder: {}",
236 ioException.getMessage());
237 }
238 }
239 } else {
240 cachedMessage[0].delete();
241 }
242 }
243 }
244 }
245
246 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
247 Set<HandleAction> queuedActions = new HashSet<>();
248 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
249 var actions = retryFailedReceivedMessage(handler, cachedMessage);
250 if (actions != null) {
251 queuedActions.addAll(actions);
252 }
253 }
254 handleQueuedActions(queuedActions);
255 }
256
257 private List<HandleAction> retryFailedReceivedMessage(
258 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
259 ) {
260 var envelope = cachedMessage.loadEnvelope();
261 if (envelope == null) {
262 cachedMessage.delete();
263 return null;
264 }
265
266 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
267 final var actions = result.first();
268 final var exception = result.second();
269
270 if (exception instanceof UntrustedIdentityException) {
271 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
272 // Envelope is more than a month old, cleaning up.
273 cachedMessage.delete();
274 return null;
275 }
276 if (!envelope.hasSourceUuid()) {
277 final var identifier = ((UntrustedIdentityException) exception).getSender();
278 final var recipientId = account.getRecipientResolver()
279 .resolveRecipient(new RecipientAddress(identifier));
280 try {
281 account.getMessageCache().replaceSender(cachedMessage, recipientId);
282 } catch (IOException ioException) {
283 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
284 }
285 }
286 return null;
287 }
288
289 // If successful and for all other errors that are not recoverable, delete the cached message
290 cachedMessage.delete();
291 return actions;
292 }
293
294 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
295 logger.debug("Handling message actions");
296 for (var action : queuedActions) {
297 logger.debug("Executing action {}", action.getClass().getSimpleName());
298 try {
299 action.execute(context);
300 } catch (Throwable e) {
301 logger.warn("Message action failed.", e);
302 }
303 }
304 }
305
306 private void onWebSocketStateChange(final WebSocketConnectionState s) {
307 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
308 account.setRegistered(false);
309 authenticationFailureListener.call();
310 }
311 }
312
313 public interface Callable {
314
315 void call();
316 }
317 }