]> nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java
Add command to get an attachment (#1080)
[signal-cli] / src / main / java / org / asamk / signal / dbus / DbusManagerImpl.java
index 8e92cdf4ab4efe596b03a0d3fa65bb11a81408a8..c0dc1cf832ac8e68f4b4b06ff050fc1dbbc276d2 100644 (file)
@@ -46,6 +46,7 @@ import org.freedesktop.dbus.types.Variant;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.time.Duration;
@@ -58,6 +59,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -497,39 +499,46 @@ public class DbusManagerImpl implements Manager {
         }
     }
 
-    @Override
-    public void receiveMessages(final ReceiveMessageHandler handler) throws IOException {
-        addReceiveHandler(handler);
-        try {
-            synchronized (this) {
-                this.wait();
-            }
-        } catch (InterruptedException ignored) {
-        }
-        removeReceiveHandler(handler);
-    }
-
     @Override
     public void receiveMessages(
-            final Duration timeout, final ReceiveMessageHandler handler
+            Optional<Duration> timeout, Optional<Integer> maxMessages, ReceiveMessageHandler handler
     ) throws IOException {
+        final var remainingMessages = new AtomicInteger(maxMessages.orElse(-1));
         final var lastMessage = new AtomicLong(System.currentTimeMillis());
+        final var thread = Thread.currentThread();
 
         final ReceiveMessageHandler receiveHandler = (envelope, e) -> {
             lastMessage.set(System.currentTimeMillis());
             handler.handleMessage(envelope, e);
+            if (remainingMessages.get() > 0) {
+                if (remainingMessages.decrementAndGet() <= 0) {
+                    remainingMessages.set(0);
+                    thread.interrupt();
+                }
+            }
         };
         addReceiveHandler(receiveHandler);
-        while (true) {
+        if (timeout.isPresent()) {
+            while (remainingMessages.get() != 0) {
+                try {
+                    final var passedTime = System.currentTimeMillis() - lastMessage.get();
+                    final var sleepTimeRemaining = timeout.get().toMillis() - passedTime;
+                    if (sleepTimeRemaining < 0) {
+                        break;
+                    }
+                    Thread.sleep(sleepTimeRemaining);
+                } catch (InterruptedException ignored) {
+                }
+            }
+        } else {
             try {
-                final var sleepTimeRemaining = timeout.toMillis() - (System.currentTimeMillis() - lastMessage.get());
-                if (sleepTimeRemaining < 0) {
-                    break;
+                synchronized (this) {
+                    this.wait();
                 }
-                Thread.sleep(sleepTimeRemaining);
             } catch (InterruptedException ignored) {
             }
         }
+
         removeReceiveHandler(receiveHandler);
     }
 
@@ -908,6 +917,11 @@ public class DbusManagerImpl implements Manager {
         }).toList();
     }
 
+    @Override
+    public InputStream retrieveAttachment(final String id) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     @SuppressWarnings("unchecked")
     private <T> T getValue(
             final Map<String, Variant<?>> stringVariantMap, final String field