]> nmode's Git Repositories - signal-cli/commitdiff
Move receive thread handling to manager
authorAsamK <asamk@gmx.de>
Thu, 21 Oct 2021 20:59:52 +0000 (22:59 +0200)
committerAsamK <asamk@gmx.de>
Fri, 22 Oct 2021 15:39:33 +0000 (17:39 +0200)
lib/src/main/java/org/asamk/signal/manager/Manager.java
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
src/main/java/org/asamk/signal/commands/DaemonCommand.java
src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java
src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java

index ac0cc02fc03aa98017798236ef3cabbfbf06bcfe..0a8762d981d1c6b6329108c249cb159790f432fb 100644 (file)
@@ -193,6 +193,20 @@ public interface Manager extends Closeable {
 
     void requestAllSyncData() throws IOException;
 
 
     void requestAllSyncData() throws IOException;
 
+    /**
+     * Add a handler to receive new messages.
+     * Will start receiving messages from server, if not already started.
+     */
+    void addReceiveHandler(ReceiveMessageHandler handler);
+
+    /**
+     * Remove a handler to receive new messages.
+     * Will stop receiving messages from server, if this was the last registered receiver.
+     */
+    void removeReceiveHandler(ReceiveMessageHandler handler);
+
+    boolean isReceiving();
+
     /**
      * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
      */
     /**
      * Receive new messages from server, returns if no new message arrive in a timespan of timeout.
      */
index 0421a4010634e931821defe3e6852cead0e51230..2ea965919d18d4777af9b654d0d333ec00d3457e 100644 (file)
@@ -137,6 +137,10 @@ public class ManagerImpl implements Manager {
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean ignoreAttachments = false;
 
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean ignoreAttachments = false;
 
+    private Thread receiveThread;
+    private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
+    private boolean isReceivingSynchronous;
+
     ManagerImpl(
             SignalAccount account,
             PathConfig pathConfig,
     ManagerImpl(
             SignalAccount account,
             PathConfig pathConfig,
@@ -872,6 +876,88 @@ public class ManagerImpl implements Manager {
         return actions;
     }
 
         return actions;
     }
 
+    @Override
+    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+        if (isReceivingSynchronous) {
+            throw new IllegalStateException("Already receiving message synchronously.");
+        }
+        synchronized (messageHandlers) {
+            messageHandlers.add(handler);
+
+            startReceiveThreadIfRequired();
+        }
+    }
+
+    private void startReceiveThreadIfRequired() {
+        if (receiveThread != null) {
+            return;
+        }
+        receiveThread = new Thread(() -> {
+            while (!Thread.interrupted()) {
+                try {
+                    receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
+                        synchronized (messageHandlers) {
+                            for (ReceiveMessageHandler h : messageHandlers) {
+                                try {
+                                    h.handleMessage(envelope, decryptedContent, e);
+                                } catch (Exception ex) {
+                                    logger.warn("Message handler failed, ignoring", ex);
+                                }
+                            }
+                        }
+                    });
+                    break;
+                } catch (IOException e) {
+                    logger.warn("Receiving messages failed, retrying", e);
+                }
+            }
+            hasCaughtUpWithOldMessages = false;
+            synchronized (messageHandlers) {
+                receiveThread = null;
+
+                // Check if in the meantime another handler has been registered
+                if (!messageHandlers.isEmpty()) {
+                    startReceiveThreadIfRequired();
+                }
+            }
+        });
+
+        receiveThread.start();
+    }
+
+    @Override
+    public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+        final Thread thread;
+        synchronized (messageHandlers) {
+            thread = receiveThread;
+            receiveThread = null;
+            messageHandlers.remove(handler);
+            if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+                return;
+            }
+        }
+
+        stopReceiveThread(thread);
+    }
+
+    private void stopReceiveThread(final Thread thread) {
+        thread.interrupt();
+        try {
+            thread.join();
+        } catch (InterruptedException ignored) {
+        }
+    }
+
+    @Override
+    public boolean isReceiving() {
+        if (isReceivingSynchronous) {
+            return true;
+        }
+        synchronized (messageHandlers) {
+            return messageHandlers.size() > 0;
+        }
+    }
+
     @Override
     public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
         receiveMessages(timeout, unit, true, handler);
     @Override
     public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
         receiveMessages(timeout, unit, true, handler);
@@ -884,6 +970,23 @@ public class ManagerImpl implements Manager {
 
     private void receiveMessages(
             long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
 
     private void receiveMessages(
             long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+    ) throws IOException {
+        if (isReceiving()) {
+            throw new IllegalStateException("Already receiving message.");
+        }
+        isReceivingSynchronous = true;
+        receiveThread = Thread.currentThread();
+        try {
+            receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+        } finally {
+            receiveThread = null;
+            hasCaughtUpWithOldMessages = false;
+            isReceivingSynchronous = false;
+        }
+    }
+
+    private void receiveMessagesInternal(
+            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
         retryFailedReceivedMessages(handler);
 
     ) throws IOException {
         retryFailedReceivedMessages(handler);
 
@@ -1249,6 +1352,15 @@ public class ManagerImpl implements Manager {
     }
 
     private void close(boolean closeAccount) throws IOException {
     }
 
     private void close(boolean closeAccount) throws IOException {
+        Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.clear();
+            thread = receiveThread;
+            receiveThread = null;
+        }
+        if (thread != null) {
+            stopReceiveThread(thread);
+        }
         executor.shutdown();
 
         dependencies.getSignalWebSocket().disconnect();
         executor.shutdown();
 
         dependencies.getSignalWebSocket().disconnect();
index 9627d9fb7ebb9d79a7191e4b0213f464ec1b377a..a121c7e93de4f31bc9dfb60b7fe480baf8299169 100644 (file)
@@ -71,6 +71,9 @@ public class DaemonCommand implements MultiLocalCommand {
 
             try {
                 t.join();
 
             try {
                 t.join();
+                synchronized (this) {
+                    wait();
+                }
             } catch (InterruptedException ignored) {
             }
         } catch (DBusException | IOException e) {
             } catch (InterruptedException ignored) {
             }
         } catch (DBusException | IOException e) {
@@ -128,27 +131,11 @@ public class DaemonCommand implements MultiLocalCommand {
 
         logger.info("Exported dbus object: " + objectPath);
 
 
         logger.info("Exported dbus object: " + objectPath);
 
-        final var thread = new Thread(() -> {
-            while (!Thread.interrupted()) {
-                try {
-                    final var receiveMessageHandler = outputWriter instanceof JsonWriter
-                            ? new JsonDbusReceiveMessageHandler(m, (JsonWriter) outputWriter, conn, objectPath)
-                            : new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath);
-                    m.receiveMessages(receiveMessageHandler);
-                    break;
-                } catch (IOException e) {
-                    logger.warn("Receiving messages failed, retrying", e);
-                }
-            }
-            try {
-                initThread.join();
-            } catch (InterruptedException ignored) {
-            }
-            signal.close();
-        });
-
-        thread.start();
-
-        return thread;
+        final var receiveMessageHandler = outputWriter instanceof JsonWriter ? new JsonDbusReceiveMessageHandler(m,
+                (JsonWriter) outputWriter,
+                conn,
+                objectPath) : new DbusReceiveMessageHandler(m, (PlainTextWriter) outputWriter, conn, objectPath);
+        m.addReceiveHandler(receiveMessageHandler);
+        return initThread;
     }
 }
     }
 }
index 2a95a8803319e9d0db2aa7269e6d015f71d945fa..6e0c3173d802813466d4fa491ecfe7aa03e61245 100644 (file)
@@ -70,10 +70,11 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
         final var objectMapper = Util.createJsonObjectMapper();
         final var jsonRpcSender = new JsonRpcSender((JsonWriter) outputWriter);
 
         final var objectMapper = Util.createJsonObjectMapper();
         final var jsonRpcSender = new JsonRpcSender((JsonWriter) outputWriter);
 
-        final var receiveThread = receiveMessages(s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification(
-                "receive",
-                objectMapper.valueToTree(s),
-                null)), m);
+        final var receiveMessageHandler = new JsonReceiveMessageHandler(m,
+                s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive",
+                        objectMapper.valueToTree(s),
+                        null)));
+        m.addReceiveHandler(receiveMessageHandler);
 
         // Maybe this should be handled inside the Manager
         while (!m.hasCaughtUpWithOldMessages()) {
 
         // Maybe this should be handled inside the Manager
         while (!m.hasCaughtUpWithOldMessages()) {
@@ -97,11 +98,7 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
         jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params),
                 response -> logger.debug("Received unexpected response for id {}", response.getId()));
 
         jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params),
                 response -> logger.debug("Received unexpected response for id {}", response.getId()));
 
-        receiveThread.interrupt();
-        try {
-            receiveThread.join();
-        } catch (InterruptedException ignored) {
-        }
+        m.removeReceiveHandler(receiveMessageHandler);
     }
 
     private JsonNode handleRequest(
     }
 
     private JsonNode handleRequest(
@@ -166,22 +163,4 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
         }
         command.handleCommand(requestParams, m, outputWriter);
     }
         }
         command.handleCommand(requestParams, m, outputWriter);
     }
-
-    private Thread receiveMessages(JsonWriter jsonWriter, Manager m) {
-        final var thread = new Thread(() -> {
-            while (!Thread.interrupted()) {
-                try {
-                    final var receiveMessageHandler = new JsonReceiveMessageHandler(m, jsonWriter);
-                    m.receiveMessages(receiveMessageHandler);
-                    break;
-                } catch (IOException e) {
-                    logger.warn("Receiving messages failed, retrying", e);
-                }
-            }
-        });
-
-        thread.start();
-
-        return thread;
-    }
 }
 }
index 31e29ac931faa7ac97b1da59c2a61b1471ca8325..fcbadd38de3cd43fb1317a6c6fc94e6fa704b645 100644 (file)
@@ -423,6 +423,21 @@ public class DbusManagerImpl implements Manager {
         signal.sendSyncRequest();
     }
 
         signal.sendSyncRequest();
     }
 
+    @Override
+    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isReceiving() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
         throw new UnsupportedOperationException();
     @Override
     public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
         throw new UnsupportedOperationException();