import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.SignalDependencies;
-import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.actions.HandleAction;
+import org.asamk.signal.manager.api.UntrustedIdentityException;
import org.asamk.signal.manager.storage.SignalAccount;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.slf4j.Logger;
private boolean ignoreAttachments = false;
private boolean needsToRetryFailedMessages = false;
private boolean hasCaughtUpWithOldMessages = false;
+ // Both flags are written/read by requestStopReceiveMessages() while the receive loop polls
+ // them; that is only useful when the two run on different threads, so the fields are
+ // volatile to guarantee that a write by one thread is visible to the other.
+ private volatile boolean isWaitingForMessage = false;
+ private volatile boolean shouldStop = false;
private Callable authenticationFailureListener;
private Callable caughtUpWithOldMessagesListener;
this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener;
}
+ /**
+  * Requests that the receive loop stop by setting the {@code shouldStop} flag, which the
+  * receive loops check before each iteration.
+  *
+  * @return the value of {@code isWaitingForMessage} at the time of the call
+  *         (NOTE(review): presumably lets the caller tell whether the receiver is currently
+  *         blocked in a read and needs to be woken up — confirm against callers)
+  */
+ public boolean requestStopReceiveMessages() {
+ this.shouldStop = true;
+ return isWaitingForMessage;
+ }
+
+ /**
+  * Calls {@code receiveMessages} with a one-minute timeout and {@code returnOnTimeout} set to
+  * {@code false}, and calls it again whenever it throws an {@link IOException}. The loop ends
+  * as soon as a call returns normally, or when {@code shouldStop} is set at the time the loop
+  * condition is checked.
+  *
+  * @param handler passed through unchanged to {@code receiveMessages}
+  */
+ public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) {
+ while (!shouldStop) {
+ try {
+ receiveMessages(Duration.ofMinutes(1), false, handler);
+ // A normal return ends the loop; only an IOException leads to another attempt.
+ break;
+ } catch (IOException e) {
+ // NOTE(review): the retry happens immediately, with no delay at this level — confirm
+ // that this is intended.
+ logger.warn("Receiving messages failed, retrying", e);
+ }
+ }
+ }
+
public void receiveMessages(
Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler
) throws IOException {
queuedActions.clear();
dependencies.getSignalWebSocket().disconnect();
webSocketStateDisposable.dispose();
+ shouldStop = false;
}
}
final var signalWebSocket = dependencies.getSignalWebSocket();
var backOffCounter = 0;
+ isWaitingForMessage = false;
- while (!Thread.interrupted()) {
+ while (!shouldStop) {
if (needsToRetryFailedMessages) {
retryFailedReceivedMessages(handler);
needsToRetryFailedMessages = false;
}
logger.debug("Checking for new message from server");
try {
+ isWaitingForMessage = true;
var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
+ isWaitingForMessage = false;
final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore()
.resolveRecipient(envelope1.getSourceAddress()) : null;
logger.trace("Storing new message from {}", recipientId);
// store message on disk, before acknowledging receipt to the server
cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId);
});
+ isWaitingForMessage = false;
backOffCounter = 0;
if (result.isPresent()) {
}
} catch (AssertionError e) {
if (e.getCause() instanceof InterruptedException) {
- Thread.currentThread().interrupt();
break;
} else {
throw e;
+ /**
+  * Executes each of the given actions in iteration order, passing {@code context} to it.
+  * Anything thrown by an action is caught and logged as a warning, and the remaining
+  * actions are still executed.
+  *
+  * @param queuedActions the actions to execute
+  */
private void handleQueuedActions(final Collection<HandleAction> queuedActions) {
logger.debug("Handling message actions");
- var interrupted = false;
for (var action : queuedActions) {
logger.debug("Executing action {}", action.getClass().getSimpleName());
try {
action.execute(context);
} catch (Throwable e) {
- if ((e instanceof AssertionError || e instanceof RuntimeException)
- && e.getCause() instanceof InterruptedException) {
- interrupted = true;
- continue;
- }
logger.warn("Message action failed.", e);
}
}
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
}
private void onWebSocketStateChange(final WebSocketConnectionState s) {