X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/1058e33f1208e1ca284d3fd4760380d68093741f..5d33f71d4d337e45c6051274b404424562503938:/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index e9fc3f9c..cd6c6715 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -3,6 +3,7 @@ package org.asamk.signal.dbus; import org.asamk.Signal; import org.asamk.signal.DbusConfig; import org.asamk.signal.manager.Manager; +import org.asamk.signal.manager.api.AlreadyReceivingException; import org.asamk.signal.manager.api.AttachmentInvalidException; import org.asamk.signal.manager.api.CaptchaRequiredException; import org.asamk.signal.manager.api.Configuration; @@ -548,10 +549,17 @@ public class DbusManagerImpl implements Manager { } } + private Thread receiveThread; + @Override public void receiveMessages( Optional timeout, Optional maxMessages, ReceiveMessageHandler handler - ) throws IOException { + ) throws IOException, AlreadyReceivingException { + if (receiveThread != null) { + throw new AlreadyReceivingException("Already receiving message."); + } + receiveThread = Thread.currentThread(); + final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1)); final var lastMessage = new AtomicLong(System.currentTimeMillis()); final var thread = Thread.currentThread(); @@ -577,6 +585,7 @@ public class DbusManagerImpl implements Manager { } Thread.sleep(sleepTimeRemaining); } catch (InterruptedException ignored) { + break; } } } else { @@ -589,6 +598,14 @@ public class DbusManagerImpl implements Manager { } removeReceiveHandler(receiveHandler); + receiveThread = null; + } + + @Override + public void stopReceiveMessages() { + if (receiveThread != null) { + receiveThread.interrupt(); + } } @Override