1 package org
.asamk
.signal
.manager
.helper
;
3 import org
.asamk
.signal
.manager
.Manager
;
4 import org
.asamk
.signal
.manager
.actions
.HandleAction
;
5 import org
.asamk
.signal
.manager
.api
.ReceiveConfig
;
6 import org
.asamk
.signal
.manager
.api
.UntrustedIdentityException
;
7 import org
.asamk
.signal
.manager
.internal
.SignalDependencies
;
8 import org
.asamk
.signal
.manager
.jobs
.CleanOldPreKeysJob
;
9 import org
.asamk
.signal
.manager
.storage
.SignalAccount
;
10 import org
.asamk
.signal
.manager
.storage
.messageCache
.CachedMessage
;
11 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientAddress
;
12 import org
.slf4j
.Logger
;
13 import org
.slf4j
.LoggerFactory
;
14 import org
.whispersystems
.signalservice
.api
.messages
.SignalServiceEnvelope
;
15 import org
.whispersystems
.signalservice
.api
.push
.ServiceId
;
16 import org
.whispersystems
.signalservice
.api
.push
.ServiceId
.ACI
;
17 import org
.whispersystems
.signalservice
.api
.websocket
.SignalWebSocket
;
18 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
19 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketUnavailableException
;
21 import java
.io
.IOException
;
22 import java
.time
.Duration
;
23 import java
.util
.Collection
;
24 import java
.util
.HashMap
;
25 import java
.util
.HashSet
;
26 import java
.util
.List
;
29 import java
.util
.concurrent
.TimeoutException
;
31 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
33 public class ReceiveHelper
{
35 private static final Logger logger
= LoggerFactory
.getLogger(ReceiveHelper
.class);
36 private static final int MAX_BACKOFF_COUNTER
= 9;
38 private final SignalAccount account
;
39 private final SignalDependencies dependencies
;
40 private final Context context
;
42 private ReceiveConfig receiveConfig
= new ReceiveConfig(false, false, false);
43 private boolean hasCaughtUpWithOldMessages
= false;
44 private boolean isWaitingForMessage
= false;
45 private boolean shouldStop
= false;
46 private Callable authenticationFailureListener
;
47 private Callable caughtUpWithOldMessagesListener
;
/**
 * Creates a receive helper bound to the given context.
 *
 * @param context supplies the account and the dependencies; it is also retained for later lookups
 */
public ReceiveHelper(final Context context) {
    this.context = context;
    this.account = context.getAccount();
    this.dependencies = context.getDependencies();
}
55 public void setReceiveConfig(final ReceiveConfig receiveConfig
) {
56 this.receiveConfig
= receiveConfig
;
57 dependencies
.setAllowStories(!receiveConfig
.ignoreStories());
60 public void setAuthenticationFailureListener(final Callable authenticationFailureListener
) {
61 this.authenticationFailureListener
= authenticationFailureListener
;
64 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener
) {
65 this.caughtUpWithOldMessagesListener
= caughtUpWithOldMessagesListener
;
68 public boolean requestStopReceiveMessages() {
69 this.shouldStop
= true;
70 return isWaitingForMessage
;
/**
 * Calls {@code receiveMessages} with a one-minute timeout, {@code false} for {@code returnOnTimeout} and
 * {@code null} as the third argument, and logs a warning when that call throws an {@link IOException}.
 *
 * NOTE(review): original lines 74-75, 77 and 80-82 of this method are not visible here. The log text
 * suggests the call is retried after a failure, but the loop and its exit condition cannot be confirmed
 * from this view — check against the full file.
 *
 * @param handler passed through to {@code receiveMessages}
 */
73 public void receiveMessagesContinuously(Manager
.ReceiveMessageHandler handler
) {
76 receiveMessages(Duration
.ofMinutes(1), false, null, handler
);
78 } catch (IOException e
) {
79 logger
.warn("Receiving messages failed, retrying", e
);
/**
 * Connects the authenticated web socket, receives messages through {@code receiveMessagesInternal}, then
 * executes the actions that are still queued and releases the socket again.
 *
 * NOTE(review): the names {@code timeout} and {@code maxMessages} used in the body are not declared in the
 * visible parameter list, and the lines directly before and after the {@code receiveMessagesInternal} call
 * are missing from this view — confirm the full signature and whether the cleanup runs in a finally block.
 *
 * @param returnOnTimeout forwarded unchanged to {@code receiveMessagesInternal}
 * @param handler         forwarded unchanged to {@code receiveMessagesInternal}
 * @throws IOException declared here; {@code receiveMessagesInternal} declares it as well
 */
84 public void receiveMessages(
86 boolean returnOnTimeout
,
88 Manager
.ReceiveMessageHandler handler
89 ) throws IOException
{
// Flag the account so the receive loop first retries cached messages, and reset the caught-up state.
90 account
.setNeedsToRetryFailedMessages(true);
91 hasCaughtUpWithOldMessages
= false;
93 // Use a Map here because java Set doesn't have a get method ...
94 Map
<HandleAction
, HandleAction
> queuedActions
= new HashMap
<>();
96 final var signalWebSocket
= dependencies
.getAuthenticatedSignalWebSocket();
// Deliver distinct web socket state changes to onWebSocketStateChange on the computation scheduler.
97 final var webSocketStateDisposable
= signalWebSocket
.getState()
98 .subscribeOn(Schedulers
.computation())
99 .observeOn(Schedulers
.computation())
100 .distinctUntilChanged()
101 .subscribe(this::onWebSocketStateChange
);
102 signalWebSocket
.connect();
103 signalWebSocket
.registerKeepAliveToken("receive");
106 receiveMessagesInternal(signalWebSocket
, timeout
, returnOnTimeout
, maxMessages
, handler
, queuedActions
);
// Cleanup: run the actions that are still queued, then drop the keep-alive token, the connection and the
// state subscription.
108 hasCaughtUpWithOldMessages
= false;
109 handleQueuedActions(queuedActions
.keySet());
110 queuedActions
.clear();
111 signalWebSocket
.removeKeepAliveToken("receive");
112 signalWebSocket
.disconnect();
113 webSocketStateDisposable
.dispose();
/**
 * Receive loop. While {@code shouldStop} is unset and the message budget is not used up, it reads a batch of
 * envelopes from the web socket, stores each envelope in the message cache before acknowledging it, passes
 * the loaded envelope to the incoming message handler and collects the returned {@link HandleAction}s in
 * {@code queuedActions}.
 *
 * NOTE(review): a number of original lines are not visible here, including the declarations of the
 * {@code timeout} and {@code maxMessages} parameters and the statements that end most branches and catch
 * blocks. The exact control flow after each branch therefore cannot be confirmed from this view.
 *
 * @param signalWebSocket authenticated web socket used to read batches, acknowledge envelopes and reconnect
 * @param returnOnTimeout when true, the method returns after a {@link TimeoutException} from the batch read
 * @param handler         passed to the incoming message handler and to the cached-message retry
 * @param queuedActions   action queue shared with the caller; maps each action to itself so that an equal,
 *                        already queued action can be looked up and merged
 * @throws IOException declared by this method
 */
118 private void receiveMessagesInternal(
119 final SignalWebSocket
.AuthenticatedWebSocket signalWebSocket
,
121 boolean returnOnTimeout
,
123 Manager
.ReceiveMessageHandler handler
,
124 final Map
<HandleAction
, HandleAction
> queuedActions
125 ) throws IOException
{
// A null maxMessages becomes -1; the counter is only decremented while positive, so -1 never reaches the
// 0 that ends the loop.
126 int remainingMessages
= maxMessages
== null ?
-1 : maxMessages
;
127 var backOffCounter
= 0;
128 isWaitingForMessage
= false;
130 while (!shouldStop
&& remainingMessages
!= 0) {
// Re-process messages left in the cache while the account is flagged for a retry.
131 if (account
.getNeedsToRetryFailedMessages()) {
132 retryFailedReceivedMessages(handler
);
134 SignalServiceEnvelope envelope
;
// Single-element array so that the batch callback below can hand the cached message out of the lambda.
135 final CachedMessage
[] cachedMessage
= {null};
136 final var nowMillis
= System
.currentTimeMillis();
// Refresh the stored last-receive timestamp only when it is more than 60000 ms old.
137 if (nowMillis
- account
.getLastReceiveTimestamp() > 60000) {
138 account
.setLastReceiveTimestamp(nowMillis
);
140 logger
.debug("Checking for new message from server");
142 isWaitingForMessage
= true;
// NOTE(review): the literal 1 is presumably the maximum batch size — confirm against the web socket API.
143 var queueNotEmpty
= signalWebSocket
.readMessageBatch(timeout
.toMillis(), 1, batch
-> {
144 logger
.debug("Retrieved {} envelopes!", batch
.size());
145 isWaitingForMessage
= false;
146 for (final var it
: batch
) {
147 SignalServiceEnvelope envelope1
= new SignalServiceEnvelope(it
.getEnvelope(),
148 it
.getServerDeliveredTimestamp());
149 final var recipientId
= envelope1
.getSourceServiceId()
150 .map(ServiceId
::parseOrNull
)
151 .map(s
-> account
.getRecipientResolver().resolveRecipient(s
))
153 logger
.trace("Storing new message from {}", recipientId
);
154 // store message on disk, before acknowledging receipt to the server
155 cachedMessage
[0] = account
.getMessageCache().cacheMessage(envelope1
, recipientId
);
157 signalWebSocket
.sendAck(it
);
158 } catch (IOException e
) {
// A failed acknowledgement is only logged; the envelope has already been stored in the cache.
159 logger
.warn("Failed to ack envelope to server after storing it: {}", e
.getMessage());
163 isWaitingForMessage
= false;
167 if (remainingMessages
> 0) {
168 remainingMessages
-= 1;
170 envelope
= cachedMessage
[0].loadEnvelope();
171 logger
.debug("New message received from server");
173 logger
.debug("Received indicator that server queue is empty");
174 handleQueuedActions(queuedActions
.keySet());
175 queuedActions
.clear();
177 context
.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
178 hasCaughtUpWithOldMessages
= true;
// NOTE(review): the listener is dereferenced without a null check and is only assigned through its
// setter — confirm it is always registered before receiving starts.
179 caughtUpWithOldMessagesListener
.call();
181 // Continue to wait another timeout for new messages
184 } catch (AssertionError e
) {
185 if (e
.getCause() instanceof InterruptedException
) {
190 } catch (IOException e
) {
191 logger
.debug("Pipe unexpectedly unavailable: {}", e
.getMessage());
192 if (e
instanceof WebSocketUnavailableException
|| "Connection closed!".equals(e
.getMessage())) {
// Exponential back-off: 100 ms * 2^backOffCounter, with the counter capped at MAX_BACKOFF_COUNTER.
193 final var sleepMilliseconds
= 100 * (long) Math
.pow(2, backOffCounter
);
194 backOffCounter
= Math
.min(backOffCounter
+ 1, MAX_BACKOFF_COUNTER
);
195 logger
.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds
);
197 Thread
.sleep(sleepMilliseconds
);
198 } catch (InterruptedException interruptedException
) {
// After the back-off the caught-up state is reset and the socket is connected again.
201 hasCaughtUpWithOldMessages
= false;
202 signalWebSocket
.connect();
206 } catch (TimeoutException e
) {
208 if (returnOnTimeout
) return;
210 } catch (Exception e
) {
211 logger
.error("Unknown error when receiving messages", e
);
216 final var result
= context
.getIncomingMessageHandler().handleEnvelope(envelope
, receiveConfig
, handler
);
// Look up an equal, already queued action for each returned action: queue the new one when there is
// none, merge into the existing one otherwise.
217 for (final var h
: result
.first()) {
218 final var existingAction
= queuedActions
.get(h
);
219 if (existingAction
== null) {
220 queuedActions
.put(h
, h
);
222 existingAction
.mergeOther(h
);
225 final var exception
= result
.second();
// Once the server queue has been reported empty, queued actions are executed right after each message.
227 if (hasCaughtUpWithOldMessages
) {
228 handleQueuedActions(queuedActions
.keySet());
229 queuedActions
.clear();
231 if (cachedMessage
[0] != null) {
232 if (exception
instanceof UntrustedIdentityException
) {
233 logger
.debug("Keeping message with untrusted identity in message cache");
234 final var address
= ((UntrustedIdentityException
) exception
).getSender();
// The envelope carries no source service id but the exception's sender has an ACI: move the cached
// message to that sender.
235 if (envelope
.getSourceServiceId().isEmpty() && address
.aci().isPresent()) {
236 final var recipientId
= account
.getRecipientResolver()
237 .resolveRecipient(ACI
.parseOrThrow(address
.aci().get()));
239 cachedMessage
[0] = account
.getMessageCache()
240 .replaceSender(cachedMessage
[0], recipientId
);
241 } catch (IOException ioException
) {
242 logger
.warn("Failed to move cached message to recipient folder: {}",
243 ioException
.getMessage(),
// NOTE(review): this delete presumably belongs to the branch for results without an untrusted identity;
// the line that opens that branch is not visible here — confirm.
248 cachedMessage
[0].delete();
251 } catch (Exception e
) {
252 logger
.error("Unknown error when handling messages", e
);
257 private void retryFailedReceivedMessages(Manager
.ReceiveMessageHandler handler
) {
258 Set
<HandleAction
> queuedActions
= new HashSet
<>();
259 for (var cachedMessage
: account
.getMessageCache().getCachedMessages()) {
260 var actions
= retryFailedReceivedMessage(handler
, cachedMessage
);
261 if (actions
!= null) {
262 queuedActions
.addAll(actions
);
265 handleQueuedActions(queuedActions
);
266 account
.setNeedsToRetryFailedMessages(false);
/**
 * Re-processes a single cached message by loading its envelope and passing it to
 * {@code handleRetryEnvelope} of the incoming message handler.
 *
 * NOTE(review): the return statements and several closing lines of this method are not visible here. The
 * caller checks the result for null before using it, so some paths presumably return null — confirm
 * against the full file.
 *
 * @param handler       passed through to {@code handleRetryEnvelope}
 * @param cachedMessage cache entry to retry; deleted or moved depending on the outcome
 */
269 private List
<HandleAction
> retryFailedReceivedMessage(
270 final Manager
.ReceiveMessageHandler handler
,
271 final CachedMessage cachedMessage
273 var envelope
= cachedMessage
.loadEnvelope();
// A cache entry whose envelope loads as null is deleted.
274 if (envelope
== null) {
275 cachedMessage
.delete();
279 final var result
= context
.getIncomingMessageHandler().handleRetryEnvelope(envelope
, receiveConfig
, handler
);
280 final var actions
= result
.first();
281 final var exception
= result
.second();
283 if (exception
instanceof UntrustedIdentityException
) {
// 1000L * 60 * 60 * 24 * 14 ms = 14 days, measured from the server-delivered timestamp.
284 if (System
.currentTimeMillis() - envelope
.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
285 // Envelope is more than two weeks old, cleaning up.
286 cachedMessage
.delete();
// The envelope carries no source service id: move the cache entry to the sender reported by the exception.
289 if (envelope
.getSourceServiceId().isEmpty()) {
290 final var identifier
= ((UntrustedIdentityException
) exception
).getSender();
291 final var recipientId
= account
.getRecipientResolver()
292 .resolveRecipient(new RecipientAddress(identifier
));
294 account
.getMessageCache().replaceSender(cachedMessage
, recipientId
);
295 } catch (IOException ioException
) {
296 logger
.warn("Failed to move cached message to recipient folder: {}",
297 ioException
.getMessage(),
304 // If successful and for all other errors that are not recoverable, delete the cached message
305 cachedMessage
.delete();
309 private void handleQueuedActions(final Collection
<HandleAction
> queuedActions
) {
310 logger
.debug("Handling message actions");
311 for (var action
: queuedActions
) {
312 logger
.debug("Executing action {}", action
.getClass().getSimpleName());
314 action
.execute(context
);
315 } catch (Throwable e
) {
316 logger
.warn("Message action failed.", e
);
321 private void onWebSocketStateChange(final WebSocketConnectionState s
) {
322 if (s
.equals(WebSocketConnectionState
.AUTHENTICATION_FAILED
)) {
323 account
.setRegistered(false);
324 authenticationFailureListener
.call();
328 public interface Callable
{