]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java
Remove unused code
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / helper / ReceiveHelper.java
1 package org.asamk.signal.manager.helper;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.SignalDependencies;
5 import org.asamk.signal.manager.actions.HandleAction;
6 import org.asamk.signal.manager.api.ReceiveConfig;
7 import org.asamk.signal.manager.api.UntrustedIdentityException;
8 import org.asamk.signal.manager.storage.SignalAccount;
9 import org.asamk.signal.manager.storage.messageCache.CachedMessage;
10 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.SignalWebSocket;
14 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
15 import org.whispersystems.signalservice.api.push.ServiceId;
16 import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
17 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
18
19 import java.io.IOException;
20 import java.time.Duration;
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.TimeoutException;
28
29 import io.reactivex.rxjava3.core.Observable;
30 import io.reactivex.rxjava3.schedulers.Schedulers;
31
32 public class ReceiveHelper {
33
34 private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
35 private final static int MAX_BACKOFF_COUNTER = 9;
36
37 private final SignalAccount account;
38 private final SignalDependencies dependencies;
39 private final Context context;
40
41 private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
42 private boolean needsToRetryFailedMessages = false;
43 private boolean hasCaughtUpWithOldMessages = false;
44 private boolean isWaitingForMessage = false;
45 private boolean shouldStop = false;
46 private Callable authenticationFailureListener;
47 private Callable caughtUpWithOldMessagesListener;
48
49 public ReceiveHelper(final Context context) {
50 this.account = context.getAccount();
51 this.dependencies = context.getDependencies();
52 this.context = context;
53 }
54
55 public void setReceiveConfig(final ReceiveConfig receiveConfig) {
56 this.receiveConfig = receiveConfig;
57 dependencies.setAllowStories(!receiveConfig.ignoreStories());
58 }
59
60 public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
61 this.needsToRetryFailedMessages = needsToRetryFailedMessages;
62 }
63
64 public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
65 this.authenticationFailureListener = authenticationFailureListener;
66 }
67
68 public void setCaughtUpWithOldMessagesListener(final Callable caughtUpWithOldMessagesListener) {
69 this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
70 }
71
72 public boolean requestStopReceiveMessages() {
73 this.shouldStop = true;
74 return isWaitingForMessage;
75 }
76
77 public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
78 while (!shouldStop) {
79 try {
80 receiveMessages(Duration.ofMinutes(1), false, null, handler);
81 break;
82 } catch (IOException e) {
83 logger.warn("Receiving messages failed, retrying", e);
84 }
85 }
86 }
87
88 public void receiveMessages(
89 Duration timeout, boolean returnOnTimeout, Integer maxMessages, Manager.ReceiveMessageHandler handler
90 ) throws IOException {
91 needsToRetryFailedMessages = true;
92 hasCaughtUpWithOldMessages = false;
93
94 // Use a Map here because java Set doesn't have a get method ...
95 Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
96
97 final var signalWebSocket = dependencies.getSignalWebSocket();
98 final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
99 signalWebSocket.getWebSocketState())
100 .subscribeOn(Schedulers.computation())
101 .observeOn(Schedulers.computation())
102 .distinctUntilChanged()
103 .subscribe(this::onWebSocketStateChange);
104 signalWebSocket.connect();
105
106 try {
107 receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions);
108 } finally {
109 hasCaughtUpWithOldMessages = false;
110 handleQueuedActions(queuedActions.keySet());
111 queuedActions.clear();
112 signalWebSocket.disconnect();
113 webSocketStateDisposable.dispose();
114 shouldStop = false;
115 }
116 }
117
118 private void receiveMessagesInternal(
119 final SignalWebSocket signalWebSocket,
120 Duration timeout,
121 boolean returnOnTimeout,
122 Integer maxMessages,
123 Manager.ReceiveMessageHandler handler,
124 final Map<HandleAction, HandleAction> queuedActions
125 ) throws IOException {
126 int remainingMessages = maxMessages == null ? -1 : maxMessages;
127 var backOffCounter = 0;
128 isWaitingForMessage = false;
129
130 while (!shouldStop && remainingMessages != 0) {
131 if (needsToRetryFailedMessages) {
132 retryFailedReceivedMessages(handler);
133 needsToRetryFailedMessages = false;
134 }
135 SignalServiceEnvelope envelope;
136 final CachedMessage[] cachedMessage = {null};
137 final var nowMillis = System.currentTimeMillis();
138 if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
139 account.setLastReceiveTimestamp(nowMillis);
140 }
141 logger.debug("Checking for new message from server");
142 try {
143 isWaitingForMessage = true;
144 var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
145 logger.debug("Retrieved {} envelopes!", batch.size());
146 isWaitingForMessage = false;
147 for (final var it : batch) {
148 SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
149 it.getServerDeliveredTimestamp());
150 final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientResolver()
151 .resolveRecipient(envelope1.getSourceAddress()) : null;
152 logger.trace("Storing new message from {}", recipientId);
153 // store message on disk, before acknowledging receipt to the server
154 cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
155 }
156 return true;
157 });
158 isWaitingForMessage = false;
159 backOffCounter = 0;
160
161 if (queueNotEmpty) {
162 if (remainingMessages > 0) {
163 remainingMessages -= 1;
164 }
165 envelope = cachedMessage[0].loadEnvelope();
166 logger.debug("New message received from server");
167 } else {
168 logger.debug("Received indicator that server queue is empty");
169 handleQueuedActions(queuedActions.keySet());
170 queuedActions.clear();
171
172 hasCaughtUpWithOldMessages = true;
173 caughtUpWithOldMessagesListener.call();
174
175 // Continue to wait another timeout for new messages
176 continue;
177 }
178 } catch (AssertionError e) {
179 if (e.getCause() instanceof InterruptedException) {
180 break;
181 } else {
182 throw e;
183 }
184 } catch (IOException e) {
185 logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage());
186 if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) {
187 final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter);
188 backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER);
189 logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds);
190 try {
191 Thread.sleep(sleepMilliseconds);
192 } catch (InterruptedException interruptedException) {
193 return;
194 }
195 hasCaughtUpWithOldMessages = false;
196 signalWebSocket.connect();
197 continue;
198 }
199 throw e;
200 } catch (TimeoutException e) {
201 backOffCounter = 0;
202 if (returnOnTimeout) return;
203 continue;
204 }
205
206 final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
207 for (final var h : result.first()) {
208 final var existingAction = queuedActions.get(h);
209 if (existingAction == null) {
210 queuedActions.put(h, h);
211 } else {
212 existingAction.mergeOther(h);
213 }
214 }
215 final var exception = result.second();
216
217 if (hasCaughtUpWithOldMessages) {
218 handleQueuedActions(queuedActions.keySet());
219 queuedActions.clear();
220 }
221 if (cachedMessage[0] != null) {
222 if (exception instanceof UntrustedIdentityException) {
223 logger.debug("Keeping message with untrusted identity in message cache");
224 final var address = ((UntrustedIdentityException) exception).getSender();
225 if (!envelope.hasSourceUuid() && address.uuid().isPresent()) {
226 final var recipientId = account.getRecipientResolver()
227 .resolveRecipient(ServiceId.from(address.uuid().get()));
228 try {
229 cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
230 } catch (IOException ioException) {
231 logger.warn("Failed to move cached message to recipient folder: {}",
232 ioException.getMessage());
233 }
234 }
235 } else {
236 cachedMessage[0].delete();
237 }
238 }
239 }
240 }
241
242 private void retryFailedReceivedMessages(Manager.ReceiveMessageHandler handler) {
243 Set<HandleAction> queuedActions = new HashSet<>();
244 for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
245 var actions = retryFailedReceivedMessage(handler, cachedMessage);
246 if (actions != null) {
247 queuedActions.addAll(actions);
248 }
249 }
250 handleQueuedActions(queuedActions);
251 }
252
253 private List<HandleAction> retryFailedReceivedMessage(
254 final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
255 ) {
256 var envelope = cachedMessage.loadEnvelope();
257 if (envelope == null) {
258 cachedMessage.delete();
259 return null;
260 }
261
262 final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
263 final var actions = result.first();
264 final var exception = result.second();
265
266 if (exception instanceof UntrustedIdentityException) {
267 if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
268 // Envelope is more than a month old, cleaning up.
269 cachedMessage.delete();
270 return null;
271 }
272 if (!envelope.hasSourceUuid()) {
273 final var identifier = ((UntrustedIdentityException) exception).getSender();
274 final var recipientId = account.getRecipientResolver()
275 .resolveRecipient(new RecipientAddress(identifier));
276 try {
277 account.getMessageCache().replaceSender(cachedMessage, recipientId);
278 } catch (IOException ioException) {
279 logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
280 }
281 }
282 return null;
283 }
284
285 // If successful and for all other errors that are not recoverable, delete the cached message
286 cachedMessage.delete();
287 return actions;
288 }
289
290 private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
291 logger.debug("Handling message actions");
292 for (var action : queuedActions) {
293 logger.debug("Executing action {}", action.getClass().getSimpleName());
294 try {
295 action.execute(context);
296 } catch (Throwable e) {
297 logger.warn("Message action failed.", e);
298 }
299 }
300 }
301
302 private void onWebSocketStateChange(final WebSocketConnectionState s) {
303 if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
304 account.setRegistered(false);
305 authenticationFailureListener.call();
306 }
307 }
308
309 public interface Callable {
310
311 void call();
312 }
313 }