]> nmode's Git Repositories - signal-cli/commitdiff
Use improved shutdown for receive command
authorAsamK <asamk@gmx.de>
Thu, 9 Nov 2023 18:22:58 +0000 (19:22 +0100)
committerAsamK <asamk@gmx.de>
Thu, 9 Nov 2023 18:23:11 +0000 (19:23 +0100)
lib/src/main/java/org/asamk/signal/manager/Manager.java
lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java
src/main/java/org/asamk/signal/commands/ReceiveCommand.java
src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java

index 41e08fb6309c9ddb8b1ff91faa0eed4d9a8f08c6..da69a8da5557c561be40a7df9cec952ee5bec19c 100644 (file)
@@ -255,6 +255,8 @@ public interface Manager extends Closeable {
             Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
     ) throws IOException, AlreadyReceivingException;
 
+    void stopReceiveMessages();
+
     void setReceiveConfig(ReceiveConfig receiveConfig);
 
     boolean isContactBlocked(RecipientIdentifier.Single recipient);
index cbaaaab362f80c699766b09a8d3bede5de38549c..af001f40b0ca115f0f5bb697bb3bc87badc5fb6b 100644 (file)
@@ -1091,6 +1091,20 @@ public class ManagerImpl implements Manager {
         receiveMessages(timeout.orElse(Duration.ofMinutes(1)), timeout.isPresent(), maxMessages.orElse(null), handler);
     }
 
+    @Override
+    public void stopReceiveMessages() {
+        Thread thread = null;
+        synchronized (messageHandlers) {
+            if (isReceivingSynchronous) {
+                thread = receiveThread;
+                receiveThread = null;
+            }
+        }
+        if (thread != null) {
+            stopReceiveThread(thread);
+        }
+    }
+
     private void receiveMessages(
             Duration timeout, boolean returnOnTimeout, Integer maxMessages, ReceiveMessageHandler handler
     ) throws IOException, AlreadyReceivingException {
index 228a05619441ab15ff4e6cd0875c1571922258cb..4a6f746aa804823eef7803cd7906eff8edc4cc5c 100644 (file)
@@ -8,6 +8,7 @@ import net.sourceforge.argparse4j.inf.Subparser;
 
 import org.asamk.signal.OutputType;
 import org.asamk.signal.ReceiveMessageHandler;
+import org.asamk.signal.Shutdown;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.commands.exceptions.IOErrorException;
 import org.asamk.signal.commands.exceptions.UserErrorException;
@@ -67,6 +68,7 @@ public class ReceiveCommand implements LocalCommand, JsonRpcSingleCommand<Receiv
     public void handleCommand(
             final Namespace ns, final Manager m, final OutputWriter outputWriter
     ) throws CommandException {
+        Shutdown.installHandler();
         final var timeout = ns.getDouble("timeout");
         final var maxMessagesRaw = ns.getInt("max-messages");
         final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
@@ -80,6 +82,7 @@ public class ReceiveCommand implements LocalCommand, JsonRpcSingleCommand<Receiv
             };
             final var duration = timeout < 0 ? null : Duration.ofMillis((long) (timeout * 1000));
             final var maxMessages = maxMessagesRaw < 0 ? null : maxMessagesRaw;
+            Shutdown.registerShutdownListener(m::stopReceiveMessages);
             m.receiveMessages(Optional.ofNullable(duration), Optional.ofNullable(maxMessages), handler);
         } catch (IOException e) {
             throw new IOErrorException("Error while receiving messages: " + e.getMessage(), e);
index e9fc3f9cd14a74852cb4de9519fb0028eca18174..cd6c67150bd260a2377273cd483ac10b066b9058 100644 (file)
@@ -3,6 +3,7 @@ package org.asamk.signal.dbus;
 import org.asamk.Signal;
 import org.asamk.signal.DbusConfig;
 import org.asamk.signal.manager.Manager;
+import org.asamk.signal.manager.api.AlreadyReceivingException;
 import org.asamk.signal.manager.api.AttachmentInvalidException;
 import org.asamk.signal.manager.api.CaptchaRequiredException;
 import org.asamk.signal.manager.api.Configuration;
@@ -548,10 +549,17 @@ public class DbusManagerImpl implements Manager {
         }
     }
 
+    private Thread receiveThread;
+
     @Override
     public void receiveMessages(
             Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
-    ) throws IOException {
+    ) throws IOException, AlreadyReceivingException {
+        if (receiveThread != null) {
+            throw new AlreadyReceivingException("Already receiving message.");
+        }
+        receiveThread = Thread.currentThread();
+
         final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
         final var lastMessage = new AtomicLong(System.currentTimeMillis());
         final var thread = Thread.currentThread();
@@ -577,6 +585,7 @@ public class DbusManagerImpl implements Manager {
                     }
                     Thread.sleep(sleepTimeRemaining);
                 } catch (InterruptedException ignored) {
+                    break;
                 }
             }
         } else {
@@ -589,6 +598,14 @@ public class DbusManagerImpl implements Manager {
         }
 
         removeReceiveHandler(receiveHandler);
+        receiveThread = null;
+    }
+
+    @Override
+    public void stopReceiveMessages() {
+        if (receiveThread != null) {
+            receiveThread.interrupt();
+        }
     }
 
     @Override