From: AsamK Date: Sat, 12 Feb 2022 11:26:42 +0000 (+0100) Subject: Improve stop receive handling X-Git-Tag: v0.10.4~12 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/cf0cc50e328c0fc4fcbbecfa0ed21887613110a1 Improve stop receive handling Only interrupt the receive thread if it is currently waiting for new messages from the server, otherwise just set a stop flag. --- diff --git a/graalvm-config-dir/resource-config.json b/graalvm-config-dir/resource-config.json index c0866365..08a9dcc5 100644 --- a/graalvm-config-dir/resource-config.json +++ b/graalvm-config-dir/resource-config.json @@ -58,6 +58,12 @@ { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CH\\E" }, + { + "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CI\\E" + }, + { + "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CL\\E" + }, { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_CN\\E" }, @@ -169,6 +175,9 @@ { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TH\\E" }, + { + "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_TR\\E" + }, { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_UA\\E" }, @@ -178,6 +187,9 @@ { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_US\\E" }, + { + "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_VE\\E" + }, { "pattern":"\\Qcom/google/i18n/phonenumbers/data/PhoneNumberMetadataProto_XK\\E" }, @@ -210,6 +222,7 @@ "name":"net.sourceforge.argparse4j.internal.ArgumentParserImpl", "locales":[ "", + "en", "und" ] }] diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index c0643604..3f418de8 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -765,24 +765,17 @@ class ManagerImpl implements Manager { } receiveThread = new Thread(() -> { logger.debug("Starting receiving messages"); - while (!Thread.interrupted()) { - try { - context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> { - synchronized (messageHandlers) { - Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { - try { - h.handleMessage(envelope, e); - } catch (Exception ex) { - logger.warn("Message handler failed, ignoring", ex); - } - }); + context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + try { + h.handleMessage(envelope, e); + } catch (Exception ex) { + logger.warn("Message handler failed, ignoring", ex); } }); - break; - } catch (IOException e) { - logger.warn("Receiving messages failed, retrying", e); } - } + }); logger.debug("Finished receiving messages"); synchronized (messageHandlers) { receiveThread = null; @@ -816,7 +809,10 @@ class ManagerImpl implements Manager { } private void stopReceiveThread(final Thread thread) { - thread.interrupt(); + if (context.getReceiveHelper().requestStopReceiveMessages()) { + logger.debug("Receive stop requested, interrupting read from server."); + thread.interrupt(); + } try { thread.join(); } catch (InterruptedException ignored) { @@ -1030,14 +1026,15 @@ class ManagerImpl implements Manager { dependencies.getSignalWebSocket().disconnect(); disposable.dispose(); + if (account != null) { + account.close(); + } + synchronized (closedListeners) { closedListeners.forEach(Runnable::run); closedListeners.clear(); } - if (account != null) { - account.close(); - } account = null; } } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java index ec4d385e..8e58e976 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java @@ -347,9 +347,6 @@ public final class ProfileHelper { .storeProfileAvatar(address, outputStream -> retrieveProfileAvatar(avatarPath, profileKey, outputStream)); } catch (Throwable e) { - if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } logger.warn("Failed to download profile avatar, ignoring: {}", e.getMessage()); } } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java index 8dfc7deb..7bc2c224 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ReceiveHelper.java @@ -2,8 +2,8 @@ package org.asamk.signal.manager.helper; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.SignalDependencies; -import org.asamk.signal.manager.api.UntrustedIdentityException; import org.asamk.signal.manager.actions.HandleAction; +import org.asamk.signal.manager.api.UntrustedIdentityException; import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.messageCache.CachedMessage; import org.slf4j.Logger; @@ -37,6 +37,8 @@ public class ReceiveHelper { private boolean ignoreAttachments = false; private boolean needsToRetryFailedMessages = false; private boolean hasCaughtUpWithOldMessages = false; + private boolean isWaitingForMessage = false; + private boolean shouldStop = false; private Callable authenticationFailureListener; private Callable caughtUpWithOldMessagesListener; @@ -66,6 +68,22 @@ public class ReceiveHelper { this.caughtUpWithOldMessagesListener = caughtUpWithOldMessagesListener; } + public boolean requestStopReceiveMessages() { + this.shouldStop = true; + return isWaitingForMessage; + } + + public void receiveMessagesContinuously(Manager.ReceiveMessageHandler handler) { + while (!shouldStop) { + try { + receiveMessages(Duration.ofMinutes(1), false, handler); + break; + } catch (IOException e) { + logger.warn("Receiving messages failed, retrying", e); + } + } + } + public void receiveMessages( Duration timeout, boolean returnOnTimeout, Manager.ReceiveMessageHandler handler ) throws IOException { @@ -92,6 +110,7 @@ public class ReceiveHelper { queuedActions.clear(); dependencies.getSignalWebSocket().disconnect(); webSocketStateDisposable.dispose(); + shouldStop = false; } } @@ -104,8 +123,9 @@ public class ReceiveHelper { final var signalWebSocket = dependencies.getSignalWebSocket(); var backOffCounter = 0; + isWaitingForMessage = false; - while (!Thread.interrupted()) { + while (!shouldStop) { if (needsToRetryFailedMessages) { retryFailedReceivedMessages(handler); needsToRetryFailedMessages = false; @@ -118,13 +138,16 @@ public class ReceiveHelper { } logger.debug("Checking for new message from server"); try { + isWaitingForMessage = true; var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> { + isWaitingForMessage = false; final var recipientId = envelope1.hasSourceUuid() ? account.getRecipientStore() .resolveRecipient(envelope1.getSourceAddress()) : null; logger.trace("Storing new message from {}", recipientId); // store message on disk, before acknowledging receipt to the server cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId); }); + isWaitingForMessage = false; backOffCounter = 0; if (result.isPresent()) { @@ -143,7 +166,6 @@ public class ReceiveHelper { } } catch (AssertionError e) { if (e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); break; } else { throw e; @@ -255,23 +277,14 @@ public class ReceiveHelper { private void handleQueuedActions(final Collection queuedActions) { logger.debug("Handling message actions"); - var interrupted = false; for (var action : queuedActions) { logger.debug("Executing action {}", action.getClass().getSimpleName()); try { action.execute(context); } catch (Throwable e) { - if ((e instanceof AssertionError || e instanceof RuntimeException) - && e.getCause() instanceof InterruptedException) { - interrupted = true; - continue; - } logger.warn("Message action failed.", e); } } - if (interrupted) { - Thread.currentThread().interrupt(); - } } private void onWebSocketStateChange(final WebSocketConnectionState s) { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java index c0e7d429..3b75de65 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java @@ -49,7 +49,7 @@ public class MessageSendLogStore implements AutoCloseable { this.cleanupThread = new Thread(() -> { try { final var interval = Duration.ofHours(1).toMillis(); - while (true) { + while (!Thread.interrupted()) { try (final var connection = database.getConnection()) { deleteOutdatedEntries(connection); } catch (SQLException e) {