nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Add optional message limit for receive command
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
16 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
17
18 import java.io.IOException;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.TimeoutException;
27
28 import io.reactivex.rxjava3.core.Observable;
29 import io.reactivex.rxjava3.schedulers.Schedulers;
30
31 public class ReceiveHelper {
32
33 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
34 private final static int MAX_BACKOFF_COUNTER = 9;
35
36 private final SignalAccount account;
37 private final SignalDependencies dependencies;
38 private final Context context;
39
40 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
41 private boolean needsToRetryFailedMessages = false;
42 private boolean hasCaughtUpWithOldMessages = false;
43 private boolean isWaitingForMessage = false;
44 private boolean shouldStop = false;
45 private Callable authenticationFailureListener;
46 private Callable caughtUpWithOldMessagesListener;
47
48 public ReceiveHelper(final Context context) {
49 this.account = context.getAccount();
50 this.dependencies = context.getDependencies();
51 this.context = context;
52 }
53
54 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
55 this.receiveConfig = receiveConfig;
56 dependencies.setAllowStories(!receiveConfig.ignoreStories());
57 }
58
59 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
60 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
61 }
62
63 public boolean hasCaughtUpWithOldMessages() {
64 return hasCaughtUpWithOldMessages;
65 }
66
67 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
68 this.authenticationFailureListener = authenticationFailureListener;
69 }
70
71 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
72 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
73 }
74
75 public boolean requestStopReceiveMessages() {
76 this.shouldStop = true;
77 return isWaitingForMessage;
78 }
79
80 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
81 while (!shouldStop) {
82 try {
83 receiveMessages(Duration.ofMinutes(1), false, null, handler);
84 break;
85 } catch (IOException e) {
86 logger.warn("Receiving messages failed, retrying", e);
87 }
88 }
89 }
90
91 public void receiveMessages(
92 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
93 ) throws IOException {
94 needsToRetryFailedMessages = true;
95 hasCaughtUpWithOldMessages = false;
96
97 // Use a Map here because java Set doesn't have a get method ...
98 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
99
100 final var signalWebSocket = dependencies.getSignalWebSocket();
101 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
102 signalWebSocket.getWebSocketState())
103 .subscribeOn(Schedulers.computation())
104 .observeOn(Schedulers.computation())
105 .distinctUntilChanged()
106 .subscribe(this::onWebSocketStateChange);
107 signalWebSocket.connect();
108
109 try {
110 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
111 } finally {
112 hasCaughtUpWithOldMessages = false;
113 handleQueuedActions(queuedActions.keySet());
114 queuedActions.clear();
115 signalWebSocket.disconnect();
116 webSocketStateDisposable.dispose();
117 shouldStop = false;
118 }
119 }
120
121 private void receiveMessagesInternal(
122 final SignalWebSocket signalWebSocket,
123 Duration timeout,
124 boolean returnOnTimeout,
125 Integer maxMessages,
126 Manager.ReceiveMessageHandler handler,
127 final Map<HandleAction, HandleAction> queuedActions
128 ) throws IOException {
129 int remainingMessages = maxMessages == null ? -1 : maxMessages;
130 var backOffCounter = 0;
131 isWaitingForMessage = false;
132
133 while (!shouldStop && remainingMessages != 0) {
134 if (needsToRetryFailedMessages) {
135 retryFailedReceivedMessages(handler);
136 needsToRetryFailedMessages = false;
137 }
138 SignalServiceEnvelope envelope;
139 final CachedMessage[] cachedMessage = {null};
140 final var nowMillis = System.currentTimeMillis();
141 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
142 account.setLastReceiveTimestamp(nowMillis);
143 }
144 logger.debug("Checking for new message from server");
145 try {
146 isWaitingForMessage = true;
147 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
148 isWaitingForMessage = false;
149 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
150 .resolveRecipient(envelope1.getSourceAddress()) : null;
151 logger.trace("Storing new message from {}", recipientId);
152 // store message on disk, before acknowledging receipt to the server
153 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
154 });
155 isWaitingForMessage = false;
156 backOffCounter = 0;
157
158 if (result.isPresent()) {
159 if (remainingMessages > 0) {
160 remainingMessages -= 1;
161 }
162 envelope = result.get();
163 logger.debug("New message received from server");
164 } else {
165 logger.debug("Received indicator that server queue is empty");
166 handleQueuedActions(queuedActions.keySet());
167 queuedActions.clear();
168
169 hasCaughtUpWithOldMessages = true;
170 caughtUpWithOldMessagesListener.call();
171
172 // Continue to wait another timeout for new messages
173 continue;
174 }
175 } catch (AssertionError e) {
176 if (e.getCause() instanceof InterruptedException) {
177 break;
178 } else {
179 throw e;
180 }
181 } catch (IOException e) {
182 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
183 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
184 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
185 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
186 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
187 try {
188 Thread.sleep(sleepMilliseconds);
189 } catch (InterruptedException interruptedException) {
190 return;
191 }
192 hasCaughtUpWithOldMessages = false;
193 signalWebSocket.connect();
194 continue;
195 }
196 throw e;
197 } catch (TimeoutException e) {
198 backOffCounter = 0;
199 if (returnOnTimeout) return;
200 continue;
201 }
202
203 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
204 for (final var h : result.first()) {
205 final var existingAction = queuedActions.get(h);
206 if (existingAction == null) {
207 queuedActions.put(h, h);
208 } else {
209 existingAction.mergeOther(h);
210 }
211 }
212 final var exception = result.second();
213
214 if (hasCaughtUpWithOldMessages) {
215 handleQueuedActions(queuedActions.keySet());
216 queuedActions.clear();
217 }
218 if (cachedMessage[0] != null) {
219 if (exception instanceof UntrustedIdentityException) {
220 logger.debug("Keeping message with untrusted identity in message cache");
221 final var address = ((UntrustedIdentityException) exception).getSender();
222 final var recipientId = account.getRecipientResolver().resolveRecipient(address.getServiceId());
223 if (!envelope.hasSourceUuid()) {
224 try {
225 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
226 } catch (IOException ioException) {
227 logger.warn("Failed to move cached message to recipient folder: {}",
228 ioException.getMessage());
229 }
230 }
231 } else {
232 cachedMessage[0].delete();
233 }
234 }
235 }
236 }
237
238 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
239 Set<HandleAction> queuedActions = new HashSet<>();
240 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
241 var actions = retryFailedReceivedMessage(handler, cachedMessage);
242 if (actions != null) {
243 queuedActions.addAll(actions);
244 }
245 }
246 handleQueuedActions(queuedActions);
247 }
248
249 private List<HandleAction> retryFailedReceivedMessage(
250 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
251 ) {
252 var envelope = cachedMessage.loadEnvelope();
253 if (envelope == null) {
254 cachedMessage.delete();
255 return null;
256 }
257
258 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
259 final var actions = result.first();
260 final var exception = result.second();
261
262 if (exception instanceof UntrustedIdentityException) {
263 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
264 // Envelope is more than a month old, cleaning up.
265 cachedMessage.delete();
266 return null;
267 }
268 if (!envelope.hasSourceUuid()) {
269 final var identifier = ((UntrustedIdentityException) exception).getSender();
270 final var recipientId = account.getRecipientResolver()
271 .resolveRecipient(new RecipientAddress(identifier));
272 try {
273 account.getMessageCache().replaceSender(cachedMessage, recipientId);
274 } catch (IOException ioException) {
275 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
276 }
277 }
278 return null;
279 }
280
281 // If successful and for all other errors that are not recoverable, delete the cached message
282 cachedMessage.delete();
283 return actions;
284 }
285
286 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
287 logger.debug("Handling message actions");
288 for (var action : queuedActions) {
289 logger.debug("Executing action {}", action.getClass().getSimpleName());
290 try {
291 action.execute(context);
292 } catch (Throwable e) {
293 logger.warn("Message action failed.", e);
294 }
295 }
296 }
297
298 private void onWebSocketStateChange(final WebSocketConnectionState s) {
299 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
300 account.setRegistered(false);
301 authenticationFailureListener.call();
302 }
303 }
304
305 public interface Callable {
306
307 void call();
308 }
309 }