1 package org
.asamk
.signal
.manager
.helper
;
3 import org
.asamk
.signal
.manager
.Manager
;
4 import org
.asamk
.signal
.manager
.SignalDependencies
;
5 import org
.asamk
.signal
.manager
.api
.UntrustedIdentityException
;
6 import org
.asamk
.signal
.manager
.actions
.HandleAction
;
7 import org
.asamk
.signal
.manager
.storage
.SignalAccount
;
8 import org
.asamk
.signal
.manager
.storage
.messageCache
.CachedMessage
;
9 import org
.slf4j
.Logger
;
10 import org
.slf4j
.LoggerFactory
;
11 import org
.whispersystems
.signalservice
.api
.messages
.SignalServiceEnvelope
;
12 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketConnectionState
;
13 import org
.whispersystems
.signalservice
.api
.websocket
.WebSocketUnavailableException
;
15 import java
.io
.IOException
;
16 import java
.time
.Duration
;
17 import java
.util
.Collection
;
18 import java
.util
.HashMap
;
19 import java
.util
.HashSet
;
20 import java
.util
.List
;
23 import java
.util
.concurrent
.TimeoutException
;
25 import io
.reactivex
.rxjava3
.core
.Observable
;
26 import io
.reactivex
.rxjava3
.schedulers
.Schedulers
;
28 public class ReceiveHelper
{
30 private final static Logger logger
= LoggerFactory
.getLogger(ReceiveHelper
.class);
31 private final static int MAX_BACKOFF_COUNTER
= 9;
33 private final SignalAccount account
;
34 private final SignalDependencies dependencies
;
35 private final Context context
;
37 private boolean ignoreAttachments
= false;
38 private boolean needsToRetryFailedMessages
= false;
39 private boolean hasCaughtUpWithOldMessages
= false;
40 private Callable authenticationFailureListener
;
41 private Callable caughtUpWithOldMessagesListener
;
43 public ReceiveHelper(final Context context
) {
44 this.account
= context
.getAccount();
45 this.dependencies
= context
.getDependencies();
46 this.context
= context
;
49 public void setIgnoreAttachments(final boolean ignoreAttachments
) {
50 this.ignoreAttachments
= ignoreAttachments
;
53 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages
) {
54 this.needsToRetryFailedMessages
= needsToRetryFailedMessages
;
57 public boolean hasCaughtUpWithOldMessages() {
58 return hasCaughtUpWithOldMessages
;
61 public void setAuthenticationFailureListener(final Callable authenticationFailureListener
) {
62 this.authenticationFailureListener
= authenticationFailureListener
;
65 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener
) {
66 this.caughtUpWithOldMessagesListener
= caughtUpWithOldMessagesListener
;
69 public void receiveMessages(
70 Duration timeout
, boolean returnOnTimeout
, Manager
.ReceiveMessageHandler handler
71 ) throws IOException
{
72 needsToRetryFailedMessages
= true;
73 hasCaughtUpWithOldMessages
= false;
75 // Use a Map here because java Set doesn't have a get method ...
76 Map
<HandleAction
, HandleAction
> queuedActions
= new HashMap
<>();
78 final var signalWebSocket
= dependencies
.getSignalWebSocket();
79 final var webSocketStateDisposable
= Observable
.merge(signalWebSocket
.getUnidentifiedWebSocketState(),
80 signalWebSocket
.getWebSocketState())
81 .subscribeOn(Schedulers
.computation())
82 .observeOn(Schedulers
.computation())
83 .distinctUntilChanged()
84 .subscribe(this::onWebSocketStateChange
);
85 signalWebSocket
.connect();
88 receiveMessagesInternal(timeout
, returnOnTimeout
, handler
, queuedActions
);
90 hasCaughtUpWithOldMessages
= false;
91 handleQueuedActions(queuedActions
.keySet());
92 queuedActions
.clear();
93 dependencies
.getSignalWebSocket().disconnect();
94 webSocketStateDisposable
.dispose();
98 private void receiveMessagesInternal(
100 boolean returnOnTimeout
,
101 Manager
.ReceiveMessageHandler handler
,
102 final Map
<HandleAction
, HandleAction
> queuedActions
103 ) throws IOException
{
104 final var signalWebSocket
= dependencies
.getSignalWebSocket();
106 var backOffCounter
= 0;
108 while (!Thread
.interrupted()) {
109 if (needsToRetryFailedMessages
) {
110 retryFailedReceivedMessages(handler
);
111 needsToRetryFailedMessages
= false;
113 SignalServiceEnvelope envelope
;
114 final CachedMessage
[] cachedMessage
= {null};
115 final var nowMillis
= System
.currentTimeMillis();
116 if (nowMillis
- account
.getLastReceiveTimestamp() > 60000) {
117 account
.setLastReceiveTimestamp(nowMillis
);
119 logger
.debug("Checking for new message from server");
121 var result
= signalWebSocket
.readOrEmpty(timeout
.toMillis(), envelope1
-> {
122 final var recipientId
= envelope1
.hasSourceUuid() ? account
.getRecipientStore()
123 .resolveRecipient(envelope1
.getSourceAddress()) : null;
124 logger
.trace("Storing new message from {}", recipientId
);
125 // store message on disk, before acknowledging receipt to the server
126 cachedMessage
[0] = account
.getMessageCache().cacheMessage(envelope1
, recipientId
);
130 if (result
.isPresent()) {
131 envelope
= result
.get();
132 logger
.debug("New message received from server");
134 logger
.debug("Received indicator that server queue is empty");
135 handleQueuedActions(queuedActions
.keySet());
136 queuedActions
.clear();
138 hasCaughtUpWithOldMessages
= true;
139 caughtUpWithOldMessagesListener
.call();
141 // Continue to wait another timeout for new messages
144 } catch (AssertionError e
) {
145 if (e
.getCause() instanceof InterruptedException
) {
146 Thread
.currentThread().interrupt();
151 } catch (IOException e
) {
152 logger
.debug("Pipe unexpectedly unavailable: {}", e
.getMessage());
153 if (e
instanceof WebSocketUnavailableException
|| "Connection closed!".equals(e
.getMessage())) {
154 final var sleepMilliseconds
= 100 * (long) Math
.pow(2, backOffCounter
);
155 backOffCounter
= Math
.min(backOffCounter
+ 1, MAX_BACKOFF_COUNTER
);
156 logger
.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds
);
158 Thread
.sleep(sleepMilliseconds
);
159 } catch (InterruptedException interruptedException
) {
162 hasCaughtUpWithOldMessages
= false;
163 signalWebSocket
.connect();
167 } catch (TimeoutException e
) {
169 if (returnOnTimeout
) return;
173 final var result
= context
.getIncomingMessageHandler().handleEnvelope(envelope
, ignoreAttachments
, handler
);
174 for (final var h
: result
.first()) {
175 final var existingAction
= queuedActions
.get(h
);
176 if (existingAction
== null) {
177 queuedActions
.put(h
, h
);
179 existingAction
.mergeOther(h
);
182 final var exception
= result
.second();
184 if (hasCaughtUpWithOldMessages
) {
185 handleQueuedActions(queuedActions
.keySet());
186 queuedActions
.clear();
188 if (cachedMessage
[0] != null) {
189 if (exception
instanceof UntrustedIdentityException
) {
190 logger
.debug("Keeping message with untrusted identity in message cache");
191 final var address
= ((UntrustedIdentityException
) exception
).getSender();
192 final var recipientId
= account
.getRecipientStore().resolveRecipient(address
);
193 if (!envelope
.hasSourceUuid()) {
195 cachedMessage
[0] = account
.getMessageCache().replaceSender(cachedMessage
[0], recipientId
);
196 } catch (IOException ioException
) {
197 logger
.warn("Failed to move cached message to recipient folder: {}",
198 ioException
.getMessage());
202 cachedMessage
[0].delete();
208 private void retryFailedReceivedMessages(Manager
.ReceiveMessageHandler handler
) {
209 Set
<HandleAction
> queuedActions
= new HashSet
<>();
210 for (var cachedMessage
: account
.getMessageCache().getCachedMessages()) {
211 var actions
= retryFailedReceivedMessage(handler
, cachedMessage
);
212 if (actions
!= null) {
213 queuedActions
.addAll(actions
);
216 handleQueuedActions(queuedActions
);
219 private List
<HandleAction
> retryFailedReceivedMessage(
220 final Manager
.ReceiveMessageHandler handler
, final CachedMessage cachedMessage
222 var envelope
= cachedMessage
.loadEnvelope();
223 if (envelope
== null) {
224 cachedMessage
.delete();
228 final var result
= context
.getIncomingMessageHandler()
229 .handleRetryEnvelope(envelope
, ignoreAttachments
, handler
);
230 final var actions
= result
.first();
231 final var exception
= result
.second();
233 if (exception
instanceof UntrustedIdentityException
) {
234 if (System
.currentTimeMillis() - envelope
.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
235 // Envelope is more than a month old, cleaning up.
236 cachedMessage
.delete();
239 if (!envelope
.hasSourceUuid()) {
240 final var identifier
= ((UntrustedIdentityException
) exception
).getSender();
241 final var recipientId
= account
.getRecipientStore().resolveRecipient(identifier
);
243 account
.getMessageCache().replaceSender(cachedMessage
, recipientId
);
244 } catch (IOException ioException
) {
245 logger
.warn("Failed to move cached message to recipient folder: {}", ioException
.getMessage());
251 // If successful and for all other errors that are not recoverable, delete the cached message
252 cachedMessage
.delete();
256 private void handleQueuedActions(final Collection
<HandleAction
> queuedActions
) {
257 logger
.debug("Handling message actions");
258 var interrupted
= false;
259 for (var action
: queuedActions
) {
260 logger
.debug("Executing action {}", action
.getClass().getSimpleName());
262 action
.execute(context
);
263 } catch (Throwable e
) {
264 if ((e
instanceof AssertionError
|| e
instanceof RuntimeException
)
265 && e
.getCause() instanceof InterruptedException
) {
269 logger
.warn("Message action failed.", e
);
273 Thread
.currentThread().interrupt();
277 private void onWebSocketStateChange(final WebSocketConnectionState s
) {
278 if (s
.equals(WebSocketConnectionState
.AUTHENTICATION_FAILED
)) {
279 account
.setRegistered(false);
280 authenticationFailureListener
.call();
284 public interface Callable
{