X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/5ed9db4f08e52ed0c42cb42740f85d2ad346e13c..de2bfc7f7942908222ebcbac17e6072055acc062:/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index 8e92cdf4..b59be923 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -497,39 +498,46 @@ public class DbusManagerImpl implements Manager { } } - @Override - public void receiveMessages(final ReceiveMessageHandler handler) throws IOException { - addReceiveHandler(handler); - try { - synchronized (this) { - this.wait(); - } - } catch (InterruptedException ignored) { - } - removeReceiveHandler(handler); - } - @Override public void receiveMessages( - final Duration timeout, final ReceiveMessageHandler handler + Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler ) throws IOException { + final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1)); final var lastMessage = new AtomicLong(System.currentTimeMillis()); + final var thread = Thread.currentThread(); final ReceiveMessageHandler receiveHandler = (envelope, e) -> { lastMessage.set(System.currentTimeMillis()); handler.handleMessage(envelope, e); + if (remainingMessages.get() > 0) { + if (remainingMessages.decrementAndGet() <= 0) { + remainingMessages.set(0); + thread.interrupt(); + } + } }; addReceiveHandler(receiveHandler); - while (true) { + if (timeout.isPresent()) { + while (remainingMessages.get() != 0) { + try { + final var passedTime = System.currentTimeMillis() - lastMessage.get(); 
final var sleepTimeRemaining = timeout.get().toMillis() - passedTime; + if (sleepTimeRemaining < 0) { + break; + } + Thread.sleep(sleepTimeRemaining); + } catch (InterruptedException ignored) { + } + } + } else { try { - final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get()); - if (sleepTimeRemaining < 0) { - break; + synchronized (this) { + this.wait(); } - Thread.sleep(sleepTimeRemaining); } catch (InterruptedException ignored) { } } + removeReceiveHandler(receiveHandler); }