nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Implement reacting to stories
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
16 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
17
18 import java.io.IOException;
19 import java.time.Duration;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.TimeoutException;
27
28 import io.reactivex.rxjava3.core.Observable;
29 import io.reactivex.rxjava3.schedulers.Schedulers;
30
31 public class ReceiveHelper {
32
33 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
34 private final static int MAX_BACKOFF_COUNTER = 9;
35
36 private final SignalAccount account;
37 private final SignalDependencies dependencies;
38 private final Context context;
39
40 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
41 private boolean needsToRetryFailedMessages = false;
42 private boolean hasCaughtUpWithOldMessages = false;
43 private boolean isWaitingForMessage = false;
44 private boolean shouldStop = false;
45 private Callable authenticationFailureListener;
46 private Callable caughtUpWithOldMessagesListener;
47
/**
 * Creates a receive helper bound to the given context.
 *
 * @param context supplies the account and the service dependencies, and is kept for handling envelopes
 *                and executing queued actions
 */
public ReceiveHelper(final Context context) {
    this.context = context;
    this.account = context.getAccount();
    this.dependencies = context.getDependencies();
}
53
54 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
55 this.receiveConfig = receiveConfig;
56 dependencies.setAllowStories(!receiveConfig.ignoreStories());
57 }
58
59 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
60 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
61 }
62
63 public boolean hasCaughtUpWithOldMessages() {
64 return hasCaughtUpWithOldMessages;
65 }
66
67 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
68 this.authenticationFailureListener = authenticationFailureListener;
69 }
70
71 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
72 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
73 }
74
75 public boolean requestStopReceiveMessages() {
76 this.shouldStop = true;
77 return isWaitingForMessage;
78 }
79
80 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
81 while (!shouldStop) {
82 try {
83 receiveMessages(Duration.ofMinutes(1), false, handler);
84 break;
85 } catch (IOException e) {
86 logger.warn("Receiving messages failed, retrying", e);
87 }
88 }
89 }
90
91 public void receiveMessages(
92 Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
93 ) throws IOException {
94 needsToRetryFailedMessages = true;
95 hasCaughtUpWithOldMessages = false;
96
97 // Use a Map here because java Set doesn't have a get method ...
98 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
99
100 final var signalWebSocket = dependencies.getSignalWebSocket();
101 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
102 signalWebSocket.getWebSocketState())
103 .subscribeOn(Schedulers.computation())
104 .observeOn(Schedulers.computation())
105 .distinctUntilChanged()
106 .subscribe(this::onWebSocketStateChange);
107 signalWebSocket.connect();
108
109 try {
110 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, handler, queuedActions);
111 } finally {
112 hasCaughtUpWithOldMessages = false;
113 handleQueuedActions(queuedActions.keySet());
114 queuedActions.clear();
115 signalWebSocket.disconnect();
116 webSocketStateDisposable.dispose();
117 shouldStop = false;
118 }
119 }
120
121 private void receiveMessagesInternal(
122 final SignalWebSocket signalWebSocket,
123 Duration timeout,
124 boolean returnOnTimeout,
125 Manager.ReceiveMessageHandler handler,
126 final Map<HandleAction, HandleAction> queuedActions
127 ) throws IOException {
128 var backOffCounter = 0;
129 isWaitingForMessage = false;
130
131 while (!shouldStop) {
132 if (needsToRetryFailedMessages) {
133 retryFailedReceivedMessages(handler);
134 needsToRetryFailedMessages = false;
135 }
136 SignalServiceEnvelope envelope;
137 final CachedMessage[] cachedMessage = {null};
138 final var nowMillis = System.currentTimeMillis();
139 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
140 account.setLastReceiveTimestamp(nowMillis);
141 }
142 logger.debug("Checking for new message from server");
143 try {
144 isWaitingForMessage = true;
145 var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
146 isWaitingForMessage = false;
147 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
148 .resolveRecipient(envelope1.getSourceAddress()) : null;
149 logger.trace("Storing new message from {}", recipientId);
150 // store message on disk, before acknowledging receipt to the server
151 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
152 });
153 isWaitingForMessage = false;
154 backOffCounter = 0;
155
156 if (result.isPresent()) {
157 envelope = result.get();
158 logger.debug("New message received from server");
159 } else {
160 logger.debug("Received indicator that server queue is empty");
161 handleQueuedActions(queuedActions.keySet());
162 queuedActions.clear();
163
164 hasCaughtUpWithOldMessages = true;
165 caughtUpWithOldMessagesListener.call();
166
167 // Continue to wait another timeout for new messages
168 continue;
169 }
170 } catch (AssertionError e) {
171 if (e.getCause() instanceof InterruptedException) {
172 break;
173 } else {
174 throw e;
175 }
176 } catch (IOException e) {
177 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
178 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
179 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
180 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
181 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
182 try {
183 Thread.sleep(sleepMilliseconds);
184 } catch (InterruptedException interruptedException) {
185 return;
186 }
187 hasCaughtUpWithOldMessages = false;
188 signalWebSocket.connect();
189 continue;
190 }
191 throw e;
192 } catch (TimeoutException e) {
193 backOffCounter = 0;
194 if (returnOnTimeout) return;
195 continue;
196 }
197
198 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
199 for (final var h : result.first()) {
200 final var existingAction = queuedActions.get(h);
201 if (existingAction == null) {
202 queuedActions.put(h, h);
203 } else {
204 existingAction.mergeOther(h);
205 }
206 }
207 final var exception = result.second();
208
209 if (hasCaughtUpWithOldMessages) {
210 handleQueuedActions(queuedActions.keySet());
211 queuedActions.clear();
212 }
213 if (cachedMessage[0] != null) {
214 if (exception instanceof UntrustedIdentityException) {
215 logger.debug("Keeping message with untrusted identity in message cache");
216 final var address = ((UntrustedIdentityException) exception).getSender();
217 final var recipientId = account.getRecipientResolver().resolveRecipient(address.getServiceId());
218 if (!envelope.hasSourceUuid()) {
219 try {
220 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
221 } catch (IOException ioException) {
222 logger.warn("Failed to move cached message to recipient folder: {}",
223 ioException.getMessage());
224 }
225 }
226 } else {
227 cachedMessage[0].delete();
228 }
229 }
230 }
231 }
232
233 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
234 Set<HandleAction> queuedActions = new HashSet<>();
235 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
236 var actions = retryFailedReceivedMessage(handler, cachedMessage);
237 if (actions != null) {
238 queuedActions.addAll(actions);
239 }
240 }
241 handleQueuedActions(queuedActions);
242 }
243
244 private List<HandleAction> retryFailedReceivedMessage(
245 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
246 ) {
247 var envelope = cachedMessage.loadEnvelope();
248 if (envelope == null) {
249 cachedMessage.delete();
250 return null;
251 }
252
253 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
254 final var actions = result.first();
255 final var exception = result.second();
256
257 if (exception instanceof UntrustedIdentityException) {
258 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
259 // Envelope is more than a month old, cleaning up.
260 cachedMessage.delete();
261 return null;
262 }
263 if (!envelope.hasSourceUuid()) {
264 final var identifier = ((UntrustedIdentityException) exception).getSender();
265 final var recipientId = account.getRecipientResolver()
266 .resolveRecipient(new RecipientAddress(identifier));
267 try {
268 account.getMessageCache().replaceSender(cachedMessage, recipientId);
269 } catch (IOException ioException) {
270 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
271 }
272 }
273 return null;
274 }
275
276 // If successful and for all other errors that are not recoverable, delete the cached message
277 cachedMessage.delete();
278 return actions;
279 }
280
281 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
282 logger.debug("Handling message actions");
283 for (var action : queuedActions) {
284 logger.debug("Executing action {}", action.getClass().getSimpleName());
285 try {
286 action.execute(context);
287 } catch (Throwable e) {
288 logger.warn("Message action failed.", e);
289 }
290 }
291 }
292
293 private void onWebSocketStateChange(final WebSocketConnectionState s) {
294 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
295 account.setRegistered(false);
296 authenticationFailureListener.call();
297 }
298 }
299
/**
 * Callback without arguments, result or checked exceptions, used for the listeners of this helper.
 */
@FunctionalInterface
public interface Callable {

    /** Runs the callback. */
    void call();
}
304 }