From 5a2e37a6e242b920e5647e3d98c2aecb1932f763 Mon Sep 17 00:00:00 2001 From: AsamK Date: Sat, 4 Sep 2021 15:06:25 +0200 Subject: [PATCH] Only handle jsonRpc requests, after receive thread has caught up with old messages --- .../org/asamk/signal/manager/Manager.java | 24 +++++++++++++++---- .../commands/JsonRpcDispatcherCommand.java | 10 ++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index c40fa7cd..c4c77b34 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -141,6 +141,7 @@ public class Manager implements Closeable { private final IncomingMessageHandler incomingMessageHandler; private final Context context; + private boolean hasCaughtUpWithOldMessages = false; Manager( SignalAccount account, @@ -865,7 +866,7 @@ public class Manager implements Closeable { final var signalWebSocket = dependencies.getSignalWebSocket(); signalWebSocket.connect(); - var hasCaughtUpWithOldMessages = false; + hasCaughtUpWithOldMessages = false; while (!Thread.interrupted()) { SignalServiceEnvelope envelope; @@ -885,11 +886,14 @@ public class Manager implements Closeable { envelope = result.get(); } else { // Received indicator that server queue is empty - hasCaughtUpWithOldMessages = true; - handleQueuedActions(queuedActions); queuedActions.clear(); + hasCaughtUpWithOldMessages = true; + synchronized (this) { + this.notifyAll(); + } + // Continue to wait another timeout for new messages continue; } @@ -936,17 +940,27 @@ public class Manager implements Closeable { handleQueuedActions(queuedActions); } + public boolean hasCaughtUpWithOldMessages() { + return hasCaughtUpWithOldMessages; + } + private void handleQueuedActions(final Collection queuedActions) { + var interrupted = false; for (var action : queuedActions) { try { action.execute(context); } catch (Throwable e) { - if (e instanceof AssertionError && e.getCause() instanceof InterruptedException) { - Thread.currentThread().interrupt(); + if ((e instanceof AssertionError || e instanceof RuntimeException) + && e.getCause() instanceof InterruptedException) { + interrupted = true; + continue; } logger.warn("Message action failed.", e); } } + if (interrupted) { + Thread.currentThread().interrupt(); + } } public boolean isContactBlocked(final RecipientIdentifier.Single recipient) { diff --git a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java index 16d0cf71..d0e4dfec 100644 --- a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java +++ b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java @@ -75,6 +75,16 @@ public class JsonRpcDispatcherCommand implements LocalCommand { objectMapper.valueToTree(s), null)), m, ignoreAttachments); + // Maybe this should be handled inside the Manager + while (!m.hasCaughtUpWithOldMessages()) { + try { + synchronized (m) { + m.wait(); + } + } catch (InterruptedException ignored) { + } + } + final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, () -> { -- 2.50.1