nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Handle rate limit exception correctly when querying usernames
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.actions.HandleAction;
5 import org.asamk.signal.manager.api.ReceiveConfig;
6 import org.asamk.signal.manager.api.UntrustedIdentityException;
7 import org.asamk.signal.manager.internal.SignalDependencies;
8 import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
9 import org.asamk.signal.manager.storage.SignalAccount;
10 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
11 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId;
16 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
17 import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
18 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
19 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
20
21 import java.io.IOException;
22 import java.time.Duration;
23 import java.util.Collection;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.TimeoutException;
30
31 import io.reactivex.rxjava3.schedulers.Schedulers;
32
33 public class ReceiveHelper {
34
35 private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
36 private static final int MAX_BACKOFF_COUNTER = 9;
37
38 private final SignalAccount account;
39 private final SignalDependencies dependencies;
40 private final Context context;
41
42 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
/**
 * Creates a receive helper bound to the given context.
 *
 * @param context supplies the account and the dependencies used by this helper and is
 *                itself retained for later use
 */
public ReceiveHelper(final Context context) {
    this.context = context;
    this.account = context.getAccount();
    this.dependencies = context.getDependencies();
}
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
61 this.authenticationFailureListener = authenticationFailureListener;
62 }
63
64 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
65 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
66 }
67
68 public boolean requestStopReceiveMessages() {
69 this.shouldStop = true;
70 return isWaitingForMessage;
71 }
72
73 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
74 while (!shouldStop) {
75 try {
76 receiveMessages(Duration.ofMinutes(1), false, null, handler);
77 break;
78 } catch (IOException e) {
79 logger.warn("Receiving messages failed, retrying", e);
80 }
81 }
82 }
83
84 public void receiveMessages(
85 Duration timeout,
86 boolean returnOnTimeout,
87 Integer maxMessages,
88 Manager.ReceiveMessageHandler handler
89 ) throws IOException {
90 account.setNeedsToRetryFailedMessages(true);
91 hasCaughtUpWithOldMessages = false;
92
93 // Use a Map here because java Set doesn't have a get method ...
94 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
95
96 final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket();
97 final var webSocketStateDisposable = signalWebSocket.getState()
98 .subscribeOn(Schedulers.computation())
99 .observeOn(Schedulers.computation())
100 .distinctUntilChanged()
101 .subscribe(this::onWebSocketStateChange);
102 signalWebSocket.connect();
103 signalWebSocket.registerKeepAliveToken("receive");
104
105 try {
106 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
107 } finally {
108 hasCaughtUpWithOldMessages = false;
109 handleQueuedActions(queuedActions.keySet());
110 queuedActions.clear();
111 signalWebSocket.removeKeepAliveToken("receive");
112 signalWebSocket.disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
120 Duration timeout,
121 boolean returnOnTimeout,
122 Integer maxMessages,
123 Manager.ReceiveMessageHandler handler,
124 final Map<HandleAction, HandleAction> queuedActions
125 ) throws IOException {
126 int remainingMessages = maxMessages == null ? -1 : maxMessages;
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop && remainingMessages != 0) {
131 if (account.getNeedsToRetryFailedMessages()) {
132 retryFailedReceivedMessages(handler);
133 }
134 SignalServiceEnvelope envelope;
135 final CachedMessage[] cachedMessage = {null};
136 final var nowMillis = System.currentTimeMillis();
137 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
138 account.setLastReceiveTimestamp(nowMillis);
139 }
140 logger.debug("Checking for new message from server");
141 try {
142 isWaitingForMessage = true;
143 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
144 logger.debug("Retrieved {} envelopes!", batch.size());
145 isWaitingForMessage = false;
146 for (final var it : batch) {
147 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
148 it.getServerDeliveredTimestamp());
149 final var recipientId = envelope1.getSourceServiceId()
150 .map(ServiceId::parseOrNull)
151 .map(s -> account.getRecipientResolver().resolveRecipient(s))
152 .orElse(null);
153 logger.trace("Storing new message from {}", recipientId);
154 // store message on disk, before acknowledging receipt to the server
155 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
156 try {
157 signalWebSocket.sendAck(it);
158 } catch (IOException e) {
159 logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
160 }
161 }
162 });
163 isWaitingForMessage = false;
164 backOffCounter = 0;
165
166 if (queueNotEmpty) {
167 if (remainingMessages > 0) {
168 remainingMessages -= 1;
169 }
170 envelope = cachedMessage[0].loadEnvelope();
171 logger.debug("New message received from server");
172 } else {
173 logger.debug("Received indicator that server queue is empty");
174 handleQueuedActions(queuedActions.keySet());
175 queuedActions.clear();
176
177 context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
178 hasCaughtUpWithOldMessages = true;
179 caughtUpWithOldMessagesListener.call();
180
181 // Continue to wait another timeout for new messages
182 continue;
183 }
184 } catch (AssertionError e) {
185 if (e.getCause() instanceof InterruptedException) {
186 break;
187 } else {
188 throw e;
189 }
190 } catch (IOException e) {
191 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
192 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
193 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
194 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
195 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
196 try {
197 Thread.sleep(sleepMilliseconds);
198 } catch (InterruptedException interruptedException) {
199 return;
200 }
201 hasCaughtUpWithOldMessages = false;
202 signalWebSocket.connect();
203 continue;
204 }
205 throw e;
206 } catch (TimeoutException e) {
207 backOffCounter = 0;
208 if (returnOnTimeout) return;
209 continue;
210 } catch (Exception e) {
211 logger.error("Unknown error when receiving messages", e);
212 continue;
213 }
214
215 try {
216 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
217 for (final var h : result.first()) {
218 final var existingAction = queuedActions.get(h);
219 if (existingAction == null) {
220 queuedActions.put(h, h);
221 } else {
222 existingAction.mergeOther(h);
223 }
224 }
225 final var exception = result.second();
226
227 if (hasCaughtUpWithOldMessages) {
228 handleQueuedActions(queuedActions.keySet());
229 queuedActions.clear();
230 }
231 if (cachedMessage[0] != null) {
232 if (exception instanceof UntrustedIdentityException) {
233 logger.debug("Keeping message with untrusted identity in message cache");
234 final var address = ((UntrustedIdentityException) exception).getSender();
235 if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
236 final var recipientId = account.getRecipientResolver()
237 .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
238 try {
239 cachedMessage[0] = account.getMessageCache()
240 .replaceSender(cachedMessage[0], recipientId);
241 } catch (IOException ioException) {
242 logger.warn("Failed to move cached message to recipient folder: {}",
243 ioException.getMessage(),
244 ioException);
245 }
246 }
247 } else {
248 cachedMessage[0].delete();
249 }
250 }
251 } catch (Exception e) {
252 logger.error("Unknown error when handling messages", e);
253 }
254 }
255 }
256
257 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
258 Set<HandleAction> queuedActions = new HashSet<>();
259 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
260 var actions = retryFailedReceivedMessage(handler, cachedMessage);
261 if (actions != null) {
262 queuedActions.addAll(actions);
263 }
264 }
265 handleQueuedActions(queuedActions);
266 account.setNeedsToRetryFailedMessages(false);
267 }
268
269 private List<HandleAction> retryFailedReceivedMessage(
270 final Manager.ReceiveMessageHandler handler,
271 final CachedMessage cachedMessage
272 ) {
273 var envelope = cachedMessage.loadEnvelope();
274 if (envelope == null) {
275 cachedMessage.delete();
276 return null;
277 }
278
279 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
280 final var actions = result.first();
281 final var exception = result.second();
282
283 if (exception instanceof UntrustedIdentityException) {
284 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
285 // Envelope is more than two weeks old, cleaning up.
286 cachedMessage.delete();
287 return null;
288 }
289 if (envelope.getSourceServiceId().isEmpty()) {
290 final var identifier = ((UntrustedIdentityException) exception).getSender();
291 final var recipientId = account.getRecipientResolver()
292 .resolveRecipient(new RecipientAddress(identifier));
293 try {
294 account.getMessageCache().replaceSender(cachedMessage, recipientId);
295 } catch (IOException ioException) {
296 logger.warn("Failed to move cached message to recipient folder: {}",
297 ioException.getMessage(),
298 ioException);
299 }
300 }
301 return null;
302 }
303
304 // If successful and for all other errors that are not recoverable, delete the cached message
305 cachedMessage.delete();
306 return actions;
307 }
308
309 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
310 logger.debug("Handling message actions");
311 for (var action : queuedActions) {
312 logger.debug("Executing action {}", action.getClass().getSimpleName());
313 try {
314 action.execute(context);
315 } catch (Throwable e) {
316 logger.warn("Message action failed.", e);
317 }
318 }
319 }
320
321 private void onWebSocketStateChange(final WebSocketConnectionState s) {
322 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
323 account.setRegistered(false);
324 authenticationFailureListener.call();
325 }
326 }
327
/**
 * Callback without arguments or result, used as the type of this helper's listeners.
 */
@FunctionalInterface
public interface Callable {

    /**
     * Invokes the callback.
     */
    void call();
}
332 }