nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
Use Java 17
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / ManagerImpl.java
index 0deafc833db86b12a06d2dad4ab0400463bdb357..1a1e735e15a6a7d16b266e5de29c7d7ea29ebb3f 100644 (file)
@@ -135,6 +135,11 @@ public class ManagerImpl implements Manager {
 
     private final Context context;
     private boolean hasCaughtUpWithOldMessages = false;
+    // Replaces the former per-call ignoreAttachments parameter of the receive methods;
+    // updated through setIgnoreAttachments().
+    private boolean ignoreAttachments = false;
+
+    // Thread that is currently receiving messages: either the background thread started for
+    // registered handlers or the caller's thread during a synchronous receiveMessages() call.
+    // Null when nothing is receiving.
+    private Thread receiveThread;
+    // Registered message handlers; every access visible in this change synchronizes on the set itself.
+    private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
+    // True while a synchronous receiveMessages() call is in progress.
+    // NOTE(review): read and written without synchronization or volatile — confirm that
+    // cross-thread visibility is not required here.
+    private boolean isReceivingSynchronous;
 
     ManagerImpl(
             SignalAccount account,
@@ -564,16 +569,15 @@ public class ManagerImpl implements Manager {
         long timestamp = System.currentTimeMillis();
         messageBuilder.withTimestamp(timestamp);
         for (final var recipient : recipients) {
-            if (recipient instanceof RecipientIdentifier.Single) {
-                final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient);
+            if (recipient instanceof RecipientIdentifier.Single single) {
+                final var recipientId = resolveRecipient(single);
                 final var result = sendHelper.sendMessage(messageBuilder, recipientId);
                 results.put(recipient, List.of(result));
             } else if (recipient instanceof RecipientIdentifier.NoteToSelf) {
                 final var result = sendHelper.sendSelfMessage(messageBuilder);
                 results.put(recipient, List.of(result));
-            } else if (recipient instanceof RecipientIdentifier.Group) {
-                final var groupId = ((RecipientIdentifier.Group) recipient).groupId;
-                final var result = sendHelper.sendAsGroupMessage(messageBuilder, groupId);
+            } else if (recipient instanceof RecipientIdentifier.Group group) {
+                final var result = sendHelper.sendAsGroupMessage(messageBuilder, group.groupId);
                 results.put(recipient, result);
             }
         }
@@ -824,10 +828,10 @@ public class ManagerImpl implements Manager {
         return registeredUsers;
     }
 
-    private void retryFailedReceivedMessages(ReceiveMessageHandler handler, boolean ignoreAttachments) {
+    private void retryFailedReceivedMessages(ReceiveMessageHandler handler) {
         Set<HandleAction> queuedActions = new HashSet<>();
         for (var cachedMessage : account.getMessageCache().getCachedMessages()) {
-            var actions = retryFailedReceivedMessage(handler, ignoreAttachments, cachedMessage);
+            var actions = retryFailedReceivedMessage(handler, cachedMessage);
             if (actions != null) {
                 queuedActions.addAll(actions);
             }
@@ -836,7 +840,7 @@ public class ManagerImpl implements Manager {
     }
 
     private List<HandleAction> retryFailedReceivedMessage(
-            final ReceiveMessageHandler handler, final boolean ignoreAttachments, final CachedMessage cachedMessage
+            final ReceiveMessageHandler handler, final CachedMessage cachedMessage
     ) {
         var envelope = cachedMessage.loadEnvelope();
         if (envelope == null) {
@@ -872,14 +876,118 @@ public class ManagerImpl implements Manager {
     }
 
     @Override
-    public void receiveMessages(
-            long timeout,
-            TimeUnit unit,
-            boolean returnOnTimeout,
-            boolean ignoreAttachments,
-            ReceiveMessageHandler handler
+    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+        // Registers a handler for received messages and makes sure the background receive
+        // thread is running. Rejected while a synchronous receiveMessages() call is active.
+        // NOTE(review): the flag is checked outside the messageHandlers lock — confirm that a
+        // race with a concurrently starting synchronous receive is acceptable.
+        if (isReceivingSynchronous) {
+            throw new IllegalStateException("Already receiving message synchronously.");
+        }
+        synchronized (messageHandlers) {
+            messageHandlers.add(handler);
+
+            // Started under the lock so that registration and thread start-up happen together.
+            startReceiveThreadIfRequired();
+        }
+    }
+
+    /**
+     * Starts the background receive thread unless one is already recorded in receiveThread.
+     *
+     * The thread calls receiveMessagesInternal (1 hour timeout, returnOnTimeout = false) and
+     * forwards every received message to all registered handlers. Both call sites visible in
+     * this change invoke this method while holding the messageHandlers lock.
+     */
+    private void startReceiveThreadIfRequired() {
+        if (receiveThread != null) {
+            return;
+        }
+        receiveThread = new Thread(() -> {
+            // Thread.interrupted() also clears the interrupt flag of this thread.
+            while (!Thread.interrupted()) {
+                try {
+                    receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
+                        // Handlers are invoked while holding the lock, so the set cannot be
+                        // modified by another thread during the iteration.
+                        synchronized (messageHandlers) {
+                            for (ReceiveMessageHandler h : messageHandlers) {
+                                try {
+                                    h.handleMessage(envelope, decryptedContent, e);
+                                } catch (Exception ex) {
+                                    // A failing handler must not prevent the others from running.
+                                    logger.warn("Message handler failed, ignoring", ex);
+                                }
+                            }
+                        }
+                    });
+                    // receiveMessagesInternal returned normally: leave the retry loop.
+                    break;
+                } catch (IOException e) {
+                    // Keep looping so that receiving is attempted again after an I/O failure.
+                    logger.warn("Receiving messages failed, retrying", e);
+                }
+            }
+            hasCaughtUpWithOldMessages = false;
+            synchronized (messageHandlers) {
+                receiveThread = null;
+
+                // Check if in the meantime another handler has been registered
+                if (!messageHandlers.isEmpty()) {
+                    startReceiveThreadIfRequired();
+                }
+            }
+        });
+
+        receiveThread.start();
+    }
+
+    /**
+     * Unregisters a message handler and, once no handler is left, stops the background
+     * receive thread.
+     *
+     * The receive thread is left untouched while other handlers remain registered, while no
+     * receive thread exists, or while a synchronous receive is in progress.
+     *
+     * @param handler the handler to unregister
+     */
+    @Override
+    public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+        final Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.remove(handler);
+            if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
+                return;
+            }
+            // Detach the thread only when it is really going to be stopped. Clearing the field
+            // earlier would lose track of a still-running thread and allow a later
+            // addReceiveHandler() call to start a second one.
+            thread = receiveThread;
+            receiveThread = null;
+        }
+
+        // Join outside the lock: the receive thread acquires the same lock to dispatch messages.
+        stopReceiveThread(thread);
+    }
+
+    /**
+     * Interrupts the given receive thread and waits for it to terminate.
+     *
+     * @param thread the thread to stop; {@code null} is accepted and ignored
+     */
+    private void stopReceiveThread(final Thread thread) {
+        if (thread == null) {
+            // Nothing is running, so there is nothing to stop.
+            return;
+        }
+        thread.interrupt();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            // Stop waiting, but keep the interrupt visible to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Reports whether messages are currently being received.
+     *
+     * @return {@code true} if a synchronous receive is in progress or at least one handler is
+     *         registered
+     */
+    @Override
+    public boolean isReceiving() {
+        if (!isReceivingSynchronous) {
+            synchronized (messageHandlers) {
+                return !messageHandlers.isEmpty();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Receives messages synchronously on the calling thread with the given timeout.
+     * Delegates to the private overload with returnOnTimeout set to {@code true}.
+     *
+     * @param timeout the timeout value passed on to the receive call
+     * @param unit    the unit of {@code timeout}
+     * @param handler the handler that is given each received message
+     * @throws IOException           if the underlying receive call fails
+     * @throws IllegalStateException if messages are already being received
+     */
+    @Override
+    public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
+        receiveMessages(timeout, unit, true, handler);
+    }
+
+    /**
+     * Receives messages synchronously on the calling thread.
+     * Delegates to the private overload with a one hour timeout and returnOnTimeout set to
+     * {@code false}.
+     *
+     * @param handler the handler that is given each received message
+     * @throws IOException           if the underlying receive call fails
+     * @throws IllegalStateException if messages are already being received
+     */
+    @Override
+    public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
+        receiveMessages(1L, TimeUnit.HOURS, false, handler);
+    }
+
+    /**
+     * Shared implementation of the synchronous receive methods: marks this manager as receiving
+     * synchronously, runs receiveMessagesInternal on the calling thread and resets the receive
+     * state afterwards.
+     *
+     * @throws IOException           if receiveMessagesInternal fails
+     * @throws IllegalStateException if a handler is registered or another synchronous receive
+     *                               is already in progress
+     */
+    private void receiveMessages(
+            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+    ) throws IOException {
+        // NOTE(review): check and flag update are not atomic — confirm that concurrent callers
+        // cannot both pass this check.
+        if (isReceiving()) {
+            throw new IllegalStateException("Already receiving message.");
+        }
+        isReceivingSynchronous = true;
+        // Record the calling thread as the receive thread for the duration of the call.
+        receiveThread = Thread.currentThread();
+        try {
+            receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+        } finally {
+            // Always reset the receive state, even when receiving failed with an exception.
+            receiveThread = null;
+            hasCaughtUpWithOldMessages = false;
+            isReceivingSynchronous = false;
+        }
+    }
+
+    private void receiveMessagesInternal(
+            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
-        retryFailedReceivedMessages(handler, ignoreAttachments);
+        retryFailedReceivedMessages(handler);
 
         Set<HandleAction> queuedActions = new HashSet<>();
 
@@ -980,6 +1088,11 @@ public class ManagerImpl implements Manager {
         queuedActions.clear();
     }
 
+    /**
+     * Stores the ignoreAttachments setting on this manager; it replaces the flag that was
+     * previously passed to each receive call.
+     *
+     * @param ignoreAttachments the new value of the setting
+     */
+    @Override
+    public void setIgnoreAttachments(final boolean ignoreAttachments) {
+        this.ignoreAttachments = ignoreAttachments;
+    }
+
     @Override
     public boolean hasCaughtUpWithOldMessages() {
         return hasCaughtUpWithOldMessages;
@@ -1081,11 +1194,12 @@ public class ManagerImpl implements Manager {
         }
 
         final var address = account.getRecipientStore().resolveRecipientAddress(identityInfo.getRecipientId());
+        final var scannableFingerprint = identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(),
+                identityInfo.getIdentityKey());
         return new Identity(address,
                 identityInfo.getIdentityKey(),
                 identityHelper.computeSafetyNumber(identityInfo.getRecipientId(), identityInfo.getIdentityKey()),
-                identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(),
-                        identityInfo.getIdentityKey()).getSerialized(),
+                scannableFingerprint == null ? null : scannableFingerprint.getSerialized(),
                 identityInfo.getTrustLevel(),
                 identityInfo.getDateAdded());
     }
@@ -1237,6 +1351,15 @@ public class ManagerImpl implements Manager {
     }
 
     private void close(boolean closeAccount) throws IOException {
+        Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.clear();
+            thread = receiveThread;
+            receiveThread = null;
+        }
+        if (thread != null) {
+            stopReceiveThread(thread);
+        }
         executor.shutdown();
 
         dependencies.getSignalWebSocket().disconnect();