]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
5dca96eacdae3b3e7ce96f7308721a9e00d00a96
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.whispersystems.signalservice.api.SignalWebSocket;
13 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
14 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
15 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
16
17 import java.io.IOException;
18 import java.time.Duration;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.TimeoutException;
26
27 import io.reactivex.rxjava3.core.Observable;
28 import io.reactivex.rxjava3.schedulers.Schedulers;
29
30 public class ReceiveHelper {
31
32 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
33 private final static int MAX_BACKOFF_COUNTER = 9;
34
35 private final SignalAccount account;
36 private final SignalDependencies dependencies;
37 private final Context context;
38
39 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
40 private boolean needsToRetryFailedMessages = false;
41 private boolean hasCaughtUpWithOldMessages = false;
42 private boolean isWaitingForMessage = false;
43 private boolean shouldStop = false;
44 private Callable authenticationFailureListener;
45 private Callable caughtUpWithOldMessagesListener;
46
47 public ReceiveHelper(final Context context) {
48 this.account = context.getAccount();
49 this.dependencies = context.getDependencies();
50 this.context = context;
51 }
52
53 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
54 this.receiveConfig = receiveConfig;
55 dependencies.setAllowStories(!receiveConfig.ignoreStories());
56 }
57
58 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
59 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
60 }
61
62 public boolean hasCaughtUpWithOldMessages() {
63 return hasCaughtUpWithOldMessages;
64 }
65
66 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
67 this.authenticationFailureListener = authenticationFailureListener;
68 }
69
70 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
71 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
72 }
73
74 public boolean requestStopReceiveMessages() {
75 this.shouldStop = true;
76 return isWaitingForMessage;
77 }
78
79 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
80 while (!shouldStop) {
81 try {
82 receiveMessages(Duration.ofMinutes(1), false, handler);
83 break;
84 } catch (IOException e) {
85 logger.warn("Receiving messages failed, retrying", e);
86 }
87 }
88 }
89
90 public void receiveMessages(
91 Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
92 ) throws IOException {
93 needsToRetryFailedMessages = true;
94 hasCaughtUpWithOldMessages = false;
95
96 // Use a Map here because java Set doesn't have a get method ...
97 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
98
99 final var signalWebSocket = dependencies.getSignalWebSocket();
100 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
101 signalWebSocket.getWebSocketState())
102 .subscribeOn(Schedulers.computation())
103 .observeOn(Schedulers.computation())
104 .distinctUntilChanged()
105 .subscribe(this::onWebSocketStateChange);
106 signalWebSocket.connect();
107
108 try {
109 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions);
110 } finally {
111 hasCaughtUpWithOldMessages = false;
112 handleQueuedActions(queuedActions.keySet());
113 queuedActions.clear();
114 signalWebSocket.disconnect();
115 webSocketStateDisposable.dispose();
116 shouldStop = false;
117 }
118 }
119
120 private void receiveMessagesInternal(
121 final SignalWebSocket signalWebSocket,
122 Duration timeout,
123 boolean returnOnTimeout,
124 Manager.ReceiveMessageHandler handler,
125 final Map<HandleAction, HandleAction> queuedActions
126 ) throws IOException {
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop) {
131 if (needsToRetryFailedMessages) {
132 retryFailedReceivedMessages(handler);
133 needsToRetryFailedMessages = false;
134 }
135 SignalServiceEnvelope envelope;
136 final CachedMessage[] cachedMessage = {null};
137 final var nowMillis = System.currentTimeMillis();
138 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
139 account.setLastReceiveTimestamp(nowMillis);
140 }
141 logger.debug("Checking for new message from server");
142 try {
143 isWaitingForMessage = true;
144 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
145 isWaitingForMessage = false;
146 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
147 .resolveRecipient(envelope1.getSourceAddress()) : null;
148 logger.trace("Storing new message from {}", recipientId);
149 // store message on disk, before acknowledging receipt to the server
150 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
151 });
152 isWaitingForMessage = false;
153 backOffCounter = 0;
154
155 if (result.isPresent()) {
156 envelope = result.get();
157 logger.debug("New message received from server");
158 } else {
159 logger.debug("Received indicator that server queue is empty");
160 handleQueuedActions(queuedActions.keySet());
161 queuedActions.clear();
162
163 hasCaughtUpWithOldMessages = true;
164 caughtUpWithOldMessagesListener.call();
165
166 // Continue to wait another timeout for new messages
167 continue;
168 }
169 } catch (AssertionError e) {
170 if (e.getCause() instanceof InterruptedException) {
171 break;
172 } else {
173 throw e;
174 }
175 } catch (IOException e) {
176 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
177 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
178 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
179 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
180 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
181 try {
182 Thread.sleep(sleepMilliseconds);
183 } catch (InterruptedException interruptedException) {
184 return;
185 }
186 hasCaughtUpWithOldMessages = false;
187 signalWebSocket.connect();
188 continue;
189 }
190 throw e;
191 } catch (TimeoutException e) {
192 backOffCounter = 0;
193 if (returnOnTimeout) return;
194 continue;
195 }
196
197 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
198 for (final var h : result.first()) {
199 final var existingAction = queuedActions.get(h);
200 if (existingAction == null) {
201 queuedActions.put(h, h);
202 } else {
203 existingAction.mergeOther(h);
204 }
205 }
206 final var exception = result.second();
207
208 if (hasCaughtUpWithOldMessages) {
209 handleQueuedActions(queuedActions.keySet());
210 queuedActions.clear();
211 }
212 if (cachedMessage[0] != null) {
213 if (exception instanceof UntrustedIdentityException) {
214 logger.debug("Keeping message with untrusted identity in message cache");
215 final var address = ((UntrustedIdentityException) exception).getSender();
216 final var recipientId = account.getRecipientResolver().resolveRecipient(address.getServiceId());
217 if (!envelope.hasSourceUuid()) {
218 try {
219 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
220 } catch (IOException ioException) {
221 logger.warn("Failed to move cached message to recipient folder: {}",
222 ioException.getMessage());
223 }
224 }
225 } else {
226 cachedMessage[0].delete();
227 }
228 }
229 }
230 }
231
232 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
233 Set<HandleAction> queuedActions = new HashSet<>();
234 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
235 var actions = retryFailedReceivedMessage(handler, cachedMessage);
236 if (actions != null) {
237 queuedActions.addAll(actions);
238 }
239 }
240 handleQueuedActions(queuedActions);
241 }
242
243 private List<HandleAction> retryFailedReceivedMessage(
244 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
245 ) {
246 var envelope = cachedMessage.loadEnvelope();
247 if (envelope == null) {
248 cachedMessage.delete();
249 return null;
250 }
251
252 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
253 final var actions = result.first();
254 final var exception = result.second();
255
256 if (exception instanceof UntrustedIdentityException) {
257 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
258 // Envelope is more than a month old, cleaning up.
259 cachedMessage.delete();
260 return null;
261 }
262 if (!envelope.hasSourceUuid()) {
263 final var identifier = ((UntrustedIdentityException) exception).getSender();
264 final var recipientId = account.getRecipientResolver().resolveRecipient(identifier.getServiceId());
265 try {
266 account.getMessageCache().replaceSender(cachedMessage, recipientId);
267 } catch (IOException ioException) {
268 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
269 }
270 }
271 return null;
272 }
273
274 // If successful and for all other errors that are not recoverable, delete the cached message
275 cachedMessage.delete();
276 return actions;
277 }
278
279 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
280 logger.debug("Handling message actions");
281 for (var action : queuedActions) {
282 logger.debug("Executing action {}", action.getClass().getSimpleName());
283 try {
284 action.execute(context);
285 } catch (Throwable e) {
286 logger.warn("Message action failed.", e);
287 }
288 }
289 }
290
291 private void onWebSocketStateChange(final WebSocketConnectionState s) {
292 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
293 account.setRegistered(false);
294 authenticationFailureListener.call();
295 }
296 }
297
298 public interface Callable {
299
300 void call();
301 }
302 }