nmode's Git Repositories - signal-cli/commitdiff
Implement socket/tcp for daemon command
author AsamK <asamk@gmx.de>
Wed, 10 Nov 2021 09:30:57 +0000 (10:30 +0100)
committer AsamK <asamk@gmx.de>
Wed, 10 Nov 2021 11:12:35 +0000 (12:12 +0100)
19 files changed:
data/signal-cli-socket.service [new file with mode: 0644]
data/signal-cli-socket.socket [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/Manager.java
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
man/signal-cli-dbus.5.adoc
src/main/java/org/asamk/Signal.java
src/main/java/org/asamk/signal/App.java
src/main/java/org/asamk/signal/JsonReceiveMessageHandler.java
src/main/java/org/asamk/signal/commands/DaemonCommand.java
src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java
src/main/java/org/asamk/signal/commands/MultiLocalCommand.java
src/main/java/org/asamk/signal/commands/ReceiveMode.java [new file with mode: 0644]
src/main/java/org/asamk/signal/commands/SignalCreator.java
src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java
src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java
src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java
src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java
src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java
src/main/java/org/asamk/signal/util/IOUtils.java

diff --git a/data/signal-cli-socket.service b/data/signal-cli-socket.service
new file mode 100644 (file)
index 0000000..a6a2cfb
--- /dev/null
@@ -0,0 +1,20 @@
+[Unit]
+Description=Send secure messages to Signal clients
+Wants=network-online.target
+After=network-online.target
+Requires=signal-cli-socket.socket
+
+[Service]
+Type=simple
+Environment="SIGNAL_CLI_OPTS=-Xms2m"
+ExecStart=%dir%/bin/signal-cli --config /var/lib/signal-cli daemon
+User=signal-cli
+# JVM always exits with 143 in reaction to SIGTERM signal
+SuccessExitStatus=143
+StandardInput=socket
+StandardOutput=journal
+StandardError=journal
+
+[Install]
+Also=signal-cli-socket.socket
+WantedBy=default.target
diff --git a/data/signal-cli-socket.socket b/data/signal-cli-socket.socket
new file mode 100644 (file)
index 0000000..e858356
--- /dev/null
@@ -0,0 +1,8 @@
+[Unit]
+Description=Send secure messages to Signal clients
+
+[Socket]
+ListenStream=%t/signal-cli/socket
+
+[Install]
+WantedBy=sockets.target
index be5238d140cd981b2188570e37b46a414b905856..49331b1f4a7913c22ed24dc38ab6d888fa1b3463 100644 (file)
@@ -198,7 +198,11 @@ public interface Manager extends Closeable {
      * Add a handler to receive new messages.
      * Will start receiving messages from server, if not already started.
      */
      * Add a handler to receive new messages.
      * Will start receiving messages from server, if not already started.
      */
-    void addReceiveHandler(ReceiveMessageHandler handler);
+    default void addReceiveHandler(ReceiveMessageHandler handler) {
+        addReceiveHandler(handler, false);
+    }
+
+    void addReceiveHandler(ReceiveMessageHandler handler, final boolean isWeakListener);
 
     /**
      * Remove a handler to receive new messages.
 
     /**
      * Remove a handler to receive new messages.
@@ -249,6 +253,9 @@ public interface Manager extends Closeable {
 
     interface ReceiveMessageHandler {
 
 
     interface ReceiveMessageHandler {
 
+        ReceiveMessageHandler EMPTY = (envelope, e) -> {
+        };
+
         void handleMessage(MessageEnvelope envelope, Throwable e);
     }
 }
         void handleMessage(MessageEnvelope envelope, Throwable e);
     }
 }
index 94a3f4bea4477617d897300c9f5e323e54c5c5e5..d7245d56eb5487620cd98b27ccd53a8de6402e19 100644 (file)
@@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
 
 
 import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
 
@@ -139,6 +140,7 @@ public class ManagerImpl implements Manager {
     private boolean ignoreAttachments = false;
 
     private Thread receiveThread;
     private boolean ignoreAttachments = false;
 
     private Thread receiveThread;
+    private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private boolean isReceivingSynchronous;
 
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private boolean isReceivingSynchronous;
 
@@ -904,14 +906,17 @@ public class ManagerImpl implements Manager {
     }
 
     @Override
     }
 
     @Override
-    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+    public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
         if (isReceivingSynchronous) {
             throw new IllegalStateException("Already receiving message synchronously.");
         }
         synchronized (messageHandlers) {
         if (isReceivingSynchronous) {
             throw new IllegalStateException("Already receiving message synchronously.");
         }
         synchronized (messageHandlers) {
-            messageHandlers.add(handler);
-
-            startReceiveThreadIfRequired();
+            if (isWeakListener) {
+                weakHandlers.add(handler);
+            } else {
+                messageHandlers.add(handler);
+                startReceiveThreadIfRequired();
+            }
         }
     }
 
         }
     }
 
@@ -925,13 +930,13 @@ public class ManagerImpl implements Manager {
                 try {
                     receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
                         synchronized (messageHandlers) {
                 try {
                     receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
                         synchronized (messageHandlers) {
-                            for (ReceiveMessageHandler h : messageHandlers) {
+                            Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
                                 try {
                                     h.handleMessage(envelope, e);
                                 } catch (Exception ex) {
                                     logger.warn("Message handler failed, ignoring", ex);
                                 }
                                 try {
                                     h.handleMessage(envelope, e);
                                 } catch (Exception ex) {
                                     logger.warn("Message handler failed, ignoring", ex);
                                 }
-                            }
+                            });
                         }
                     });
                     break;
                         }
                     });
                     break;
@@ -959,8 +964,9 @@ public class ManagerImpl implements Manager {
     public void removeReceiveHandler(final ReceiveMessageHandler handler) {
         final Thread thread;
         synchronized (messageHandlers) {
     public void removeReceiveHandler(final ReceiveMessageHandler handler) {
         final Thread thread;
         synchronized (messageHandlers) {
+            weakHandlers.remove(handler);
             messageHandlers.remove(handler);
             messageHandlers.remove(handler);
-            if (!messageHandlers.isEmpty() || isReceivingSynchronous) {
+            if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
                 return;
             }
             thread = receiveThread;
                 return;
             }
             thread = receiveThread;
@@ -1380,6 +1386,7 @@ public class ManagerImpl implements Manager {
     private void close(boolean closeAccount) throws IOException {
         Thread thread;
         synchronized (messageHandlers) {
     private void close(boolean closeAccount) throws IOException {
         Thread thread;
         synchronized (messageHandlers) {
+            weakHandlers.clear();
             messageHandlers.clear();
             thread = receiveThread;
             receiveThread = null;
             messageHandlers.clear();
             thread = receiveThread;
             receiveThread = null;
index 870678e32efe43afa1338f85d123f26c9c50821e..75b4e8a1d20a135fce18bc3b7e7ed5571cd23f6b 100755 (executable)
@@ -51,7 +51,7 @@ Phone numbers always have the format +<countrycode><regional number>
 These methods are available if the daemon is started anonymously (without an explicit `-u USERNAME`).
 Requests are sent to `/org/asamk/Signal`; requests related to individual accounts are sent to
 `/org/asamk/Signal/_441234567890` where the + dialing code is replaced by an underscore (_).
 These methods are available if the daemon is started anonymously (without an explicit `-u USERNAME`).
 Requests are sent to `/org/asamk/Signal`; requests related to individual accounts are sent to
 `/org/asamk/Signal/_441234567890` where the + dialing code is replaced by an underscore (_).
-Only `version()` is activated in single-user mode; the rest are disabled.
+Only `version()` is activated in single-account mode; the rest are disabled.
 
 link() -> deviceLinkUri<s>::
 link(newDeviceName<s>) -> deviceLinkUri<s>::
 
 link() -> deviceLinkUri<s>::
 link(newDeviceName<s>) -> deviceLinkUri<s>::
index 4e3514f753f3ec94fd819f1f4ab037f342f5dedb..7e25bfc489b676a2cc0bc7e49ae6d3e31daf4e1b 100644 (file)
@@ -22,6 +22,10 @@ public interface Signal extends DBusInterface {
 
     String getSelfNumber();
 
 
     String getSelfNumber();
 
+    void subscribeReceive();
+
+    void unsubscribeReceive();
+
     long sendMessage(
             String message, List<String> attachments, String recipient
     ) throws Error.AttachmentInvalid, Error.Failure, Error.InvalidNumber, Error.UntrustedIdentity;
     long sendMessage(
             String message, List<String> attachments, String recipient
     ) throws Error.AttachmentInvalid, Error.Failure, Error.InvalidNumber, Error.UntrustedIdentity;
index 03bb6fcb2e0b57c55624ee1816ae431597222221..01af3178b680cd6d726aa43de63861038fa21503 100644 (file)
@@ -39,6 +39,8 @@ import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static net.sourceforge.argparse4j.DefaultSettings.VERSION_0_9_0_DEFAULT_SETTINGS;
 
 
 import static net.sourceforge.argparse4j.DefaultSettings.VERSION_0_9_0_DEFAULT_SETTINGS;
 
@@ -66,8 +68,11 @@ public class App {
         parser.addArgument("-u", "--username").help("Specify your phone number, that will be your identifier.");
 
         var mut = parser.addMutuallyExclusiveGroup();
         parser.addArgument("-u", "--username").help("Specify your phone number, that will be your identifier.");
 
         var mut = parser.addMutuallyExclusiveGroup();
-        mut.addArgument("--dbus").help("Make request via user dbus.").action(Arguments.storeTrue());
-        mut.addArgument("--dbus-system").help("Make request via system dbus.").action(Arguments.storeTrue());
+        mut.addArgument("--dbus").dest("global-dbus").help("Make request via user dbus.").action(Arguments.storeTrue());
+        mut.addArgument("--dbus-system")
+                .dest("global-dbus-system")
+                .help("Make request via system dbus.")
+                .action(Arguments.storeTrue());
 
         parser.addArgument("-o", "--output")
                 .help("Choose to output in plain text or JSON")
 
         parser.addArgument("-o", "--output")
                 .help("Choose to output in plain text or JSON")
@@ -119,8 +124,8 @@ public class App {
 
         var username = ns.getString("username");
 
 
         var username = ns.getString("username");
 
-        final var useDbus = Boolean.TRUE.equals(ns.getBoolean("dbus"));
-        final var useDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
+        final var useDbus = Boolean.TRUE.equals(ns.getBoolean("global-dbus"));
+        final var useDbusSystem = Boolean.TRUE.equals(ns.getBoolean("global-dbus-system"));
         if (useDbus || useDbusSystem) {
             // If username is null, it will connect to the default object path
             initDbusClient(command, username, useDbusSystem, outputWriter);
         if (useDbus || useDbusSystem) {
             // If username is null, it will connect to the default object path
             initDbusClient(command, username, useDbusSystem, outputWriter);
@@ -262,31 +267,72 @@ public class App {
             final TrustNewIdentity trustNewIdentity
     ) throws CommandException {
         final var managers = new ArrayList<Manager>();
             final TrustNewIdentity trustNewIdentity
     ) throws CommandException {
         final var managers = new ArrayList<Manager>();
-        for (String u : usernames) {
-            try {
-                managers.add(loadManager(u, dataPath, serviceEnvironment, trustNewIdentity));
-            } catch (CommandException e) {
-                logger.warn("Ignoring {}: {}", u, e.getMessage());
-            }
-        }
-
-        command.handleCommand(ns, managers, new SignalCreator() {
-            @Override
-            public ProvisioningManager getNewProvisioningManager() {
-                return ProvisioningManager.init(dataPath, serviceEnvironment, BaseConfig.USER_AGENT);
-            }
-
-            @Override
-            public RegistrationManager getNewRegistrationManager(String username) throws IOException {
-                return RegistrationManager.init(username, dataPath, serviceEnvironment, BaseConfig.USER_AGENT);
+        try {
+            for (String u : usernames) {
+                try {
+                    managers.add(loadManager(u, dataPath, serviceEnvironment, trustNewIdentity));
+                } catch (CommandException e) {
+                    logger.warn("Ignoring {}: {}", u, e.getMessage());
+                }
             }
             }
-        }, outputWriter);
 
 
-        for (var m : managers) {
-            try {
-                m.close();
-            } catch (IOException e) {
-                logger.warn("Cleanup failed", e);
+            command.handleCommand(ns, new SignalCreator() {
+                private List<Consumer<Manager>> onManagerAddedHandlers = new ArrayList<>();
+
+                @Override
+                public List<String> getAccountNumbers() {
+                    synchronized (managers) {
+                        return managers.stream().map(Manager::getSelfNumber).collect(Collectors.toList());
+                    }
+                }
+
+                @Override
+                public void addManager(final Manager m) {
+                    synchronized (managers) {
+                        if (!managers.contains(m)) {
+                            managers.add(m);
+                            for (final var handler : onManagerAddedHandlers) {
+                                handler.accept(m);
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void addOnManagerAddedHandler(final Consumer<Manager> handler) {
+                    onManagerAddedHandlers.add(handler);
+                }
+
+                @Override
+                public Manager getManager(final String phoneNumber) {
+                    synchronized (managers) {
+                        return managers.stream()
+                                .filter(m -> m.getSelfNumber().equals(phoneNumber))
+                                .findFirst()
+                                .orElse(null);
+                    }
+                }
+
+                @Override
+                public ProvisioningManager getNewProvisioningManager() {
+                    return ProvisioningManager.init(dataPath, serviceEnvironment, BaseConfig.USER_AGENT);
+                }
+
+                @Override
+                public RegistrationManager getNewRegistrationManager(String username) throws IOException {
+                    return RegistrationManager.init(username, dataPath, serviceEnvironment, BaseConfig.USER_AGENT);
+                }
+            }, outputWriter);
+        } finally {
+            synchronized (managers) {
+                for (var m : managers) {
+                    try {
+                        m.close();
+                    } catch (IOException e) {
+                        logger.warn("Cleanup failed", e);
+                    }
+                }
+                managers.clear();
             }
         }
     }
             }
         }
     }
index 1135e89a879eba581f707be295ebd9db4e271ea0..41edfd3f95cfcf369d89a8748c36afcf87337885 100644 (file)
@@ -13,7 +13,7 @@ public class JsonReceiveMessageHandler implements Manager.ReceiveMessageHandler
 
     private final static Logger logger = LoggerFactory.getLogger(JsonReceiveMessageHandler.class);
 
 
     private final static Logger logger = LoggerFactory.getLogger(JsonReceiveMessageHandler.class);
 
-    protected final Manager m;
+    private final Manager m;
     private final JsonWriter jsonWriter;
 
     public JsonReceiveMessageHandler(Manager m, JsonWriter jsonWriter) {
     private final JsonWriter jsonWriter;
 
     public JsonReceiveMessageHandler(Manager m, JsonWriter jsonWriter) {
@@ -24,6 +24,7 @@ public class JsonReceiveMessageHandler implements Manager.ReceiveMessageHandler
     @Override
     public void handleMessage(MessageEnvelope envelope, Throwable exception) {
         final var object = new HashMap<String, Object>();
     @Override
     public void handleMessage(MessageEnvelope envelope, Throwable exception) {
         final var object = new HashMap<String, Object>();
+        object.put("account", m.getSelfNumber());
         if (exception != null) {
             object.put("error", JsonError.from(exception));
         }
         if (exception != null) {
             object.put("error", JsonError.from(exception));
         }
index ee9368f87fb521096ed0004c2bc8aaa0b83b9f8d..51a5c947de4dbb654217bc6ac99e645a5d2feef8 100644 (file)
@@ -5,25 +5,38 @@ import net.sourceforge.argparse4j.inf.Namespace;
 import net.sourceforge.argparse4j.inf.Subparser;
 
 import org.asamk.signal.DbusConfig;
 import net.sourceforge.argparse4j.inf.Subparser;
 
 import org.asamk.signal.DbusConfig;
-import org.asamk.signal.DbusReceiveMessageHandler;
 import org.asamk.signal.JsonReceiveMessageHandler;
 import org.asamk.signal.JsonWriter;
 import org.asamk.signal.JsonReceiveMessageHandler;
 import org.asamk.signal.JsonWriter;
+import org.asamk.signal.JsonWriterImpl;
 import org.asamk.signal.OutputType;
 import org.asamk.signal.OutputWriter;
 import org.asamk.signal.PlainTextWriter;
 import org.asamk.signal.ReceiveMessageHandler;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.OutputType;
 import org.asamk.signal.OutputWriter;
 import org.asamk.signal.PlainTextWriter;
 import org.asamk.signal.ReceiveMessageHandler;
 import org.asamk.signal.commands.exceptions.CommandException;
+import org.asamk.signal.commands.exceptions.IOErrorException;
 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
 import org.asamk.signal.dbus.DbusSignalControlImpl;
 import org.asamk.signal.dbus.DbusSignalImpl;
 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
 import org.asamk.signal.dbus.DbusSignalControlImpl;
 import org.asamk.signal.dbus.DbusSignalImpl;
+import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.Manager;
+import org.asamk.signal.util.IOUtils;
 import org.freedesktop.dbus.connections.impl.DBusConnection;
 import org.freedesktop.dbus.exceptions.DBusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.freedesktop.dbus.connections.impl.DBusConnection;
 import org.freedesktop.dbus.exceptions.DBusException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
+import java.net.UnixDomainSocketAddress;
+import java.nio.channels.Channel;
+import java.nio.channels.Channels;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.List;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 public class DaemonCommand implements MultiLocalCommand {
 
 
 public class DaemonCommand implements MultiLocalCommand {
 
@@ -36,10 +49,30 @@ public class DaemonCommand implements MultiLocalCommand {
 
     @Override
     public void attachToSubparser(final Subparser subparser) {
 
     @Override
     public void attachToSubparser(final Subparser subparser) {
-        subparser.help("Run in daemon mode and provide an experimental dbus interface.");
-        subparser.addArgument("--system")
+        final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
+        subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
+        subparser.addArgument("--dbus")
                 .action(Arguments.storeTrue())
                 .action(Arguments.storeTrue())
-                .help("Use DBus system bus instead of user bus.");
+                .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
+        subparser.addArgument("--dbus-system", "--system")
+                .action(Arguments.storeTrue())
+                .help("Expose a DBus interface on the system bus.");
+        subparser.addArgument("--socket")
+                .nargs("?")
+                .type(File.class)
+                .setConst(defaultSocketPath)
+                .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
+        subparser.addArgument("--tcp")
+                .nargs("?")
+                .setConst("localhost:7583")
+                .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
+        subparser.addArgument("--no-receive-stdout")
+                .help("Don’t print received messages to stdout.")
+                .action(Arguments.storeTrue());
+        subparser.addArgument("--receive-mode")
+                .help("Specify when to start receiving messages.")
+                .type(Arguments.enumStringType(ReceiveMode.class))
+                .setDefault(ReceiveMode.ON_START);
         subparser.addArgument("--ignore-attachments")
                 .help("Don’t download attachments of received messages.")
                 .action(Arguments.storeTrue());
         subparser.addArgument("--ignore-attachments")
                 .help("Don’t download attachments of received messages.")
                 .action(Arguments.storeTrue());
@@ -54,93 +87,277 @@ public class DaemonCommand implements MultiLocalCommand {
     public void handleCommand(
             final Namespace ns, final Manager m, final OutputWriter outputWriter
     ) throws CommandException {
     public void handleCommand(
             final Namespace ns, final Manager m, final OutputWriter outputWriter
     ) throws CommandException {
-        boolean ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
+        logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
+        final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
+        final var receiveMode = ns.<ReceiveMode>get("receive-mode");
+        final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
+
         m.setIgnoreAttachments(ignoreAttachments);
         m.setIgnoreAttachments(ignoreAttachments);
+        addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
 
 
-        DBusConnection.DBusBusType busType;
-        if (Boolean.TRUE.equals(ns.getBoolean("system"))) {
-            busType = DBusConnection.DBusBusType.SYSTEM;
-        } else {
-            busType = DBusConnection.DBusBusType.SESSION;
+        final Channel inheritedChannel;
+        try {
+            inheritedChannel = System.inheritedChannel();
+            if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
+                logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
+                runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
+            }
+        } catch (IOException e) {
+            throw new IOErrorException("Failed to use inherited socket", e);
+        }
+        final var socketFile = ns.<File>get("socket");
+        if (socketFile != null) {
+            final var address = UnixDomainSocketAddress.of(socketFile.toPath());
+            final var serverChannel = IOUtils.bindSocket(address);
+            runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
+        }
+        final var tcpAddress = ns.getString("tcp");
+        if (tcpAddress != null) {
+            final var address = IOUtils.parseInetSocketAddress(tcpAddress);
+            final var serverChannel = IOUtils.bindSocket(address);
+            runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
+        }
+        final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
+        if (isDbusSystem) {
+            runDbusSingleAccount(m, true, receiveMode != ReceiveMode.ON_START);
+        }
+        final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
+        if (isDbusSession || (
+                !isDbusSystem
+                        && socketFile == null
+                        && tcpAddress == null
+                        && !(inheritedChannel instanceof ServerSocketChannel)
+        )) {
+            runDbusSingleAccount(m, false, receiveMode != ReceiveMode.ON_START);
         }
 
         }
 
-        try (var conn = DBusConnection.getConnection(busType)) {
-            var objectPath = DbusConfig.getObjectPath();
-            var t = run(conn, objectPath, m, outputWriter);
-
-            conn.requestBusName(DbusConfig.getBusname());
-            logger.info("DBus daemon running in single-user mode for " + m.getSelfNumber());
-
+        synchronized (this) {
             try {
             try {
-                t.join();
-                synchronized (this) {
-                    wait();
-                }
+                wait();
             } catch (InterruptedException ignored) {
             }
             } catch (InterruptedException ignored) {
             }
-        } catch (DBusException | IOException e) {
-            logger.error("Dbus command failed", e);
-            throw new UnexpectedErrorException("Dbus command failed", e);
         }
     }
 
     @Override
     public void handleCommand(
         }
     }
 
     @Override
     public void handleCommand(
-            final Namespace ns, final List<Manager> managers, final SignalCreator c, final OutputWriter outputWriter
+            final Namespace ns, final SignalCreator c, final OutputWriter outputWriter
     ) throws CommandException {
     ) throws CommandException {
-        boolean ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
+        logger.info("Starting daemon in multi-account mode");
+        final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
+        final var receiveMode = ns.<ReceiveMode>get("receive-mode");
+        final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
 
 
-        DBusConnection.DBusBusType busType;
-        if (Boolean.TRUE.equals(ns.getBoolean("system"))) {
-            busType = DBusConnection.DBusBusType.SYSTEM;
-        } else {
-            busType = DBusConnection.DBusBusType.SESSION;
+        c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(m -> {
+            m.setIgnoreAttachments(ignoreAttachments);
+            addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
+        });
+        c.addOnManagerAddedHandler(m -> {
+            m.setIgnoreAttachments(ignoreAttachments);
+            addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
+        });
+
+        final Channel inheritedChannel;
+        try {
+            inheritedChannel = System.inheritedChannel();
+            if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
+                logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
+                runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
+            }
+        } catch (IOException e) {
+            throw new IOErrorException("Failed to use inherited socket", e);
+        }
+        final var socketFile = ns.<File>get("socket");
+        if (socketFile != null) {
+            final var address = UnixDomainSocketAddress.of(socketFile.toPath());
+            final var serverChannel = IOUtils.bindSocket(address);
+            runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
+        }
+        final var tcpAddress = ns.getString("tcp");
+        if (tcpAddress != null) {
+            final var address = IOUtils.parseInetSocketAddress(tcpAddress);
+            final var serverChannel = IOUtils.bindSocket(address);
+            runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
+        }
+        final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
+        if (isDbusSystem) {
+            runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, true);
         }
         }
+        final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
+        if (isDbusSession || (
+                !isDbusSystem
+                        && socketFile == null
+                        && tcpAddress == null
+                        && !(inheritedChannel instanceof ServerSocketChannel)
+        )) {
+            runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, false);
+        }
+
+        synchronized (this) {
+            try {
+                wait();
+            } catch (InterruptedException ignored) {
+            }
+        }
+    }
 
 
-        try (var conn = DBusConnection.getConnection(busType)) {
-            final var signalControl = new DbusSignalControlImpl(c, m -> {
-                m.setIgnoreAttachments(ignoreAttachments);
+    private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
+        final var handler = outputWriter instanceof JsonWriter o
+                ? new JsonReceiveMessageHandler(m, o)
+                : outputWriter instanceof PlainTextWriter o
+                        ? new ReceiveMessageHandler(m, o)
+                        : Manager.ReceiveMessageHandler.EMPTY;
+        m.addReceiveHandler(handler, isWeakListener);
+    }
+
+    /**
+     * Serves JSON-RPC on the given server socket for a single account: each accepted
+     * connection gets its own dispatcher handler, which is then run against manager {@code m}.
+     *
+     * @param m                manager every connection is handled with
+     * @param serverChannel    server channel to accept clients from
+     * @param noReceiveOnStart forwarded to each connection's dispatcher handler
+     */
+    private void runSocketSingleAccount(
+            final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
+    ) {
+        final Consumer<SocketChannel> connectionHandler = clientChannel ->
+                getSignalJsonRpcDispatcherHandler(clientChannel, noReceiveOnStart).handleConnection(m);
+        runSocket(serverChannel, connectionHandler);
+    }
+
+    /**
+     * Serves JSON-RPC on the given server socket in multi-account mode: each accepted
+     * connection gets its own dispatcher handler, which is then run against the creator {@code c}.
+     *
+     * @param c                account source every connection is handled with
+     * @param serverChannel    server channel to accept clients from
+     * @param noReceiveOnStart forwarded to each connection's dispatcher handler
+     */
+    private void runSocketMultiAccount(
+            final SignalCreator c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
+    ) {
+        final Consumer<SocketChannel> connectionHandler = clientChannel ->
+                getSignalJsonRpcDispatcherHandler(clientChannel, noReceiveOnStart).handleConnection(c);
+        runSocket(serverChannel, connectionHandler);
+    }
+
+    /**
+     * Starts a background thread that accepts connections on {@code serverChannel} and runs
+     * {@code socketHandler} for every accepted client on a dedicated thread.
+     * <p>
+     * The client channel is closed once its handler returns. If accepting a connection fails
+     * with an {@link IOException}, the error is logged and the accept loop ends; threads that
+     * are already serving a client keep running.
+     *
+     * @param serverChannel server channel to accept clients from
+     * @param socketHandler callback invoked with each accepted client channel
+     */
+    private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
+        new Thread(() -> {
+            while (true) {
+                final SocketChannel channel;
+                final String clientString;
                 try {
                 try {
-                    final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
-                    return run(conn, objectPath, m, outputWriter);
-                } catch (DBusException e) {
-                    logger.error("Failed to export object", e);
-                    return null;
+                    channel = serverChannel.accept();
+                    clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
+                    logger.info("Accepted new client: " + clientString);
+                } catch (IOException e) {
+                    logger.error("Failed to accept new socket connection", e);
+                    // Just stop accepting. Calling notifyAll() on the starting thread's Thread object
+                    // here would throw IllegalMonitorStateException (its monitor is not held by this
+                    // thread), and nothing waits on that object anyway.
+                    break;
+                }
+                new Thread(() -> {
+                    try (final var c = channel) {
+                        socketHandler.accept(c);
+                        logger.info("Connection closed: " + clientString);
+                    } catch (IOException e) {
+                        logger.warn("Failed to close channel", e);
+                    }
+                }).start();
+            }
+        }).start();
+    }
+
+    /**
+     * Builds a JSON-RPC dispatcher handler that reads its input lines from, and writes its JSON
+     * output to, the given client channel, using UTF-8 in both directions.
+     *
+     * @param c                client channel used for reading and writing
+     * @param noReceiveOnStart forwarded to the dispatcher handler
+     * @return a new handler bound to the channel
+     */
+    private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
+            final SocketChannel c, final boolean noReceiveOnStart
+    ) {
+        final var channelReader = Channels.newReader(c, StandardCharsets.UTF_8);
+        final var requestLines = IOUtils.getLineSupplier(channelReader);
+        final var channelWriter = Channels.newWriter(c, StandardCharsets.UTF_8);
+        final var responseWriter = new JsonWriterImpl(channelWriter);
+
+        return new SignalJsonRpcDispatcherHandler(responseWriter, requestLines, noReceiveOnStart);
+    }
+
+    /**
+     * Exposes a single account on D-Bus: exports the manager's Signal object on the chosen bus
+     * and waits for the thread that initializes the exported object to finish.
+     *
+     * @param m                manager to export
+     * @param isDbusSystem     {@code true} for the system bus, {@code false} for the session bus
+     * @param noReceiveOnStart forwarded to the exported D-Bus object
+     * @throws UnexpectedErrorException if the D-Bus setup fails
+     */
+    private void runDbusSingleAccount(
+            final Manager m, final boolean isDbusSystem, final boolean noReceiveOnStart
+    ) throws UnexpectedErrorException {
+        final DbusRunner exportAndAwaitInit = (conn, objectPath) -> {
+            final var initThread = exportDbusObject(conn, objectPath, m, noReceiveOnStart);
+            try {
+                initThread.join();
+            } catch (InterruptedException ignored) {
+            }
+        };
+        runDbus(isDbusSystem, exportAndAwaitInit);
+    }
+
+    /**
+     * Exposes all accounts of {@code c} on D-Bus: exports a {@link DbusSignalControlImpl} at the
+     * base object path, exports every manager currently obtainable from {@code c}, and waits for
+     * the initialization threads of those exports.
+     * <p>
+     * Note that the boolean parameters are in the opposite order to
+     * {@code runDbusSingleAccount(m, isDbusSystem, noReceiveOnStart)}.
+     *
+     * @param c                source of accounts and managers
+     * @param noReceiveOnStart forwarded to every exported D-Bus object
+     * @param isDbusSystem     {@code true} for the system bus, {@code false} for the session bus
+     * @throws UnexpectedErrorException if the D-Bus setup fails
+     */
+    private void runDbusMultiAccount(
+            final SignalCreator c, final boolean noReceiveOnStart, final boolean isDbusSystem
+    ) throws UnexpectedErrorException {
+        runDbus(isDbusSystem, (connection, objectPath) -> {
+            final var signalControl = new DbusSignalControlImpl(c, objectPath);
+            connection.exportObject(signalControl);
+
+            // Presumably invoked by the creator whenever a manager is added later on (verify against
+            // SignalCreator implementations); the callback blocks until the export's init thread ends.
+            c.addOnManagerAddedHandler(m -> {
+                final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart);
+                if (thread != null) {
+                    try {
+                        thread.join();
+                    } catch (InterruptedException ignored) {
+                    }
                 }
                 }
-            }, DbusConfig.getObjectPath());
-            conn.exportObject(signalControl);
+            });
+
+            // Export the managers that exist right now; accounts without a manager and failed
+            // exports (both null) are skipped.
+            final var initThreads = c.getAccountNumbers()
+                    .stream()
+                    .map(c::getManager)
+                    .filter(Objects::nonNull)
+                    .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
+                    .filter(Objects::nonNull)
+                    .collect(Collectors.toList());
 
 
-            for (var m : managers) {
-                signalControl.addManager(m);
+            for (var t : initThreads) {
+                try {
+                    t.join();
+                } catch (InterruptedException ignored) {
+                }
             }
             }
+        });
+    }
+
+    /**
+     * Connects to the system or session bus, lets {@code dbusRunner} export its objects on that
+     * connection at the base object path, and only afterwards requests the bus name.
+     *
+     * @param isDbusSystem {@code true} for the system bus, {@code false} for the session bus
+     * @param dbusRunner   callback that exports objects on the new connection
+     * @throws UnexpectedErrorException if any step raises a {@code DBusException}; the error is
+     *                                  logged before it is wrapped
+     */
+    private void runDbus(
+            final boolean isDbusSystem, DbusRunner dbusRunner
+    ) throws UnexpectedErrorException {
+        DBusConnection.DBusBusType busType;
+        if (isDbusSystem) {
+            busType = DBusConnection.DBusBusType.SYSTEM;
+        } else {
+            busType = DBusConnection.DBusBusType.SESSION;
+        }
+        try {
+            // The connection is not closed in this method.
+            var conn = DBusConnection.getConnection(busType);
+            dbusRunner.run(conn, DbusConfig.getObjectPath());
 
             conn.requestBusName(DbusConfig.getBusname());
 
             conn.requestBusName(DbusConfig.getBusname());
-            logger.info("DBus daemon running in mulit-account mode");
 
 
-            signalControl.run();
-        } catch (DBusException | IOException e) {
+            logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
+        } catch (DBusException e) {
             logger.error("Dbus command failed", e);
             throw new UnexpectedErrorException("Dbus command failed", e);
         }
     }
 
             logger.error("Dbus command failed", e);
             throw new UnexpectedErrorException("Dbus command failed", e);
         }
     }
 
-    private Thread run(
-            DBusConnection conn, String objectPath, Manager m, OutputWriter outputWriter
+    /**
+     * Exports the given manager on D-Bus at the object path derived from its own number.
+     *
+     * @param conn             connection to export on
+     * @param m                manager to export
+     * @param noReceiveOnStart forwarded to the exported D-Bus object
+     * @return the thread initializing the exported object, or {@code null} if the export
+     *         failed with a {@code DBusException} (which is logged)
+     */
+    private Thread exportMultiAccountManager(
+            final DBusConnection conn, final Manager m, final boolean noReceiveOnStart
+    ) {
+        Thread initThread = null;
+        try {
+            final var accountPath = DbusConfig.getObjectPath(m.getSelfNumber());
+            initThread = exportDbusObject(conn, accountPath, m, noReceiveOnStart);
+        } catch (DBusException e) {
+            logger.error("Failed to export object", e);
+        }
+        return initThread;
+    }
+
+    /**
+     * Creates a {@link DbusSignalImpl} for the manager, exports it on the connection and starts
+     * a thread that runs the object's {@code initObjects()}.
+     *
+     * @param conn             connection to export on
+     * @param objectPath       object path handed to the new {@code DbusSignalImpl}
+     * @param m                manager backing the exported object
+     * @param noReceiveOnStart forwarded to the {@code DbusSignalImpl} constructor
+     * @return the already started initialization thread, so callers can join it
+     * @throws DBusException if exporting the object fails
+     */
+    private Thread exportDbusObject(
+            final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
     ) throws DBusException {
     ) throws DBusException {
-        final var signal = new DbusSignalImpl(m, conn, objectPath);
+        final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
         conn.exportObject(signal);
         final var initThread = new Thread(signal::initObjects);
         initThread.start();
 
         logger.debug("Exported dbus object: " + objectPath);
 
         conn.exportObject(signal);
         final var initThread = new Thread(signal::initObjects);
         initThread.start();
 
         logger.debug("Exported dbus object: " + objectPath);
 
-        final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m,
-                (JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter);
-        m.addReceiveHandler(handler);
+        return initThread;
+    }
 
 
-        final var dbusMessageHandler = new DbusReceiveMessageHandler(m, conn, objectPath);
-        m.addReceiveHandler(dbusMessageHandler);
+    /**
+     * Callback used by {@code runDbus}: receives the opened connection and the base object path
+     * and exports whatever objects are needed; a {@code DBusException} may be thrown.
+     */
+    interface DbusRunner {
 
 
-        return initThread;
+        void run(DBusConnection connection, String objectPath) throws DBusException;
     }
 }
     }
 }
index 4090d1281063cf3c43b406f579306962702a9c68..f5a4d011e6bc485ef3d4d3101dbf35ab220a2f97 100644 (file)
@@ -10,13 +10,11 @@ import org.asamk.signal.OutputWriter;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
 import org.asamk.signal.manager.Manager;
+import org.asamk.signal.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.InputStreamReader;
-import java.io.Reader;
 import java.util.List;
 import java.util.function.Supplier;
 
 import java.util.List;
 import java.util.function.Supplier;
 
@@ -50,21 +48,9 @@ public class JsonRpcDispatcherCommand implements LocalCommand {
         m.setIgnoreAttachments(ignoreAttachments);
 
         final var jsonOutputWriter = (JsonWriter) outputWriter;
         m.setIgnoreAttachments(ignoreAttachments);
 
         final var jsonOutputWriter = (JsonWriter) outputWriter;
-        final Supplier<String> lineSupplier = getLineSupplier(new InputStreamReader(System.in));
+        final Supplier<String> lineSupplier = IOUtils.getLineSupplier(new InputStreamReader(System.in));
 
 
-        final var handler = new SignalJsonRpcDispatcherHandler(m, jsonOutputWriter, lineSupplier);
-        handler.handleConnection();
-    }
-
-    private Supplier<String> getLineSupplier(final Reader reader) {
-        final var bufferedReader = new BufferedReader(reader);
-        return () -> {
-            try {
-                return bufferedReader.readLine();
-            } catch (IOException e) {
-                logger.error("Error occurred while reading line", e);
-                return null;
-            }
-        };
+        final var handler = new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, false);
+        handler.handleConnection(m);
     }
 }
     }
 }
index 1c01a6aec994d9678db214b71c56a408c068ad13..f333d3a593e98c81c77d1d17c54e6d30bcb65038 100644 (file)
@@ -4,20 +4,10 @@ import net.sourceforge.argparse4j.inf.Namespace;
 
 import org.asamk.signal.OutputWriter;
 import org.asamk.signal.commands.exceptions.CommandException;
 
 import org.asamk.signal.OutputWriter;
 import org.asamk.signal.commands.exceptions.CommandException;
-import org.asamk.signal.manager.Manager;
-
-import java.util.List;
 
 public interface MultiLocalCommand extends LocalCommand {
 
     void handleCommand(
 
 public interface MultiLocalCommand extends LocalCommand {
 
     void handleCommand(
-            Namespace ns, List<Manager> m, SignalCreator c, OutputWriter outputWriter
+            Namespace ns, SignalCreator c, OutputWriter outputWriter
     ) throws CommandException;
     ) throws CommandException;
-
-    @Override
-    default void handleCommand(
-            final Namespace ns, final Manager m, final OutputWriter outputWriter
-    ) throws CommandException {
-        handleCommand(ns, List.of(m), null, outputWriter);
-    }
 }
 }
diff --git a/src/main/java/org/asamk/signal/commands/ReceiveMode.java b/src/main/java/org/asamk/signal/commands/ReceiveMode.java
new file mode 100644 (file)
index 0000000..bbe1431
--- /dev/null
@@ -0,0 +1,22 @@
+package org.asamk.signal.commands;
+
+/**
+ * Receive modes of the daemon command. {@link #toString()} returns the lower-case,
+ * hyphenated label of each constant rather than its Java name.
+ */
+enum ReceiveMode {
+    ON_START("on-start"),
+    ON_CONNECTION("on-connection"),
+    MANUAL("manual");
+
+    /** Text returned by {@link #toString()}. */
+    private final String label;
+
+    ReceiveMode(final String label) {
+        this.label = label;
+    }
+
+    @Override
+    public String toString() {
+        return label;
+    }
+}
index 675d7f2ada9847367e092a71016cbb99457a10f1..46bbbfc0672ca45d075ba4b41e4aa67fd7b1caf5 100644 (file)
@@ -1,12 +1,23 @@
 package org.asamk.signal.commands;
 
 package org.asamk.signal.commands;
 
+import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.ProvisioningManager;
 import org.asamk.signal.manager.RegistrationManager;
 
 import java.io.IOException;
 import org.asamk.signal.manager.ProvisioningManager;
 import org.asamk.signal.manager.RegistrationManager;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.function.Consumer;
 
 public interface SignalCreator {
 
 
 public interface SignalCreator {
 
+    List<String> getAccountNumbers();
+
+    void addManager(Manager m);
+
+    void addOnManagerAddedHandler(Consumer<Manager> handler);
+
+    Manager getManager(String phoneNumber);
+
     ProvisioningManager getNewProvisioningManager();
 
     RegistrationManager getNewRegistrationManager(String username) throws IOException;
     ProvisioningManager getNewProvisioningManager();
 
     RegistrationManager getNewRegistrationManager(String username) throws IOException;
index c8dd1d45b3b85cddd06c479c171e6dd10fee648e..cc346f7af7ff7ab4b8fd434dfb7c1297b0c0eeb4 100644 (file)
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This class implements the Manager interface using the DBus Signal interface, where possible.
 
 /**
  * This class implements the Manager interface using the DBus Signal interface, where possible.
@@ -65,6 +66,7 @@ public class DbusManagerImpl implements Manager {
     private final Signal signal;
     private final DBusConnection connection;
 
     private final Signal signal;
     private final DBusConnection connection;
 
+    private final Set<ReceiveMessageHandler> weakHandlers = new HashSet<>();
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private DBusSigHandler<Signal.MessageReceivedV2> dbusMsgHandler;
     private DBusSigHandler<Signal.ReceiptReceivedV2> dbusRcptHandler;
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private DBusSigHandler<Signal.MessageReceivedV2> dbusMsgHandler;
     private DBusSigHandler<Signal.ReceiptReceivedV2> dbusRcptHandler;
@@ -424,18 +426,23 @@ public class DbusManagerImpl implements Manager {
     }
 
     @Override
     }
 
     @Override
-    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+    public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) {
         synchronized (messageHandlers) {
         synchronized (messageHandlers) {
-            if (messageHandlers.size() == 0) {
-                installMessageHandlers();
+            if (isWeakListener) {
+                weakHandlers.add(handler);
+            } else {
+                if (messageHandlers.size() == 0) {
+                    installMessageHandlers();
+                }
+                messageHandlers.add(handler);
             }
             }
-            messageHandlers.add(handler);
         }
     }
 
     @Override
     public void removeReceiveHandler(final ReceiveMessageHandler handler) {
         synchronized (messageHandlers) {
         }
     }
 
     @Override
     public void removeReceiveHandler(final ReceiveMessageHandler handler) {
         synchronized (messageHandlers) {
+            weakHandlers.remove(handler);
             messageHandlers.remove(handler);
             if (messageHandlers.size() == 0) {
                 uninstallMessageHandlers();
             messageHandlers.remove(handler);
             if (messageHandlers.size() == 0) {
                 uninstallMessageHandlers();
@@ -582,8 +589,11 @@ public class DbusManagerImpl implements Manager {
             this.notify();
         }
         synchronized (messageHandlers) {
             this.notify();
         }
         synchronized (messageHandlers) {
+            if (messageHandlers.size() > 0) {
+                uninstallMessageHandlers();
+            }
+            weakHandlers.clear();
             messageHandlers.clear();
             messageHandlers.clear();
-            uninstallMessageHandlers();
         }
     }
 
         }
     }
 
@@ -664,11 +674,7 @@ public class DbusManagerImpl implements Manager {
                                 List.of())),
                         Optional.empty(),
                         Optional.empty());
                                 List.of())),
                         Optional.empty(),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
 
             };
             connection.addSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
 
@@ -693,11 +699,7 @@ public class DbusManagerImpl implements Manager {
                         Optional.empty(),
                         Optional.empty(),
                         Optional.empty());
                         Optional.empty(),
                         Optional.empty(),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
 
             };
             connection.addSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
 
@@ -747,20 +749,26 @@ public class DbusManagerImpl implements Manager {
                                 Optional.empty(),
                                 Optional.empty())),
                         Optional.empty());
                                 Optional.empty(),
                                 Optional.empty())),
                         Optional.empty());
-                synchronized (messageHandlers) {
-                    for (final var messageHandler : messageHandlers) {
-                        messageHandler.handleMessage(envelope, null);
-                    }
-                }
+                notifyMessageHandlers(envelope);
             };
             connection.addSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);
         } catch (DBusException e) {
             e.printStackTrace();
         }
             };
             connection.addSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);
         } catch (DBusException e) {
             e.printStackTrace();
         }
+        signal.subscribeReceive();
+    }
+
+    /**
+     * Passes the envelope to every registered receive handler, the regular handlers first and
+     * the weak handlers after them, while holding the {@code messageHandlers} lock.
+     *
+     * @param envelope message envelope handed to each handler (the second handler argument is
+     *                 always {@code null})
+     */
+    private void notifyMessageHandlers(final MessageEnvelope envelope) {
+        synchronized (messageHandlers) {
+            Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+                h.handleMessage(envelope, null);
+            });
+        }
     }
 
     private void uninstallMessageHandlers() {
         try {
     }
 
     private void uninstallMessageHandlers() {
         try {
+            signal.unsubscribeReceive();
             connection.removeSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
             connection.removeSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
             connection.removeSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);
             connection.removeSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler);
             connection.removeSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler);
             connection.removeSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler);
index e69bf059c4d75f4328dfe41ee570bfc360dc4583..e178ca1520fd5c468618efda6d37179ef72be74e 100644 (file)
@@ -10,74 +10,26 @@ import org.asamk.signal.manager.RegistrationManager;
 import org.asamk.signal.manager.UserAlreadyExists;
 import org.asamk.signal.manager.api.CaptchaRequiredException;
 import org.asamk.signal.manager.api.IncorrectPinException;
 import org.asamk.signal.manager.UserAlreadyExists;
 import org.asamk.signal.manager.api.CaptchaRequiredException;
 import org.asamk.signal.manager.api.IncorrectPinException;
-import org.asamk.signal.manager.api.Pair;
 import org.asamk.signal.manager.api.PinLockedException;
 import org.freedesktop.dbus.DBusPath;
 
 import java.io.IOException;
 import java.net.URI;
 import org.asamk.signal.manager.api.PinLockedException;
 import org.freedesktop.dbus.DBusPath;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class DbusSignalControlImpl implements org.asamk.SignalControl {
 
     private final SignalCreator c;
 import java.util.stream.Collectors;
 
 public class DbusSignalControlImpl implements org.asamk.SignalControl {
 
     private final SignalCreator c;
-    private final Function<Manager, Thread> newManagerRunner;
 
 
-    private final List<Pair<Manager, Thread>> receiveThreads = new ArrayList<>();
-    private final Object stopTrigger = new Object();
     private final String objectPath;
 
     private final String objectPath;
 
-    public DbusSignalControlImpl(
-            final SignalCreator c, final Function<Manager, Thread> newManagerRunner, final String objectPath
-    ) {
+    public DbusSignalControlImpl(final SignalCreator c, final String objectPath) {
         this.c = c;
         this.c = c;
-        this.newManagerRunner = newManagerRunner;
         this.objectPath = objectPath;
     }
 
         this.objectPath = objectPath;
     }
 
-    public void addManager(Manager m) {
-        var thread = newManagerRunner.apply(m);
-        if (thread == null) {
-            return;
-        }
-        synchronized (receiveThreads) {
-            receiveThreads.add(new Pair<>(m, thread));
-        }
-    }
-
-    public void run() {
-        synchronized (stopTrigger) {
-            try {
-                stopTrigger.wait();
-            } catch (InterruptedException ignored) {
-            }
-        }
-
-        synchronized (receiveThreads) {
-            for (var t : receiveThreads) {
-                t.second().interrupt();
-            }
-        }
-        while (true) {
-            final Thread thread;
-            synchronized (receiveThreads) {
-                if (receiveThreads.size() == 0) {
-                    break;
-                }
-                var pair = receiveThreads.remove(0);
-                thread = pair.second();
-            }
-            try {
-                thread.join();
-            } catch (InterruptedException ignored) {
-            }
-        }
-    }
-
     @Override
     public boolean isRemote() {
         return false;
     @Override
     public boolean isRemote() {
         return false;
@@ -124,7 +76,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
     ) throws Error.Failure, Error.InvalidNumber {
         try (final RegistrationManager registrationManager = c.getNewRegistrationManager(number)) {
             final Manager manager = registrationManager.verifyAccount(verificationCode, pin);
     ) throws Error.Failure, Error.InvalidNumber {
         try (final RegistrationManager registrationManager = c.getNewRegistrationManager(number)) {
             final Manager manager = registrationManager.verifyAccount(verificationCode, pin);
-            addManager(manager);
+            c.addManager(manager);
         } catch (IOException | PinLockedException | IncorrectPinException e) {
             throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage());
         }
         } catch (IOException | PinLockedException | IncorrectPinException e) {
             throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage());
         }
@@ -138,7 +90,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
             new Thread(() -> {
                 try {
                     final Manager manager = provisioningManager.finishDeviceLink(newDeviceName);
             new Thread(() -> {
                 try {
                     final Manager manager = provisioningManager.finishDeviceLink(newDeviceName);
-                    addManager(manager);
+                    c.addManager(manager);
                 } catch (IOException | TimeoutException | UserAlreadyExists e) {
                     e.printStackTrace();
                 }
                 } catch (IOException | TimeoutException | UserAlreadyExists e) {
                     e.printStackTrace();
                 }
@@ -156,12 +108,9 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
 
     @Override
     public List<DBusPath> listAccounts() {
 
     @Override
     public List<DBusPath> listAccounts() {
-        synchronized (receiveThreads) {
-            return receiveThreads.stream()
-                    .map(Pair::first)
-                    .map(Manager::getSelfNumber)
-                    .map(u -> new DBusPath(DbusConfig.getObjectPath(u)))
-                    .collect(Collectors.toList());
-        }
+        return c.getAccountNumbers()
+                .stream()
+                .map(u -> new DBusPath(DbusConfig.getObjectPath(u)))
+                .collect(Collectors.toList());
     }
 }
     }
 }
index 9f6f1340766d4a51c8209d56eb12b04b32cc57f7..c9099c0b1d66406c01c5ee08ee571040a4268bda 100644 (file)
@@ -2,6 +2,7 @@ package org.asamk.signal.dbus;
 
 import org.asamk.Signal;
 import org.asamk.signal.BaseConfig;
 
 import org.asamk.Signal;
 import org.asamk.signal.BaseConfig;
+import org.asamk.signal.DbusReceiveMessageHandler;
 import org.asamk.signal.manager.AttachmentInvalidException;
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.NotMasterDeviceException;
 import org.asamk.signal.manager.AttachmentInvalidException;
 import org.asamk.signal.manager.Manager;
 import org.asamk.signal.manager.NotMasterDeviceException;
@@ -60,26 +61,40 @@ public class DbusSignalImpl implements Signal {
     private final Manager m;
     private final DBusConnection connection;
     private final String objectPath;
     private final Manager m;
     private final DBusConnection connection;
     private final String objectPath;
+    private final boolean noReceiveOnStart;
 
     private DBusPath thisDevice;
     private final List<StructDevice> devices = new ArrayList<>();
     private final List<StructGroup> groups = new ArrayList<>();
 
     private DBusPath thisDevice;
     private final List<StructDevice> devices = new ArrayList<>();
     private final List<StructGroup> groups = new ArrayList<>();
+    private DbusReceiveMessageHandler dbusMessageHandler;
+    private int subscriberCount;
 
     private final static Logger logger = LoggerFactory.getLogger(DbusSignalImpl.class);
 
 
     private final static Logger logger = LoggerFactory.getLogger(DbusSignalImpl.class);
 
-    public DbusSignalImpl(final Manager m, DBusConnection connection, final String objectPath) {
+    public DbusSignalImpl(
+            final Manager m, DBusConnection connection, final String objectPath, final boolean noReceiveOnStart
+    ) {
         this.m = m;
         this.connection = connection;
         this.objectPath = objectPath;
         this.m = m;
         this.connection = connection;
         this.objectPath = objectPath;
+        this.noReceiveOnStart = noReceiveOnStart;
     }
 
     public void initObjects() {
     }
 
     public void initObjects() {
+        if (!noReceiveOnStart) {
+            subscribeReceive();
+        }
+
         updateDevices();
         updateGroups();
         updateConfiguration();
     }
 
     public void close() {
         updateDevices();
         updateGroups();
         updateConfiguration();
     }
 
     public void close() {
+        if (dbusMessageHandler != null) {
+            m.removeReceiveHandler(dbusMessageHandler);
+            dbusMessageHandler = null;
+        }
         unExportDevices();
         unExportGroups();
         unExportConfiguration();
         unExportDevices();
         unExportGroups();
         unExportConfiguration();
@@ -95,6 +110,24 @@ public class DbusSignalImpl implements Signal {
         return m.getSelfNumber();
     }
 
         return m.getSelfNumber();
     }
 
+    /**
+     * Adds one receive subscriber. The first subscription creates the D-Bus receive handler
+     * and registers it on the manager; every call increments the subscriber count.
+     */
+    @Override
+    public void subscribeReceive() {
+        final var needsHandler = dbusMessageHandler == null;
+        if (needsHandler) {
+            final var newHandler = new DbusReceiveMessageHandler(m, connection, objectPath);
+            dbusMessageHandler = newHandler;
+            m.addReceiveHandler(newHandler);
+        }
+        subscriberCount += 1;
+    }
+
+    /**
+     * Removes one receive subscriber; the count never drops below zero. Once no subscriber
+     * is left, the D-Bus receive handler is removed from the manager and discarded.
+     */
+    @Override
+    public void unsubscribeReceive() {
+        subscriberCount = Math.max(0, subscriberCount - 1);
+        if (subscriberCount > 0 || dbusMessageHandler == null) {
+            return;
+        }
+        m.removeReceiveHandler(dbusMessageHandler);
+        dbusMessageHandler = null;
+    }
+
     @Override
     public void submitRateLimitChallenge(String challenge, String captchaString) {
         final var captcha = captchaString == null ? null : captchaString.replace("signalcaptcha://", "");
     @Override
     public void submitRateLimitChallenge(String challenge, String captchaString) {
         final var captcha = captchaString == null ? null : captchaString.replace("signalcaptcha://", "");
index c2727d28410102948fe0cb82db6c52954a14e3d7..3bc7e701238811151441cc0f8f90cf546b8f999d 100644 (file)
@@ -34,9 +34,7 @@ public class JsonRpcReader {
         this.objectMapper = Util.createJsonObjectMapper();
     }
 
         this.objectMapper = Util.createJsonObjectMapper();
     }
 
-    public void readRequests(
-            final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler
-    ) {
+    public void readMessages(final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler) {
         while (!Thread.interrupted()) {
             JsonRpcMessage message = readMessage();
             if (message == null) break;
         while (!Thread.interrupted()) {
             JsonRpcMessage message = readMessage();
             if (message == null) break;
index 9085609ca733c40246d9abe7c35049d6964989af..c33cd9a5057e6fa796da55c20002d2aa37834607 100644 (file)
@@ -1,6 +1,7 @@
 package org.asamk.signal.jsonrpc;
 
 import com.fasterxml.jackson.core.TreeNode;
 package org.asamk.signal.jsonrpc;
 
 import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -9,8 +10,10 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
 import org.asamk.signal.JsonReceiveMessageHandler;
 import org.asamk.signal.JsonWriter;
 import org.asamk.signal.OutputWriter;
 import org.asamk.signal.JsonReceiveMessageHandler;
 import org.asamk.signal.JsonWriter;
 import org.asamk.signal.OutputWriter;
+import org.asamk.signal.commands.Command;
 import org.asamk.signal.commands.Commands;
 import org.asamk.signal.commands.JsonRpcCommand;
 import org.asamk.signal.commands.Commands;
 import org.asamk.signal.commands.JsonRpcCommand;
+import org.asamk.signal.commands.SignalCreator;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.commands.exceptions.IOErrorException;
 import org.asamk.signal.commands.exceptions.UntrustedKeyErrorException;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.commands.exceptions.IOErrorException;
 import org.asamk.signal.commands.exceptions.UntrustedKeyErrorException;
@@ -21,7 +24,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 public class SignalJsonRpcDispatcherHandler {
 import java.util.function.Supplier;
 
 public class SignalJsonRpcDispatcherHandler {
@@ -32,49 +37,143 @@ public class SignalJsonRpcDispatcherHandler {
     private static final int IO_ERROR = -3;
     private static final int UNTRUSTED_KEY_ERROR = -4;
 
     private static final int IO_ERROR = -3;
     private static final int UNTRUSTED_KEY_ERROR = -4;
 
-    private final Manager m;
-    private final JsonWriter outputWriter;
-    private final Supplier<String> lineSupplier;
+    private final ObjectMapper objectMapper;
+    private final JsonRpcSender jsonRpcSender;
+    private final JsonRpcReader jsonRpcReader;
+    private final boolean noReceiveOnStart;
+
+    private SignalCreator c;
+    private final Map<Manager, Manager.ReceiveMessageHandler> receiveHandlers = new HashMap<>();
+
+    private Manager m;
 
     public SignalJsonRpcDispatcherHandler(
 
     public SignalJsonRpcDispatcherHandler(
-            final Manager m, final JsonWriter outputWriter, final Supplier<String> lineSupplier
+            final JsonWriter outputWriter, final Supplier<String> lineSupplier, final boolean noReceiveOnStart
     ) {
     ) {
+        this.noReceiveOnStart = noReceiveOnStart;
+        this.objectMapper = Util.createJsonObjectMapper();
+        this.jsonRpcSender = new JsonRpcSender(outputWriter);
+        this.jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier);
+    }
+
+    /**
+     * Serves a connection in multi-account mode.
+     * <p>
+     * Stores the given creator, subscribes to received messages for every account number for which the
+     * creator returns a non-null manager (skipped when {@code noReceiveOnStart} is set), and then
+     * processes incoming messages.
+     *
+     * @param c creator used to look up the manager of each account
+     */
+    public void handleConnection(final SignalCreator c) {
+        this.c = c;
+
+        if (!noReceiveOnStart) {
+            for (final var accountNumber : c.getAccountNumbers()) {
+                final var manager = c.getManager(accountNumber);
+                if (manager != null) {
+                    subscribeReceive(manager);
+                }
+            }
+        }
+
+        handleConnection();
+    }
+
+    public void handleConnection(final Manager m) {
         this.m = m;
         this.m = m;
-        this.outputWriter = outputWriter;
-        this.lineSupplier = lineSupplier;
+
+        if (!noReceiveOnStart) {
+            subscribeReceive(m);
+        }
+
+        handleConnection();
     }
 
     }
 
-    public void handleConnection() {
-        final var objectMapper = Util.createJsonObjectMapper();
-        final var jsonRpcSender = new JsonRpcSender(outputWriter);
+    private void subscribeReceive(final Manager m) {
+        if (receiveHandlers.containsKey(m)) {
+            return;
+        }
 
         final var receiveMessageHandler = new JsonReceiveMessageHandler(m,
                 s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive",
                         objectMapper.valueToTree(s),
                         null)));
 
         final var receiveMessageHandler = new JsonReceiveMessageHandler(m,
                 s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive",
                         objectMapper.valueToTree(s),
                         null)));
-        try {
-            m.addReceiveHandler(receiveMessageHandler);
-
-            // Maybe this should be handled inside the Manager
-            while (!m.hasCaughtUpWithOldMessages()) {
-                try {
-                    synchronized (m) {
-                        m.wait();
-                    }
-                } catch (InterruptedException ignored) {
+        m.addReceiveHandler(receiveMessageHandler);
+        receiveHandlers.put(m, receiveMessageHandler);
+
+        while (!m.hasCaughtUpWithOldMessages()) {
+            try {
+                synchronized (m) {
+                    m.wait();
                 }
                 }
+            } catch (InterruptedException ignored) {
             }
             }
+        }
+    }
+
+    /**
+     * Stops forwarding received messages of the given manager to this connection.
+     * Does nothing when no receive handler is registered for the manager.
+     *
+     * @param m manager whose receive handler should be removed
+     */
+    void unsubscribeReceive(final Manager m) {
+        final var registeredHandler = receiveHandlers.remove(m);
+        if (registeredHandler == null) {
+            return;
+        }
+        m.removeReceiveHandler(registeredHandler);
+    }
 
 
-            final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier);
-            jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params),
+    private void handleConnection() {
+        try {
+            jsonRpcReader.readMessages((method, params) -> handleRequest(objectMapper, method, params),
                     response -> logger.debug("Received unexpected response for id {}", response.getId()));
         } finally {
                     response -> logger.debug("Received unexpected response for id {}", response.getId()));
         } finally {
-            m.removeReceiveHandler(receiveMessageHandler);
+            receiveHandlers.forEach(Manager::removeReceiveHandler);
+            receiveHandlers.clear();
         }
     }
 
     private JsonNode handleRequest(
         }
     }
 
     private JsonNode handleRequest(
-            final Manager m, final ObjectMapper objectMapper, final String method, ContainerNode<?> params
+            final ObjectMapper objectMapper, final String method, ContainerNode<?> params
+    ) throws JsonRpcException {
+        var command = getCommand(method);
+        // TODO implement listAccounts, register, verify, link
+        if (command instanceof JsonRpcCommand<?> jsonRpcCommand) {
+            if (m != null) {
+                return runCommand(objectMapper, params, new CommandRunnerImpl<>(m, jsonRpcCommand));
+            }
+
+            if (params.has("account")) {
+                Manager manager = c.getManager(params.get("account").asText());
+                if (manager != null) {
+                    return runCommand(objectMapper, params, new CommandRunnerImpl<>(manager, jsonRpcCommand));
+                }
+            } else {
+                throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_PARAMS,
+                        "Method requires valid account parameter",
+                        null));
+            }
+        }
+
+        throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.METHOD_NOT_FOUND,
+                "Method not implemented",
+                null));
+    }
+
+    /**
+     * Resolves a JSON-RPC method name to a command.
+     * <p>
+     * The connection-local methods {@code subscribeReceive} and {@code unsubscribeReceive} are handled
+     * here; every other name is looked up via {@link Commands#getCommand}.
+     *
+     * @param method JSON-RPC method name
+     * @return the matching command, or whatever {@link Commands#getCommand} returns for other names
+     */
+    private Command getCommand(final String method) {
+        final Command command;
+        if ("subscribeReceive".equals(method)) {
+            command = new SubscribeReceiveCommand();
+        } else if ("unsubscribeReceive".equals(method)) {
+            command = new UnsubscribeReceiveCommand();
+        } else {
+            command = Commands.getCommand(method);
+        }
+        return command;
+    }
+
+    /**
+     * Adapter that binds a {@link JsonRpcCommand} to a fixed {@link Manager}, so the command can be run
+     * through the manager-agnostic {@link CommandRunner} interface.
+     *
+     * @param m       manager handed to the wrapped command on every invocation
+     * @param command command that both methods delegate to
+     */
+    private record CommandRunnerImpl<T>(Manager m, JsonRpcCommand<T> command) implements CommandRunner<T> {
+
+        @Override
+        public void handleCommand(final T request, final OutputWriter outputWriter) throws CommandException {
+            // Supply the bound manager; the caller only provides request and writer.
+            command.handleCommand(request, m, outputWriter);
+        }
+
+        @Override
+        public TypeReference<T> getRequestType() {
+            return command.getRequestType();
+        }
+    }
+
+    /**
+     * A command that can be executed without the caller supplying a {@link Manager}.
+     *
+     * @param <T> type the request parameters are parsed into
+     */
+    interface CommandRunner<T> {
+
+        /** Runs the command with the parsed request, writing any result to {@code outputWriter}. */
+        void handleCommand(T request, OutputWriter outputWriter) throws CommandException;
+
+        /** Returns the type the request parameters should be parsed into. */
+        TypeReference<T> getRequestType();
+    }
+
+    private JsonNode runCommand(
+            final ObjectMapper objectMapper, final ContainerNode<?> params, final CommandRunner<?> command
     ) throws JsonRpcException {
         final Object[] result = {null};
         final JsonWriter commandOutputWriter = s -> {
     ) throws JsonRpcException {
         final Object[] result = {null};
         final JsonWriter commandOutputWriter = s -> {
@@ -85,15 +184,8 @@ public class SignalJsonRpcDispatcherHandler {
             result[0] = s;
         };
 
             result[0] = s;
         };
 
-        var command = Commands.getCommand(method);
-        if (!(command instanceof JsonRpcCommand)) {
-            throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.METHOD_NOT_FOUND,
-                    "Method not implemented",
-                    null));
-        }
-
         try {
         try {
-            parseParamsAndRunCommand(m, objectMapper, params, commandOutputWriter, (JsonRpcCommand<?>) command);
+            parseParamsAndRunCommand(objectMapper, params, commandOutputWriter, command);
         } catch (JsonMappingException e) {
             throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
                     e.getMessage(),
         } catch (JsonMappingException e) {
             throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
                     e.getMessage(),
@@ -116,11 +208,10 @@ public class SignalJsonRpcDispatcherHandler {
     }
 
     private <T> void parseParamsAndRunCommand(
     }
 
     private <T> void parseParamsAndRunCommand(
-            final Manager m,
             final ObjectMapper objectMapper,
             final TreeNode params,
             final OutputWriter outputWriter,
             final ObjectMapper objectMapper,
             final TreeNode params,
             final OutputWriter outputWriter,
-            final JsonRpcCommand<T> command
+            final CommandRunner<T> command
     ) throws CommandException, JsonMappingException {
         T requestParams = null;
         final var requestType = command.getRequestType();
     ) throws CommandException, JsonMappingException {
         T requestParams = null;
         final var requestType = command.getRequestType();
@@ -133,6 +224,36 @@ public class SignalJsonRpcDispatcherHandler {
                 throw new AssertionError(e);
             }
         }
                 throw new AssertionError(e);
             }
         }
-        command.handleCommand(requestParams, m, outputWriter);
+        command.handleCommand(requestParams, outputWriter);
+    }
+
+    /**
+     * JSON-RPC command named {@code subscribeReceive}: starts forwarding received messages of the
+     * selected manager to this connection. The request object is not used.
+     */
+    private class SubscribeReceiveCommand implements JsonRpcCommand<Void> {
+
+        @Override
+        public String getName() {
+            return "subscribeReceive";
+        }
+
+        @Override
+        public void handleCommand(
+                final Void request, final Manager m, final OutputWriter outputWriter
+        ) throws CommandException {
+            // Delegates to the enclosing handler, which tracks one receive handler per manager.
+            subscribeReceive(m);
+        }
+    }
+
+    /**
+     * JSON-RPC command named {@code unsubscribeReceive}: stops forwarding received messages of the
+     * selected manager to this connection. The request object is not used.
+     */
+    private class UnsubscribeReceiveCommand implements JsonRpcCommand<Void> {
+
+        @Override
+        public String getName() {
+            return "unsubscribeReceive";
+        }
+
+        @Override
+        public void handleCommand(
+                final Void request, final Manager m, final OutputWriter outputWriter
+        ) throws CommandException {
+            // Delegates to the enclosing handler, which removes the manager's receive handler if present.
+            unsubscribeReceive(m);
+        }
     }
 }
     }
 }
index 5505e51851b645bc17309cf6b9033a57cf5e3e58..b4c4c6dd883f1ea24cc81df9ed99283e62078971 100644 (file)
@@ -1,13 +1,41 @@
 package org.asamk.signal.util;
 
 package org.asamk.signal.util;
 
+import org.asamk.signal.commands.exceptions.IOErrorException;
+import org.asamk.signal.commands.exceptions.UserErrorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Reader;
 import java.io.StringWriter;
 import java.io.StringWriter;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.net.UnixDomainSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import jdk.net.ExtendedSocketOptions;
+import jdk.net.UnixDomainPrincipal;
+
+import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
+import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
 
 public class IOUtils {
 
 
 public class IOUtils {
 
+    private final static Logger logger = LoggerFactory.getLogger(IOUtils.class);
+
     private IOUtils() {
     }
 
     private IOUtils() {
     }
 
@@ -21,12 +49,101 @@ public class IOUtils {
         return output.toString();
     }
 
         return output.toString();
     }
 
+    /**
+     * Creates the given directory (including missing parents) with owner-only permissions.
+     * <p>
+     * Nothing is done when the path already exists. If the file system rejects POSIX permission
+     * attributes, the directories are created with default permissions instead.
+     *
+     * @param file directory to create
+     * @throws IOException if the directories cannot be created
+     */
+    public static void createPrivateDirectories(File file) throws IOException {
+        if (!file.exists()) {
+            final var directory = file.toPath();
+            try {
+                Files.createDirectories(directory,
+                        PosixFilePermissions.asFileAttribute(EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE)));
+            } catch (UnsupportedOperationException e) {
+                // POSIX attributes are not supported here; fall back to default permissions.
+                Files.createDirectories(directory);
+            }
+        }
+    }
+
     public static File getDataHomeDir() {
         var dataHome = System.getenv("XDG_DATA_HOME");
         if (dataHome != null) {
             return new File(dataHome);
         }
 
     public static File getDataHomeDir() {
         var dataHome = System.getenv("XDG_DATA_HOME");
         if (dataHome != null) {
             return new File(dataHome);
         }
 
+        logger.debug("XDG_DATA_HOME not set, falling back to home dir");
         return new File(new File(System.getProperty("user.home"), ".local"), "share");
     }
         return new File(new File(System.getProperty("user.home"), ".local"), "share");
     }
+
+    /**
+     * Returns the directory for runtime files.
+     *
+     * @return the directory named by {@code XDG_RUNTIME_DIR}, or the JVM temp dir when it is not set
+     */
+    public static File getRuntimeDir() {
+        final var configuredDir = System.getenv("XDG_RUNTIME_DIR");
+        if (configuredDir == null) {
+            logger.debug("XDG_RUNTIME_DIR not set, falling back to temp dir");
+            return new File(System.getProperty("java.io.tmpdir"));
+        }
+
+        return new File(configuredDir);
+    }
+
+    /**
+     * Wraps a reader in a supplier that yields one line per call.
+     *
+     * @param reader source to read lines from; it is buffered internally
+     * @return supplier returning the next line, or {@code null} at end of input or when reading fails
+     *         (the failure is logged)
+     */
+    public static Supplier<String> getLineSupplier(final Reader reader) {
+        final var lines = new BufferedReader(reader);
+        return () -> {
+            String nextLine = null;
+            try {
+                nextLine = lines.readLine();
+            } catch (IOException e) {
+                logger.error("Error occurred while reading line", e);
+            }
+            return nextLine;
+        };
+    }
+
+    /**
+     * Parses a {@code host:port} string into a socket address.
+     * <p>
+     * The text after the last colon is the port, everything before it is the host.
+     *
+     * @param tcpAddress address in {@code host:port} form
+     * @return the parsed socket address
+     * @throws UserErrorException if the colon is missing, the port is not a number, or the port is
+     *                            outside the range 0-65535
+     */
+    public static InetSocketAddress parseInetSocketAddress(final String tcpAddress) throws UserErrorException {
+        final var colonIndex = tcpAddress.lastIndexOf(':');
+        if (colonIndex < 0) {
+            throw new UserErrorException("Invalid tcp bind address: " + tcpAddress);
+        }
+        final String host = tcpAddress.substring(0, colonIndex);
+        final int port;
+        try {
+            port = Integer.parseInt(tcpAddress.substring(colonIndex + 1));
+        } catch (NumberFormatException e) {
+            throw new UserErrorException("Invalid tcp bind address: " + tcpAddress, e);
+        }
+        // InetSocketAddress rejects out-of-range ports with an IllegalArgumentException;
+        // report them as a user error like every other malformed address.
+        if (port < 0 || port > 0xFFFF) {
+            throw new UserErrorException("Invalid tcp bind address: " + tcpAddress);
+        }
+        return new InetSocketAddress(host, port);
+    }
+
+    /**
+     * Reads the peer credentials ({@code SO_PEERCRED}) of a connected channel.
+     *
+     * @param channel channel to query
+     * @return the peer's principal, or {@code null} when the channel does not support the option
+     * @throws IOException if querying the option fails
+     */
+    public static UnixDomainPrincipal getUnixDomainPrincipal(final SocketChannel channel) throws IOException {
+        try {
+            return channel.getOption(ExtendedSocketOptions.SO_PEERCRED);
+        } catch (UnsupportedOperationException ignored) {
+            // Option not supported by this channel: no principal available.
+            return null;
+        }
+    }
+
+    /**
+     * Opens a server socket channel and binds it to the given address.
+     * <p>
+     * A unix-domain channel is opened for {@link UnixDomainSocketAddress}, a default channel
+     * otherwise. {@code preBind} and {@code postBind} run before and after binding. If any step after
+     * opening the channel fails, the channel is closed again so it does not leak.
+     *
+     * @param address address to listen on
+     * @return the bound server channel
+     * @throws IOErrorException if opening or binding the channel fails with an I/O error
+     */
+    public static ServerSocketChannel bindSocket(final SocketAddress address) throws IOErrorException {
+        ServerSocketChannel serverChannel = null;
+        boolean bound = false;
+        try {
+            preBind(address);
+            serverChannel = address instanceof UnixDomainSocketAddress
+                    ? ServerSocketChannel.open(StandardProtocolFamily.UNIX)
+                    : ServerSocketChannel.open();
+            serverChannel.bind(address);
+            logger.info("Listening on socket: " + address);
+            postBind(address);
+            bound = true;
+            return serverChannel;
+        } catch (IOException e) {
+            throw new IOErrorException("Failed to bind socket: " + e.getMessage(), e);
+        } finally {
+            // The caller never receives the channel on failure, so release it here.
+            if (!bound && serverChannel != null) {
+                try {
+                    serverChannel.close();
+                } catch (IOException closeError) {
+                    logger.debug("Failed to close server channel after bind failure", closeError);
+                }
+            }
+        }
+    }
+
+    /**
+     * Prepares the file system before binding: for a unix-domain address, makes sure the socket's
+     * parent directory exists. Other address types need no preparation.
+     *
+     * @param address address that is about to be bound
+     * @throws IOException if the parent directory cannot be created
+     */
+    private static void preBind(SocketAddress address) throws IOException {
+        if (address instanceof UnixDomainSocketAddress usa) {
+            // A socket path without a directory component has no parent to create.
+            final var parentDir = usa.getPath().toFile().getParentFile();
+            if (parentDir != null) {
+                createPrivateDirectories(parentDir);
+            }
+        }
+    }
+
+    /**
+     * Cleans up after binding: for a unix-domain address, registers the socket file for deletion
+     * when the JVM exits. Other address types are left alone.
+     *
+     * @param address address that was just bound
+     */
+    private static void postBind(SocketAddress address) {
+        if (!(address instanceof UnixDomainSocketAddress usa)) {
+            return;
+        }
+        final var socketFile = usa.getPath().toFile();
+        socketFile.deleteOnExit();
+    }
 }
 }