nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java
Implement socket/tcp for daemon command
[signal-cli] / src / main / java / org / asamk / signal / dbus / DbusManagerImpl.java
index c8dd1d45b3b85cddd06c479c171e6dd10fee648e..cc346f7af7ff7ab4b8fd434dfb7c1297b0c0eeb4 100644 (file)
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This class implements the Manager interface using the DBus Signal interface, where possible.
@@ -65,6 +66,7 @@ public class DbusManagerImpl implements Manager {
     private final Signal signal;
     private final DBusConnection connection;
 
+    private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private DBusSigHandler<Signal.MessageReceivedV2> dbusMsgHandler;
     private DBusSigHandler<Signal.ReceiptReceivedV2> dbusRcptHandler;
@@ -424,18 +426,23 @@ public class DbusManagerImpl implements Manager {
     }
 
     @Override
-    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+    public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
         synchronized (messageHandlers) {
-            if (messageHandlers.size() == 0) {
-                installMessageHandlers();
+            if (isWeakListener) {
+                weakHandlers.add(handler);
+            } else {
+                if (messageHandlers.size() == 0) {
+                    installMessageHandlers();
+                }
+                messageHandlers.add(handler);
             }
-            messageHandlers.add(handler);
         }
     }
 
     @Override
     public void removeReceiveHandler(final ReceiveMessageHandler handler) {
         synchronized (messageHandlers) {
+            weakHandlers.remove(handler);
             messageHandlers.remove(handler);
             if (messageHandlers.size() == 0) {
                 uninstallMessageHandlers();
@@ -582,8 +589,11 @@ public class DbusManagerImpl implements Manager {
             this.notify();
         }
         synchronized (messageHandlers) {
+            if (messageHandlers.size() > 0) {
+                uninstallMessageHandlers();
+            }
+            weakHandlers.clear();
             messageHandlers.clear();
-            uninstallMessageHandlers();
         }
     }
 
@@ -664,11 +674,7 @@ public class DbusManagerImpl implements Manager {
                                 List.of())),
                         Optional.empty(),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
 
@@ -693,11 +699,7 @@ public class DbusManagerImpl implements Manager {
                         Optional.empty(),
                         Optional.empty(),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
 
@@ -747,20 +749,26 @@ public class DbusManagerImpl implements Manager {
                                 Optional.empty(),
                                 Optional.empty())),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);
         } catch (DBusException e) {
             e.printStackTrace();
         }
+        signal.subscribeReceive();
+    }
+
+    private void notifyMessageHandlers(final MessageEnvelope envelope) {
+        synchronized (messageHandlers) {
+            Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+                h.handleMessage(envelope, null);
+            });
+        }
     }
 
     private void uninstallMessageHandlers() {
         try {
+            signal.unsubscribeReceive();
             connection.removeSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
             connection.removeSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
             connection.removeSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);