From: AsamK Date: Thu, 21 Oct 2021 20:59:52 +0000 (+0200) Subject: Move receive thread handling to manager X-Git-Tag: v0.9.2~5 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/fc0a9b4102feef185e4a09881e3b079b82df3da7 Move receive thread handling to manager --- diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index ac0cc02f..0a8762d9 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -193,6 +193,20 @@ public interface Manager extends Closeable { void requestAllSyncData() throws IOException; + /** + * Add a handler to receive new messages. + * Will start receiving messages from server, if not already started. + */ + void addReceiveHandler(ReceiveMessageHandler handler); + + /** + * Remove a handler to receive new messages. + * Will stop receiving messages from server, if this was the last registered receiver. + */ + void removeReceiveHandler(ReceiveMessageHandler handler); + + boolean isReceiving(); + /** * Receive new messages from server, returns if no new message arrive in a timespan of timeout. */ diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 0421a401..2ea96591 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -137,6 +137,10 @@ public class ManagerImpl implements Manager { private boolean hasCaughtUpWithOldMessages = false; private boolean ignoreAttachments = false; + private Thread receiveThread; + private final Set messageHandlers = new HashSet<>(); + private boolean isReceivingSynchronous; + ManagerImpl( SignalAccount account, PathConfig pathConfig, @@ -872,6 +876,88 @@ public class ManagerImpl implements Manager { return actions; } + @Override + public void addReceiveHandler(final ReceiveMessageHandler handler) { + if (isReceivingSynchronous) { + throw new IllegalStateException("Already receiving message synchronously."); + } + synchronized (messageHandlers) { + messageHandlers.add(handler); + + startReceiveThreadIfRequired(); + } + } + + private void startReceiveThreadIfRequired() { + if (receiveThread != null) { + return; + } + receiveThread = new Thread(() -> { + while (!Thread.interrupted()) { + try { + receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> { + synchronized (messageHandlers) { + for (ReceiveMessageHandler h : messageHandlers) { + try { + h.handleMessage(envelope, decryptedContent, e); + } catch (Exception ex) { + logger.warn("Message handler failed, ignoring", ex); + } + } + } + }); + break; + } catch (IOException e) { + logger.warn("Receiving messages failed, retrying", e); + } + } + hasCaughtUpWithOldMessages = false; + synchronized (messageHandlers) { + receiveThread = null; + + // Check if in the meantime another handler has been registered + if (!messageHandlers.isEmpty()) { + startReceiveThreadIfRequired(); + } + } + }); + + receiveThread.start(); + } + + @Override + public void removeReceiveHandler(final ReceiveMessageHandler handler) { + final Thread thread; + synchronized (messageHandlers) { + thread = receiveThread; + receiveThread = null; + messageHandlers.remove(handler); + if (!messageHandlers.isEmpty() || isReceivingSynchronous) { + return; + } + } + + stopReceiveThread(thread); + } + + private void stopReceiveThread(final Thread thread) { + thread.interrupt(); + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + + @Override + public boolean isReceiving() { + if (isReceivingSynchronous) { + return true; + } + synchronized (messageHandlers) { + return messageHandlers.size() > 0; + } + } + @Override public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException { receiveMessages(timeout, unit, true, handler); @@ -884,6 +970,23 @@ public class ManagerImpl implements Manager { private void receiveMessages( long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler + ) throws IOException { + if (isReceiving()) { + throw new IllegalStateException("Already receiving message."); + } + isReceivingSynchronous = true; + receiveThread = Thread.currentThread(); + try { + receiveMessagesInternal(timeout, unit, returnOnTimeout, handler); + } finally { + receiveThread = null; + hasCaughtUpWithOldMessages = false; + isReceivingSynchronous = false; + } + } + + private void receiveMessagesInternal( + long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler ) throws IOException { retryFailedReceivedMessages(handler); @@ -1249,6 +1352,15 @@ public class ManagerImpl implements Manager { } private void close(boolean closeAccount) throws IOException { + Thread thread; + synchronized (messageHandlers) { + messageHandlers.clear(); + thread = receiveThread; + receiveThread = null; + } + if (thread != null) { + stopReceiveThread(thread); + } executor.shutdown(); dependencies.getSignalWebSocket().disconnect(); diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index 9627d9fb..a121c7e9 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -71,6 +71,9 @@ public class DaemonCommand implements MultiLocalCommand { try { t.join(); + synchronized (this) { + wait(); + } } catch (InterruptedException ignored) { } } catch (DBusException | IOException e) { @@ -128,27 +131,11 @@ public class DaemonCommand implements MultiLocalCommand { logger.info("Exported dbus object: " + objectPath); - final var thread = new Thread(() -> { - while (!Thread.interrupted()) { - try { - final var receiveMessageHandler = outputWriter instanceof JsonWriter - ? new JsonDbusReceiveMessageHandler(m, (JsonWriter) outputWriter, conn, objectPath) - : new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath); - m.receiveMessages(receiveMessageHandler); - break; - } catch (IOException e) { - logger.warn("Receiving messages failed, retrying", e); - } - } - try { - initThread.join(); - } catch (InterruptedException ignored) { - } - signal.close(); - }); - - thread.start(); - - return thread; + final var receiveMessageHandler = outputWriter instanceof JsonWriter ? new JsonDbusReceiveMessageHandler(m, + (JsonWriter) outputWriter, + conn, + objectPath) : new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath); + m.addReceiveHandler(receiveMessageHandler); + return initThread; } } diff --git a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java index 2a95a880..6e0c3173 100644 --- a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java +++ b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java @@ -70,10 +70,11 @@ public class JsonRpcDispatcherCommand implements LocalCommand { final var objectMapper = Util.createJsonObjectMapper(); final var jsonRpcSender = new JsonRpcSender((JsonWriter) outputWriter); - final var receiveThread = receiveMessages(s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification( - "receive", - objectMapper.valueToTree(s), - null)), m); + final var receiveMessageHandler = new JsonReceiveMessageHandler(m, + s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", + objectMapper.valueToTree(s), + null))); + m.addReceiveHandler(receiveMessageHandler); // Maybe this should be handled inside the Manager while (!m.hasCaughtUpWithOldMessages()) { @@ -97,11 +98,7 @@ public class JsonRpcDispatcherCommand implements LocalCommand { jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params), response -> logger.debug("Received unexpected response for id {}", response.getId())); - receiveThread.interrupt(); - try { - receiveThread.join(); - } catch (InterruptedException ignored) { - } + m.removeReceiveHandler(receiveMessageHandler); } private JsonNode handleRequest( @@ -166,22 +163,4 @@ public class JsonRpcDispatcherCommand implements LocalCommand { } command.handleCommand(requestParams, m, outputWriter); } - - private Thread receiveMessages(JsonWriter jsonWriter, Manager m) { - final var thread = new Thread(() -> { - while (!Thread.interrupted()) { - try { - final var receiveMessageHandler = new JsonReceiveMessageHandler(m, jsonWriter); - m.receiveMessages(receiveMessageHandler); - break; - } catch (IOException e) { - logger.warn("Receiving messages failed, retrying", e); - } - } - }); - - thread.start(); - - return thread; - } } diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index 31e29ac9..fcbadd38 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -423,6 +423,21 @@ public class DbusManagerImpl implements Manager { signal.sendSyncRequest(); } + @Override + public void addReceiveHandler(final ReceiveMessageHandler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeReceiveHandler(final ReceiveMessageHandler handler) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReceiving() { + throw new UnsupportedOperationException(); + } + @Override public void receiveMessages(final ReceiveMessageHandler handler) throws IOException { throw new UnsupportedOperationException();