1 package org
.asamk
.signal
.manager
.helper
;
3 import org
.asamk
.signal
.manager
.Manager
;
4 import org
.asamk
.signal
.manager
.SignalDependencies
;
5 import org
.asamk
.signal
.manager
.actions
.HandleAction
;
6 import org
.asamk
.signal
.manager
.api
.ReceiveConfig
;
7 import org
.asamk
.signal
.manager
.api
.UntrustedIdentityException
;
8 import org
.asamk
.signal
.manager
.storage
.SignalAccount
;
9 import org
.asamk
.signal
.manager
.storage
.messageCache
.CachedMessage
;
10 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientAddress
;
11 import org
.slf4j
.Logger
;
12 import org
.slf4j
.LoggerFactory
;
13 import org
.whispersystems
.signalservice
.api
.SignalWebSocket
;
14 import org
.whispersystems
.signalservice
.api
.messages
.SignalServiceEnvelope
;
15 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
16 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketUnavailableException
;
18 import java
.io
.IOException
;
19 import java
.time
.Duration
;
20 import java
.util
.Collection
;
21 import java
.util
.HashMap
;
22 import java
.util
.HashSet
;
23 import java
.util
.List
;
26 import java
.util
.concurrent
.TimeoutException
;
28 import io
.reactivex
.rxjava3
.core
.Observable
;
29 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
31 public class ReceiveHelper
{
33 private final static Logger logger
= LoggerFactory
.getLogger(ReceiveHelper
.class);
34 private final static int MAX_BACKOFF_COUNTER
= 9;
36 private final SignalAccount account
;
37 private final SignalDependencies dependencies
;
38 private final Context context
;
40 private ReceiveConfig receiveConfig
= new ReceiveConfig(false, false, false);
41 private boolean needsToRetryFailedMessages
= false;
42 private boolean hasCaughtUpWithOldMessages
= false;
43 private boolean isWaitingForMessage
= false;
44 private boolean shouldStop
= false;
45 private Callable authenticationFailureListener
;
46 private Callable caughtUpWithOldMessagesListener
;
48 public ReceiveHelper(final Context context
) {
49 this.account
= context
.getAccount();
50 this.dependencies
= context
.getDependencies();
51 this.context
= context
;
54 public void setReceiveConfig(final ReceiveConfig receiveConfig
) {
55 this.receiveConfig
= receiveConfig
;
56 dependencies
.setAllowStories(!receiveConfig
.ignoreStories());
59 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages
) {
60 this.needsToRetryFailedMessages
= needsToRetryFailedMessages
;
63 public boolean hasCaughtUpWithOldMessages() {
64 return hasCaughtUpWithOldMessages
;
67 public void setAuthenticationFailureListener(final Callable authenticationFailureListener
) {
68 this.authenticationFailureListener
= authenticationFailureListener
;
71 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener
) {
72 this.caughtUpWithOldMessagesListener
= caughtUpWithOldMessagesListener
;
75 public boolean requestStopReceiveMessages() {
76 this.shouldStop
= true;
77 return isWaitingForMessage
;
80 public void receiveMessagesContinuously(Manager
.ReceiveMessageHandler handler
) {
83 receiveMessages(Duration
.ofMinutes(1), false, null, handler
);
85 } catch (IOException e
) {
86 logger
.warn("Receiving messages failed, retrying", e
);
91 public void receiveMessages(
92 Duration timeout
, boolean returnOnTimeout
, Integer maxMessages
, Manager
.ReceiveMessageHandler handler
93 ) throws IOException
{
94 needsToRetryFailedMessages
= true;
95 hasCaughtUpWithOldMessages
= false;
97 // Use a Map here because java Set doesn't have a get method ...
98 Map
<HandleAction
, HandleAction
> queuedActions
= new HashMap
<>();
100 final var signalWebSocket
= dependencies
.getSignalWebSocket();
101 final var webSocketStateDisposable
= Observable
.merge(signalWebSocket
.getUnidentifiedWebSocketState(),
102 signalWebSocket
.getWebSocketState())
103 .subscribeOn(Schedulers
.computation())
104 .observeOn(Schedulers
.computation())
105 .distinctUntilChanged()
106 .subscribe(this::onWebSocketStateChange
);
107 signalWebSocket
.connect();
110 receiveMessagesInternal(signalWebSocket
, timeout
, returnOnTimeout
, maxMessages
, handler
, queuedActions
);
112 hasCaughtUpWithOldMessages
= false;
113 handleQueuedActions(queuedActions
.keySet());
114 queuedActions
.clear();
115 signalWebSocket
.disconnect();
116 webSocketStateDisposable
.dispose();
121 private void receiveMessagesInternal(
122 final SignalWebSocket signalWebSocket
,
124 boolean returnOnTimeout
,
126 Manager
.ReceiveMessageHandler handler
,
127 final Map
<HandleAction
, HandleAction
> queuedActions
128 ) throws IOException
{
129 int remainingMessages
= maxMessages
== null ?
-1 : maxMessages
;
130 var backOffCounter
= 0;
131 isWaitingForMessage
= false;
133 while (!shouldStop
&& remainingMessages
!= 0) {
134 if (needsToRetryFailedMessages
) {
135 retryFailedReceivedMessages(handler
);
136 needsToRetryFailedMessages
= false;
138 SignalServiceEnvelope envelope
;
139 final CachedMessage
[] cachedMessage
= {null};
140 final var nowMillis
= System
.currentTimeMillis();
141 if (nowMillis
- account
.getLastReceiveTimestamp() > 60000) {
142 account
.setLastReceiveTimestamp(nowMillis
);
144 logger
.debug("Checking for new message from server");
146 isWaitingForMessage
= true;
147 var result
= signalWebSocket
.readOrEmpty(timeout
.toMillis(), envelope1
-> {
148 isWaitingForMessage
= false;
149 final var recipientId
= envelope1
.hasSourceUuid() ? account
.getRecipientResolver()
150 .resolveRecipient(envelope1
.getSourceAddress()) : null;
151 logger
.trace("Storing new message from {}", recipientId
);
152 // store message on disk, before acknowledging receipt to the server
153 cachedMessage
[0] = account
.getMessageCache().cacheMessage(envelope1
, recipientId
);
155 isWaitingForMessage
= false;
158 if (result
.isPresent()) {
159 if (remainingMessages
> 0) {
160 remainingMessages
-= 1;
162 envelope
= result
.get();
163 logger
.debug("New message received from server");
165 logger
.debug("Received indicator that server queue is empty");
166 handleQueuedActions(queuedActions
.keySet());
167 queuedActions
.clear();
169 hasCaughtUpWithOldMessages
= true;
170 caughtUpWithOldMessagesListener
.call();
172 // Continue to wait another timeout for new messages
175 } catch (AssertionError e
) {
176 if (e
.getCause() instanceof InterruptedException
) {
181 } catch (IOException e
) {
182 logger
.debug("Pipe unexpectedly unavailable: {}", e
.getMessage());
183 if (e
instanceof WebSocketUnavailableException
|| "Connection closed!".equals(e
.getMessage())) {
184 final var sleepMilliseconds
= 100 * (long) Math
.pow(2, backOffCounter
);
185 backOffCounter
= Math
.min(backOffCounter
+ 1, MAX_BACKOFF_COUNTER
);
186 logger
.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds
);
188 Thread
.sleep(sleepMilliseconds
);
189 } catch (InterruptedException interruptedException
) {
192 hasCaughtUpWithOldMessages
= false;
193 signalWebSocket
.connect();
197 } catch (TimeoutException e
) {
199 if (returnOnTimeout
) return;
203 final var result
= context
.getIncomingMessageHandler().handleEnvelope(envelope
, receiveConfig
, handler
);
204 for (final var h
: result
.first()) {
205 final var existingAction
= queuedActions
.get(h
);
206 if (existingAction
== null) {
207 queuedActions
.put(h
, h
);
209 existingAction
.mergeOther(h
);
212 final var exception
= result
.second();
214 if (hasCaughtUpWithOldMessages
) {
215 handleQueuedActions(queuedActions
.keySet());
216 queuedActions
.clear();
218 if (cachedMessage
[0] != null) {
219 if (exception
instanceof UntrustedIdentityException
) {
220 logger
.debug("Keeping message with untrusted identity in message cache");
221 final var address
= ((UntrustedIdentityException
) exception
).getSender();
222 final var recipientId
= account
.getRecipientResolver().resolveRecipient(address
.getServiceId());
223 if (!envelope
.hasSourceUuid()) {
225 cachedMessage
[0] = account
.getMessageCache().replaceSender(cachedMessage
[0], recipientId
);
226 } catch (IOException ioException
) {
227 logger
.warn("Failed to move cached message to recipient folder: {}",
228 ioException
.getMessage());
232 cachedMessage
[0].delete();
238 private void retryFailedReceivedMessages(Manager
.ReceiveMessageHandler handler
) {
239 Set
<HandleAction
> queuedActions
= new HashSet
<>();
240 for (var cachedMessage
: account
.getMessageCache().getCachedMessages()) {
241 var actions
= retryFailedReceivedMessage(handler
, cachedMessage
);
242 if (actions
!= null) {
243 queuedActions
.addAll(actions
);
246 handleQueuedActions(queuedActions
);
249 private List
<HandleAction
> retryFailedReceivedMessage(
250 final Manager
.ReceiveMessageHandler handler
, final CachedMessage cachedMessage
252 var envelope
= cachedMessage
.loadEnvelope();
253 if (envelope
== null) {
254 cachedMessage
.delete();
258 final var result
= context
.getIncomingMessageHandler().handleRetryEnvelope(envelope
, receiveConfig
, handler
);
259 final var actions
= result
.first();
260 final var exception
= result
.second();
262 if (exception
instanceof UntrustedIdentityException
) {
263 if (System
.currentTimeMillis() - envelope
.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
264 // Envelope is more than a month old, cleaning up.
265 cachedMessage
.delete();
268 if (!envelope
.hasSourceUuid()) {
269 final var identifier
= ((UntrustedIdentityException
) exception
).getSender();
270 final var recipientId
= account
.getRecipientResolver()
271 .resolveRecipient(new RecipientAddress(identifier
));
273 account
.getMessageCache().replaceSender(cachedMessage
, recipientId
);
274 } catch (IOException ioException
) {
275 logger
.warn("Failed to move cached message to recipient folder: {}", ioException
.getMessage());
281 // If successful and for all other errors that are not recoverable, delete the cached message
282 cachedMessage
.delete();
286 private void handleQueuedActions(final Collection
<HandleAction
> queuedActions
) {
287 logger
.debug("Handling message actions");
288 for (var action
: queuedActions
) {
289 logger
.debug("Executing action {}", action
.getClass().getSimpleName());
291 action
.execute(context
);
292 } catch (Throwable e
) {
293 logger
.warn("Message action failed.", e
);
298 private void onWebSocketStateChange(final WebSocketConnectionState s
) {
299 if (s
.equals(WebSocketConnectionState
.AUTHENTICATION_FAILED
)) {
300 account
.setRegistered(false);
301 authenticationFailureListener
.call();
305 public interface Callable
{