package org.asamk.signal.manager.helper;
import org.asamk.signal.manager.Manager;
-import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.ReceiveConfig;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
+import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.CleanOldPreKeysJob;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
+import org.whispersystems.signalservice.api.push.ServiceId;
+import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.websocket.SignalWebSocket;
import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
import java.util.Set;
import java.util.concurrent.TimeoutException;
-import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class ReceiveHelper {
- private final static Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
- private final static int MAX_BACKOFF_COUNTER = 9;
+ private static final Logger logger = LoggerFactory.getLogger(ReceiveHelper.class);
+ private static final int MAX_BACKOFF_COUNTER = 9;
private final SignalAccount account;
private final SignalDependencies dependencies;
private final Context context;
- private boolean ignoreAttachments = false;
- private boolean needsToRetryFailedMessages = false;
+ private ReceiveConfig receiveConfig = new ReceiveConfig(false, false, false);
private boolean hasCaughtUpWithOldMessages = false;
+ private boolean isWaitingForMessage = false;
+ private boolean shouldStop = false;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
this.context = context;
}
- public void setIgnoreAttachments(final boolean ignoreAttachments) {
- this.ignoreAttachments = ignoreAttachments;
- }
-
- public void setNeedsToRetryFailedMessages(final boolean needsToRetryFailedMessages) {
- this.needsToRetryFailedMessages = needsToRetryFailedMessages;
- }
-
- public boolean hasCaughtUpWithOldMessages() {
- return hasCaughtUpWithOldMessages;
+ public void setReceiveConfig(final ReceiveConfig receiveConfig) { // stores the receive configuration and applies its story setting to the dependencies
+ this.receiveConfig = receiveConfig; // later handed to the incoming message handler, where ignoreAttachments was passed before
+ dependencies.setAllowStories(!receiveConfig.ignoreStories()); // stories are allowed exactly when the config does not ignore them
}
public void setAuthenticationFailureListener(final Callable authenticationFailureListener) {
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
+ public boolean requestStopReceiveMessages() { // asks the receive loop to end; returns whether a websocket read is currently pending
+ this.shouldStop = true; // NOTE(review): shouldStop and isWaitingForMessage are plain (non-volatile) fields - confirm visibility if this is called from another thread
+ return isWaitingForMessage; // set to true right before a batch read and back to false once a batch arrives or the read returns
+ }
+
+ public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { // keeps starting receive passes until one returns normally or a stop has been requested
+ while (!shouldStop) { // NOTE(review): receiveMessages resets shouldStop in its finally block, so a stop requested during a failing pass is not seen here - confirm this is intended
+ try {
+ receiveMessages(Duration.ofMinutes(1), false, null, handler); // one minute read timeout, no return on timeout, null = no message limit
+ break; // the pass ended without an IOException, so stop looping
+ } catch (IOException e) {
+ logger.warn("Receiving messages failed, retrying", e); // the next pass starts immediately; there is no delay between retries
+ }
+ }
+ }
+
public void receiveMessages(
- Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
+ Duration timeout, // converted to milliseconds for every websocket batch read
+ boolean returnOnTimeout, // presumably ends the pass on a read timeout; the matching catch clause is not fully visible here - TODO confirm
+ Integer maxMessages, // null means no limit; otherwise the pass ends after this many received messages
+ Manager.ReceiveMessageHandler handler
) throws IOException {
- needsToRetryFailedMessages = true;
+ account.setNeedsToRetryFailedMessages(true); // the retry flag now lives on the account instead of a field of this helper
hasCaughtUpWithOldMessages = false;
// Use a Map here because java Set doesn't have a get method ...
Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
- final var signalWebSocket = dependencies.getSignalWebSocket();
- final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
- signalWebSocket.getWebSocketState())
+ final var signalWebSocket = dependencies.getAuthenticatedSignalWebSocket(); // this same instance is passed on to receiveMessagesInternal and cleaned up below
+ final var webSocketStateDisposable = signalWebSocket.getState() // only this socket's state is observed; the merged unidentified-socket state is gone
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation())
.distinctUntilChanged()
.subscribe(this::onWebSocketStateChange);
signalWebSocket.connect();
+ signalWebSocket.registerKeepAliveToken("receive"); // removed again in the finally block below
try {
- receiveMessagesInternal(timeout, returnOnTimeout, handler, queuedActions);
+ receiveMessagesInternal(signalWebSocket, timeout, returnOnTimeout, maxMessages, handler, queuedActions); // runs the actual receive loop
} finally {
hasCaughtUpWithOldMessages = false;
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
- dependencies.getSignalWebSocket().disconnect();
+ signalWebSocket.removeKeepAliveToken("receive"); // same token string as registered before the try
+ signalWebSocket.disconnect();
webSocketStateDisposable.dispose();
+ shouldStop = false; // clears any stop request, whether the pass ended normally or with an exception
}
}
private void receiveMessagesInternal(
+ final SignalWebSocket.AuthenticatedWebSocket signalWebSocket,
Duration timeout,
boolean returnOnTimeout,
+ Integer maxMessages,
Manager.ReceiveMessageHandler handler,
final Map<HandleAction, HandleAction> queuedActions
) throws IOException {
- final var signalWebSocket = dependencies.getSignalWebSocket();
-
+ int remainingMessages = maxMessages == null ? -1 : maxMessages;
var backOffCounter = 0;
+ isWaitingForMessage = false;
- while (!Thread.interrupted()) {
- if (needsToRetryFailedMessages) {
+ while (!shouldStop && remainingMessages != 0) {
+ if (account.getNeedsToRetryFailedMessages()) {
retryFailedReceivedMessages(handler);
- needsToRetryFailedMessages = false;
}
SignalServiceEnvelope envelope;
final CachedMessage[] cachedMessage = {null};
}
logger.debug("Checking for new message from server");
try {
- var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
- final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
- .resolveRecipient(envelope1.getSourceAddress()) : null;
- logger.trace("Storing new message from {}", recipientId);
- // store message on disk, before acknowledging receipt to the server
- cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+ isWaitingForMessage = true;
+ var queueNotEmpty = signalWebSocket.readMessageBatch(timeout.toMillis(), 1, batch -> {
+ logger.debug("Retrieved {} envelopes!", batch.size());
+ isWaitingForMessage = false;
+ for (final var it : batch) {
+ SignalServiceEnvelope envelope1 = new SignalServiceEnvelope(it.getEnvelope(),
+ it.getServerDeliveredTimestamp());
+ final var recipientId = envelope1.getSourceServiceId()
+ .map(ServiceId::parseOrNull)
+ .map(s -> account.getRecipientResolver().resolveRecipient(s))
+ .orElse(null);
+ logger.trace("Storing new message from {}", recipientId);
+ // store message on disk, before acknowledging receipt to the server
+ cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
+ try {
+ signalWebSocket.sendAck(it);
+ } catch (IOException e) {
+ logger.warn("Failed to ack envelope to server after storing it: {}", e.getMessage());
+ }
+ }
});
+ isWaitingForMessage = false;
backOffCounter = 0;
- if (result.isPresent()) {
- envelope = result.get();
+ if (queueNotEmpty) {
+ if (remainingMessages > 0) {
+ remainingMessages -= 1;
+ }
+ envelope = cachedMessage[0].loadEnvelope();
logger.debug("New message received from server");
} else {
logger.debug("Received indicator that server queue is empty");
handleQueuedActions(queuedActions.keySet());
queuedActions.clear();
+ context.getJobExecutor().enqueueJob(new CleanOldPreKeysJob());
hasCaughtUpWithOldMessages = true;
caughtUpWithOldMessagesListener.call();
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
break;
} else {
throw e;
backOffCounter = 0;
if (returnOnTimeout) return;
continue;
+ } catch (Exception e) {
+ logger.error("Unknown error when receiving messages", e);
+ continue;
}
- final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, ignoreAttachments, handler);
- for (final var h : result.first()) {
- final var existingAction = queuedActions.get(h);
- if (existingAction == null) {
- queuedActions.put(h, h);
- } else {
- existingAction.mergeOther(h);
+ try {
+ final var result = context.getIncomingMessageHandler().handleEnvelope(envelope, receiveConfig, handler);
+ for (final var h : result.first()) {
+ final var existingAction = queuedActions.get(h);
+ if (existingAction == null) {
+ queuedActions.put(h, h);
+ } else {
+ existingAction.mergeOther(h);
+ }
}
- }
- final var exception = result.second();
+ final var exception = result.second();
- if (hasCaughtUpWithOldMessages) {
- handleQueuedActions(queuedActions.keySet());
- queuedActions.clear();
- }
- if (cachedMessage[0] != null) {
- if (exception instanceof UntrustedIdentityException) {
- logger.debug("Keeping message with untrusted identity in message cache");
- final var address = ((UntrustedIdentityException) exception).getSender();
- final var recipientId = account.getRecipientStore().resolveRecipient(address);
- if (!envelope.hasSourceUuid()) {
- try {
- cachedMessage[0] = account.getMessageCache().replaceSender(cachedMessage[0], recipientId);
- } catch (IOException ioException) {
- logger.warn("Failed to move cached message to recipient folder: {}",
- ioException.getMessage());
+ if (hasCaughtUpWithOldMessages) {
+ handleQueuedActions(queuedActions.keySet());
+ queuedActions.clear();
+ }
+ if (cachedMessage[0] != null) {
+ if (exception instanceof UntrustedIdentityException) {
+ logger.debug("Keeping message with untrusted identity in message cache");
+ final var address = ((UntrustedIdentityException) exception).getSender();
+ if (envelope.getSourceServiceId().isEmpty() && address.aci().isPresent()) {
+ final var recipientId = account.getRecipientResolver()
+ .resolveRecipient(ACI.parseOrThrow(address.aci().get()));
+ try {
+ cachedMessage[0] = account.getMessageCache()
+ .replaceSender(cachedMessage[0], recipientId);
+ } catch (IOException ioException) {
+ logger.warn("Failed to move cached message to recipient folder: {}",
+ ioException.getMessage(),
+ ioException);
+ }
}
+ } else {
+ cachedMessage[0].delete();
}
- } else {
- cachedMessage[0].delete();
}
+ } catch (Exception e) {
+ logger.error("Unknown error when handling messages", e);
}
}
}
}
}
handleQueuedActions(queuedActions);
+ account.setNeedsToRetryFailedMessages(false);
}
private List<HandleAction> retryFailedReceivedMessage(
- final Manager.ReceiveMessageHandler handler, final CachedMessage cachedMessage
+ final Manager.ReceiveMessageHandler handler,
+ final CachedMessage cachedMessage
) {
var envelope = cachedMessage.loadEnvelope();
if (envelope == null) {
return null;
}
- final var result = context.getIncomingMessageHandler()
- .handleRetryEnvelope(envelope, ignoreAttachments, handler);
+ final var result = context.getIncomingMessageHandler().handleRetryEnvelope(envelope, receiveConfig, handler);
final var actions = result.first();
final var exception = result.second();
if (exception instanceof UntrustedIdentityException) {
- if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 30) {
- // Envelope is more than a month old, cleaning up.
+ if (System.currentTimeMillis() - envelope.getServerDeliveredTimestamp() > 1000L * 60 * 60 * 24 * 14) {
+ // Envelope is more than two weeks old, cleaning up.
cachedMessage.delete();
return null;
}
- if (!envelope.hasSourceUuid()) {
+ if (envelope.getSourceServiceId().isEmpty()) {
final var identifier = ((UntrustedIdentityException) exception).getSender();
- final var recipientId = account.getRecipientStore().resolveRecipient(identifier);
+ final var recipientId = account.getRecipientResolver()
+ .resolveRecipient(new RecipientAddress(identifier));
try {
account.getMessageCache().replaceSender(cachedMessage, recipientId);
} catch (IOException ioException) {
- logger.warn("Failed to move cached message to recipient folder: {}", ioException.getMessage());
+ logger.warn("Failed to move cached message to recipient folder: {}",
+ ioException.getMessage(),
+ ioException);
}
}
return null;
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
- var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
- if ((e instanceof AssertionError || e instanceof RuntimeException)
- && e.getCause() instanceof InterruptedException) {
- interrupted = true;
- continue;
- }
+ // Every failure is only logged and the loop moves on to the next action; interruption is no longer tracked or restored here.
logger.warn("Message action failed.", e);
}
}
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {