]> nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
Use Java 17
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / ManagerImpl.java
index 0421a4010634e931821defe3e6852cead0e51230..1a1e735e15a6a7d16b266e5de29c7d7ea29ebb3f 100644 (file)
@@ -137,6 +137,10 @@ public class ManagerImpl implements Manager {
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean ignoreAttachments = false;
 
+    private Thread receiveThread;
+    private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
+    private boolean isReceivingSynchronous;
+
     ManagerImpl(
             SignalAccount account,
             PathConfig pathConfig,
@@ -565,16 +569,15 @@ public class ManagerImpl implements Manager {
         long timestamp = System.currentTimeMillis();
         messageBuilder.withTimestamp(timestamp);
         for (final var recipient : recipients) {
-            if (recipient instanceof RecipientIdentifier.Single) {
-                final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient);
+            if (recipient instanceof RecipientIdentifier.Single single) {
+                final var recipientId = resolveRecipient(single);
                 final var result = sendHelper.sendMessage(messageBuilder, recipientId);
                 results.put(recipient, List.of(result));
             } else if (recipient instanceof RecipientIdentifier.NoteToSelf) {
                 final var result = sendHelper.sendSelfMessage(messageBuilder);
                 results.put(recipient, List.of(result));
-            } else if (recipient instanceof RecipientIdentifier.Group) {
-                final var groupId = ((RecipientIdentifier.Group) recipient).groupId;
-                final var result = sendHelper.sendAsGroupMessage(messageBuilder, groupId);
+            } else if (recipient instanceof RecipientIdentifier.Group group) {
+                final var result = sendHelper.sendAsGroupMessage(messageBuilder, group.groupId);
                 results.put(recipient, result);
             }
         }
@@ -872,6 +875,88 @@ public class ManagerImpl implements Manager {
         return actions;
     }
 
+    @Override
+    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+        if (isReceivingSynchronous) {
+            throw new IllegalStateException("Already receiving message synchronously.");
+        }
+        synchronized (messageHandlers) {
+            messageHandlers.add(handler);
+
+            startReceiveThreadIfRequired();
+        }
+    }
+
+    private void startReceiveThreadIfRequired() {
+        if (receiveThread != null) {
+            return;
+        }
+        receiveThread = new Thread(() -> {
+            while (!Thread.interrupted()) {
+                try {
+                    receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
+                        synchronized (messageHandlers) {
+                            for (ReceiveMessageHandler h : messageHandlers) {
+                                try {
+                                    h.handleMessage(envelope, decryptedContent, e);
+                                } catch (Exception ex) {
+                                    logger.warn("Message handler failed, ignoring", ex);
+                                }
+                            }
+                        }
+                    });
+                    break;
+                } catch (IOException e) {
+                    logger.warn("Receiving messages failed, retrying", e);
+                }
+            }
+            hasCaughtUpWithOldMessages = false;
+            synchronized (messageHandlers) {
+                receiveThread = null;
+
+                // Check if in the meantime another handler has been registered
+                if (!messageHandlers.isEmpty()) {
+                    startReceiveThreadIfRequired();
+                }
+            }
+        });
+
+        receiveThread.start();
+    }
+
+    @Override
+    public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+        final Thread thread;
+        synchronized (messageHandlers) {
+            thread = receiveThread;
+            receiveThread = null;
+            messageHandlers.remove(handler);
+            if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+                return;
+            }
+        }
+
+        stopReceiveThread(thread);
+    }
+
+    private void stopReceiveThread(final Thread thread) {
+        thread.interrupt();
+        try {
+            thread.join();
+        } catch (InterruptedException ignored) {
+        }
+    }
+
+    @Override
+    public boolean isReceiving() {
+        if (isReceivingSynchronous) {
+            return true;
+        }
+        synchronized (messageHandlers) {
+            return messageHandlers.size() > 0;
+        }
+    }
+
     @Override
     public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
         receiveMessages(timeout, unit, true, handler);
@@ -884,6 +969,23 @@ public class ManagerImpl implements Manager {
 
     private void receiveMessages(
             long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+    ) throws IOException {
+        if (isReceiving()) {
+            throw new IllegalStateException("Already receiving message.");
+        }
+        isReceivingSynchronous = true;
+        receiveThread = Thread.currentThread();
+        try {
+            receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+        } finally {
+            receiveThread = null;
+            hasCaughtUpWithOldMessages = false;
+            isReceivingSynchronous = false;
+        }
+    }
+
+    private void receiveMessagesInternal(
+            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
         retryFailedReceivedMessages(handler);
 
@@ -1249,6 +1351,15 @@ public class ManagerImpl implements Manager {
     }
 
     private void close(boolean closeAccount) throws IOException {
+        Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.clear();
+            thread = receiveThread;
+            receiveThread = null;
+        }
+        if (thread != null) {
+            stopReceiveThread(thread);
+        }
         executor.shutdown();
 
         dependencies.getSignalWebSocket().disconnect();