From 81a11dc9776672e3468ee9a8eed556889fb2e070 Mon Sep 17 00:00:00 2001 From: AsamK Date: Wed, 10 Nov 2021 10:30:57 +0100 Subject: [PATCH] Implement socket/tcp for daemon command --- data/signal-cli-socket.service | 20 ++ data/signal-cli-socket.socket | 8 + .../org/asamk/signal/manager/Manager.java | 9 +- .../org/asamk/signal/manager/ManagerImpl.java | 21 +- man/signal-cli-dbus.5.adoc | 2 +- src/main/java/org/asamk/Signal.java | 4 + src/main/java/org/asamk/signal/App.java | 100 ++++-- .../signal/JsonReceiveMessageHandler.java | 3 +- .../asamk/signal/commands/DaemonCommand.java | 327 +++++++++++++++--- .../commands/JsonRpcDispatcherCommand.java | 22 +- .../signal/commands/MultiLocalCommand.java | 12 +- .../asamk/signal/commands/ReceiveMode.java | 22 ++ .../asamk/signal/commands/SignalCreator.java | 11 + .../asamk/signal/dbus/DbusManagerImpl.java | 48 +-- .../signal/dbus/DbusSignalControlImpl.java | 65 +--- .../org/asamk/signal/dbus/DbusSignalImpl.java | 35 +- .../asamk/signal/jsonrpc/JsonRpcReader.java | 4 +- .../SignalJsonRpcDispatcherHandler.java | 189 ++++++++-- .../java/org/asamk/signal/util/IOUtils.java | 117 +++++++ 19 files changed, 782 insertions(+), 237 deletions(-) create mode 100644 data/signal-cli-socket.service create mode 100644 data/signal-cli-socket.socket create mode 100644 src/main/java/org/asamk/signal/commands/ReceiveMode.java diff --git a/data/signal-cli-socket.service b/data/signal-cli-socket.service new file mode 100644 index 00000000..a6a2cfbc --- /dev/null +++ b/data/signal-cli-socket.service @@ -0,0 +1,20 @@ +[Unit] +Description=Send secure messages to Signal clients +Wants=network-online.target +After=network-online.target +Requires=signal-cli-socket.socket + +[Service] +Type=simple +Environment="SIGNAL_CLI_OPTS=-Xms2m" +ExecStart=%dir%/bin/signal-cli --config /var/lib/signal-cli daemon +User=signal-cli +# JVM always exits with 143 in reaction to SIGTERM signal +SuccessExitStatus=143 +StandardInput=socket +StandardOutput=journal +StandardError=journal + +[Install] +Also=signal-cli-socket.socket +WantedBy=default.target diff --git a/data/signal-cli-socket.socket b/data/signal-cli-socket.socket new file mode 100644 index 00000000..e8583562 --- /dev/null +++ b/data/signal-cli-socket.socket @@ -0,0 +1,8 @@ +[Unit] +Description=Send secure messages to Signal clients + +[Socket] +ListenStream=%t/signal-cli/socket + +[Install] +WantedBy=sockets.target diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index be5238d1..49331b1f 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -198,7 +198,11 @@ public interface Manager extends Closeable { * Add a handler to receive new messages. * Will start receiving messages from server, if not already started. */ - void addReceiveHandler(ReceiveMessageHandler handler); + default void addReceiveHandler(ReceiveMessageHandler handler) { + addReceiveHandler(handler, false); + } + + void addReceiveHandler(ReceiveMessageHandler handler, final boolean isWeakListener); /** * Remove a handler to receive new messages. @@ -249,6 +253,9 @@ public interface Manager extends Closeable { interface ReceiveMessageHandler { + ReceiveMessageHandler EMPTY = (envelope, e) -> { + }; + void handleMessage(MessageEnvelope envelope, Throwable e); } } diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 94a3f4be..d7245d56 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -108,6 +108,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.asamk.signal.manager.config.ServiceConfig.capabilities; @@ -139,6 +140,7 @@ public class ManagerImpl implements Manager { private boolean ignoreAttachments = false; private Thread receiveThread; + private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); private boolean isReceivingSynchronous; @@ -904,14 +906,17 @@ public class ManagerImpl implements Manager { } @Override - public void addReceiveHandler(final ReceiveMessageHandler handler) { + public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { if (isReceivingSynchronous) { throw new IllegalStateException("Already receiving message synchronously."); } synchronized (messageHandlers) { - messageHandlers.add(handler); - - startReceiveThreadIfRequired(); + if (isWeakListener) { + weakHandlers.add(handler); + } else { + messageHandlers.add(handler); + startReceiveThreadIfRequired(); + } } } @@ -925,13 +930,13 @@ public class ManagerImpl implements Manager { try { receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> { synchronized (messageHandlers) { - for (ReceiveMessageHandler h : messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { try { h.handleMessage(envelope, e); } catch (Exception ex) { logger.warn("Message handler failed, ignoring", ex); } - } + }); } }); break; @@ -959,8 +964,9 @@ public class ManagerImpl implements Manager { public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; synchronized (messageHandlers) { + weakHandlers.remove(handler); messageHandlers.remove(handler); - if (!messageHandlers.isEmpty() || isReceivingSynchronous) { + if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) { return; } thread = receiveThread; @@ -1380,6 +1386,7 @@ public class ManagerImpl implements Manager { private void close(boolean closeAccount) throws IOException { Thread thread; synchronized (messageHandlers) { + weakHandlers.clear(); messageHandlers.clear(); thread = receiveThread; receiveThread = null; diff --git a/man/signal-cli-dbus.5.adoc b/man/signal-cli-dbus.5.adoc index 870678e3..75b4e8a1 100755 --- a/man/signal-cli-dbus.5.adoc +++ b/man/signal-cli-dbus.5.adoc @@ -51,7 +51,7 @@ Phone numbers always have the format + These methods are available if the daemon is started anonymously (without an explicit `-u USERNAME`). Requests are sent to `/org/asamk/Signal`; requests related to individual accounts are sent to `/org/asamk/Signal/_441234567890` where the + dialing code is replaced by an underscore (_). -Only `version()` is activated in single-user mode; the rest are disabled. +Only `version()` is activated in single-account mode; the rest are disabled. link() -> deviceLinkUri:: link(newDeviceName) -> deviceLinkUri:: diff --git a/src/main/java/org/asamk/Signal.java b/src/main/java/org/asamk/Signal.java index 4e3514f7..7e25bfc4 100644 --- a/src/main/java/org/asamk/Signal.java +++ b/src/main/java/org/asamk/Signal.java @@ -22,6 +22,10 @@ public interface Signal extends DBusInterface { String getSelfNumber(); + void subscribeReceive(); + + void unsubscribeReceive(); + long sendMessage( String message, List attachments, String recipient ) throws Error.AttachmentInvalid, Error.Failure, Error.InvalidNumber, Error.UntrustedIdentity; diff --git a/src/main/java/org/asamk/signal/App.java b/src/main/java/org/asamk/signal/App.java index 03bb6fcb..01af3178 100644 --- a/src/main/java/org/asamk/signal/App.java +++ b/src/main/java/org/asamk/signal/App.java @@ -39,6 +39,8 @@ import java.io.OutputStreamWriter; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; +import java.util.stream.Collectors; import static net.sourceforge.argparse4j.DefaultSettings.VERSION_0_9_0_DEFAULT_SETTINGS; @@ -66,8 +68,11 @@ public class App { parser.addArgument("-u", "--username").help("Specify your phone number, that will be your identifier."); var mut = parser.addMutuallyExclusiveGroup(); - mut.addArgument("--dbus").help("Make request via user dbus.").action(Arguments.storeTrue()); - mut.addArgument("--dbus-system").help("Make request via system dbus.").action(Arguments.storeTrue()); + mut.addArgument("--dbus").dest("global-dbus").help("Make request via user dbus.").action(Arguments.storeTrue()); + mut.addArgument("--dbus-system") + .dest("global-dbus-system") + .help("Make request via system dbus.") + .action(Arguments.storeTrue()); parser.addArgument("-o", "--output") .help("Choose to output in plain text or JSON") @@ -119,8 +124,8 @@ public class App { var username = ns.getString("username"); - final var useDbus = Boolean.TRUE.equals(ns.getBoolean("dbus")); - final var useDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system")); + final var useDbus = Boolean.TRUE.equals(ns.getBoolean("global-dbus")); + final var useDbusSystem = Boolean.TRUE.equals(ns.getBoolean("global-dbus-system")); if (useDbus || useDbusSystem) { // If username is null, it will connect to the default object path initDbusClient(command, username, useDbusSystem, outputWriter); @@ -262,31 +267,72 @@ public class App { final TrustNewIdentity trustNewIdentity ) throws CommandException { final var managers = new ArrayList(); - for (String u : usernames) { - try { - managers.add(loadManager(u, dataPath, serviceEnvironment, trustNewIdentity)); - } catch (CommandException e) { - logger.warn("Ignoring {}: {}", u, e.getMessage()); - } - } - - command.handleCommand(ns, managers, new SignalCreator() { - @Override - public ProvisioningManager getNewProvisioningManager() { - return ProvisioningManager.init(dataPath, serviceEnvironment, BaseConfig.USER_AGENT); - } - - @Override - public RegistrationManager getNewRegistrationManager(String username) throws IOException { - return RegistrationManager.init(username, dataPath, serviceEnvironment, BaseConfig.USER_AGENT); + try { + for (String u : usernames) { + try { + managers.add(loadManager(u, dataPath, serviceEnvironment, trustNewIdentity)); + } catch (CommandException e) { + logger.warn("Ignoring {}: {}", u, e.getMessage()); + } } - }, outputWriter); - for (var m : managers) { - try { - m.close(); - } catch (IOException e) { - logger.warn("Cleanup failed", e); + command.handleCommand(ns, new SignalCreator() { + private List> onManagerAddedHandlers = new ArrayList<>(); + + @Override + public List getAccountNumbers() { + synchronized (managers) { + return managers.stream().map(Manager::getSelfNumber).collect(Collectors.toList()); + } + } + + @Override + public void addManager(final Manager m) { + synchronized (managers) { + if (!managers.contains(m)) { + managers.add(m); + for (final var handler : onManagerAddedHandlers) { + handler.accept(m); + } + } + } + } + + @Override + public void addOnManagerAddedHandler(final Consumer handler) { + onManagerAddedHandlers.add(handler); + } + + @Override + public Manager getManager(final String phoneNumber) { + synchronized (managers) { + return managers.stream() + .filter(m -> m.getSelfNumber().equals(phoneNumber)) + .findFirst() + .orElse(null); + } + } + + @Override + public ProvisioningManager getNewProvisioningManager() { + return ProvisioningManager.init(dataPath, serviceEnvironment, BaseConfig.USER_AGENT); + } + + @Override + public RegistrationManager getNewRegistrationManager(String username) throws IOException { + return RegistrationManager.init(username, dataPath, serviceEnvironment, BaseConfig.USER_AGENT); + } + }, outputWriter); + } finally { + synchronized (managers) { + for (var m : managers) { + try { + m.close(); + } catch (IOException e) { + logger.warn("Cleanup failed", e); + } + } + managers.clear(); } } } diff --git a/src/main/java/org/asamk/signal/JsonReceiveMessageHandler.java b/src/main/java/org/asamk/signal/JsonReceiveMessageHandler.java index 1135e89a..41edfd3f 100644 --- a/src/main/java/org/asamk/signal/JsonReceiveMessageHandler.java +++ b/src/main/java/org/asamk/signal/JsonReceiveMessageHandler.java @@ -13,7 +13,7 @@ public class JsonReceiveMessageHandler implements Manager.ReceiveMessageHandler private final static Logger logger = LoggerFactory.getLogger(JsonReceiveMessageHandler.class); - protected final Manager m; + private final Manager m; private final JsonWriter jsonWriter; public JsonReceiveMessageHandler(Manager m, JsonWriter jsonWriter) { @@ -24,6 +24,7 @@ public class JsonReceiveMessageHandler implements Manager.ReceiveMessageHandler @Override public void handleMessage(MessageEnvelope envelope, Throwable exception) { final var object = new HashMap(); + object.put("account", m.getSelfNumber()); if (exception != null) { object.put("error", JsonError.from(exception)); } diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index ee9368f8..51a5c947 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -5,25 +5,38 @@ import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; import org.asamk.signal.DbusConfig; -import org.asamk.signal.DbusReceiveMessageHandler; import org.asamk.signal.JsonReceiveMessageHandler; import org.asamk.signal.JsonWriter; +import org.asamk.signal.JsonWriterImpl; import org.asamk.signal.OutputType; import org.asamk.signal.OutputWriter; import org.asamk.signal.PlainTextWriter; import org.asamk.signal.ReceiveMessageHandler; import org.asamk.signal.commands.exceptions.CommandException; +import org.asamk.signal.commands.exceptions.IOErrorException; import org.asamk.signal.commands.exceptions.UnexpectedErrorException; import org.asamk.signal.dbus.DbusSignalControlImpl; import org.asamk.signal.dbus.DbusSignalImpl; +import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler; import org.asamk.signal.manager.Manager; +import org.asamk.signal.util.IOUtils; import org.freedesktop.dbus.connections.impl.DBusConnection; import org.freedesktop.dbus.exceptions.DBusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; +import java.net.UnixDomainSocketAddress; +import java.nio.channels.Channel; +import java.nio.channels.Channels; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; public class DaemonCommand implements MultiLocalCommand { @@ -36,10 +49,30 @@ public class DaemonCommand implements MultiLocalCommand { @Override public void attachToSubparser(final Subparser subparser) { - subparser.help("Run in daemon mode and provide an experimental dbus interface."); - subparser.addArgument("--system") + final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket"); + subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface."); + subparser.addArgument("--dbus") .action(Arguments.storeTrue()) - .help("Use DBus system bus instead of user bus."); + .help("Expose a DBus interface on the user bus (the default, if no other options are given)."); + subparser.addArgument("--dbus-system", "--system") + .action(Arguments.storeTrue()) + .help("Expose a DBus interface on the system bus."); + subparser.addArgument("--socket") + .nargs("?") + .type(File.class) + .setConst(defaultSocketPath) + .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket)."); + subparser.addArgument("--tcp") + .nargs("?") + .setConst("localhost:7583") + .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583)."); + subparser.addArgument("--no-receive-stdout") + .help("Don’t print received messages to stdout.") + .action(Arguments.storeTrue()); + subparser.addArgument("--receive-mode") + .help("Specify when to start receiving messages.") + .type(Arguments.enumStringType(ReceiveMode.class)) + .setDefault(ReceiveMode.ON_START); subparser.addArgument("--ignore-attachments") .help("Don’t download attachments of received messages.") .action(Arguments.storeTrue()); @@ -54,93 +87,277 @@ public class DaemonCommand implements MultiLocalCommand { public void handleCommand( final Namespace ns, final Manager m, final OutputWriter outputWriter ) throws CommandException { - boolean ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); + logger.info("Starting daemon in single-account mode for " + m.getSelfNumber()); + final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout")); + final var receiveMode = ns.get("receive-mode"); + final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); + m.setIgnoreAttachments(ignoreAttachments); + addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START); - DBusConnection.DBusBusType busType; - if (Boolean.TRUE.equals(ns.getBoolean("system"))) { - busType = DBusConnection.DBusBusType.SYSTEM; - } else { - busType = DBusConnection.DBusBusType.SESSION; + final Channel inheritedChannel; + try { + inheritedChannel = System.inheritedChannel(); + if (inheritedChannel instanceof ServerSocketChannel serverChannel) { + logger.info("Using inherited socket: " + serverChannel.getLocalAddress()); + runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + } catch (IOException e) { + throw new IOErrorException("Failed to use inherited socket", e); + } + final var socketFile = ns.get("socket"); + if (socketFile != null) { + final var address = UnixDomainSocketAddress.of(socketFile.toPath()); + final var serverChannel = IOUtils.bindSocket(address); + runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + final var tcpAddress = ns.getString("tcp"); + if (tcpAddress != null) { + final var address = IOUtils.parseInetSocketAddress(tcpAddress); + final var serverChannel = IOUtils.bindSocket(address); + runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system")); + if (isDbusSystem) { + runDbusSingleAccount(m, true, receiveMode != ReceiveMode.ON_START); + } + final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus")); + if (isDbusSession || ( + !isDbusSystem + && socketFile == null + && tcpAddress == null + && !(inheritedChannel instanceof ServerSocketChannel) + )) { + runDbusSingleAccount(m, false, receiveMode != ReceiveMode.ON_START); } - try (var conn = DBusConnection.getConnection(busType)) { - var objectPath = DbusConfig.getObjectPath(); - var t = run(conn, objectPath, m, outputWriter); - - conn.requestBusName(DbusConfig.getBusname()); - logger.info("DBus daemon running in single-user mode for " + m.getSelfNumber()); - + synchronized (this) { try { - t.join(); - synchronized (this) { - wait(); - } + wait(); } catch (InterruptedException ignored) { } - } catch (DBusException | IOException e) { - logger.error("Dbus command failed", e); - throw new UnexpectedErrorException("Dbus command failed", e); } } @Override public void handleCommand( - final Namespace ns, final List managers, final SignalCreator c, final OutputWriter outputWriter + final Namespace ns, final SignalCreator c, final OutputWriter outputWriter ) throws CommandException { - boolean ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); + logger.info("Starting daemon in multi-account mode"); + final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout")); + final var receiveMode = ns.get("receive-mode"); + final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); - DBusConnection.DBusBusType busType; - if (Boolean.TRUE.equals(ns.getBoolean("system"))) { - busType = DBusConnection.DBusBusType.SYSTEM; - } else { - busType = DBusConnection.DBusBusType.SESSION; + c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(m -> { + m.setIgnoreAttachments(ignoreAttachments); + addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START); + }); + c.addOnManagerAddedHandler(m -> { + m.setIgnoreAttachments(ignoreAttachments); + addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START); + }); + + final Channel inheritedChannel; + try { + inheritedChannel = System.inheritedChannel(); + if (inheritedChannel instanceof ServerSocketChannel serverChannel) { + logger.info("Using inherited socket: " + serverChannel.getLocalAddress()); + runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + } catch (IOException e) { + throw new IOErrorException("Failed to use inherited socket", e); + } + final var socketFile = ns.get("socket"); + if (socketFile != null) { + final var address = UnixDomainSocketAddress.of(socketFile.toPath()); + final var serverChannel = IOUtils.bindSocket(address); + runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + final var tcpAddress = ns.getString("tcp"); + if (tcpAddress != null) { + final var address = IOUtils.parseInetSocketAddress(tcpAddress); + final var serverChannel = IOUtils.bindSocket(address); + runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL); + } + final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system")); + if (isDbusSystem) { + runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, true); } + final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus")); + if (isDbusSession || ( + !isDbusSystem + && socketFile == null + && tcpAddress == null + && !(inheritedChannel instanceof ServerSocketChannel) + )) { + runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, false); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException ignored) { + } + } + } - try (var conn = DBusConnection.getConnection(busType)) { - final var signalControl = new DbusSignalControlImpl(c, m -> { - m.setIgnoreAttachments(ignoreAttachments); + private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) { + final var handler = outputWriter instanceof JsonWriter o + ? new JsonReceiveMessageHandler(m, o) + : outputWriter instanceof PlainTextWriter o + ? new ReceiveMessageHandler(m, o) + : Manager.ReceiveMessageHandler.EMPTY; + m.addReceiveHandler(handler, isWeakListener); + } + + private void runSocketSingleAccount( + final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart + ) { + runSocket(serverChannel, channel -> { + final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); + handler.handleConnection(m); + }); + } + + private void runSocketMultiAccount( + final SignalCreator c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart + ) { + runSocket(serverChannel, channel -> { + final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); + handler.handleConnection(c); + }); + } + + private void runSocket(final ServerSocketChannel serverChannel, Consumer socketHandler) { + final var mainThread = Thread.currentThread(); + new Thread(() -> { + while (true) { + final SocketChannel channel; + final String clientString; try { - final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber()); - return run(conn, objectPath, m, outputWriter); - } catch (DBusException e) { - logger.error("Failed to export object", e); - return null; + channel = serverChannel.accept(); + clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); + logger.info("Accepted new client: " + clientString); + } catch (IOException e) { + logger.error("Failed to accept new socket connection", e); + mainThread.notifyAll(); + break; + } + new Thread(() -> { + try (final var c = channel) { + socketHandler.accept(c); + logger.info("Connection closed: " + clientString); + } catch (IOException e) { + logger.warn("Failed to close channel", e); + } + }).start(); + } + }).start(); + } + + private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler( + final SocketChannel c, final boolean noReceiveOnStart + ) { + final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8)); + final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8)); + + return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart); + } + + private void runDbusSingleAccount( + final Manager m, final boolean isDbusSystem, final boolean noReceiveOnStart + ) throws UnexpectedErrorException { + runDbus(isDbusSystem, (conn, objectPath) -> { + try { + exportDbusObject(conn, objectPath, m, noReceiveOnStart).join(); + } catch (InterruptedException ignored) { + } + }); + } + + private void runDbusMultiAccount( + final SignalCreator c, final boolean noReceiveOnStart, final boolean isDbusSystem + ) throws UnexpectedErrorException { + runDbus(isDbusSystem, (connection, objectPath) -> { + final var signalControl = new DbusSignalControlImpl(c, objectPath); + connection.exportObject(signalControl); + + c.addOnManagerAddedHandler(m -> { + final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart); + if (thread != null) { + try { + thread.join(); + } catch (InterruptedException ignored) { + } } - }, DbusConfig.getObjectPath()); - conn.exportObject(signalControl); + }); + + final var initThreads = c.getAccountNumbers() + .stream() + .map(c::getManager) + .filter(Objects::nonNull) + .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); - for (var m : managers) { - signalControl.addManager(m); + for (var t : initThreads) { + try { + t.join(); + } catch (InterruptedException ignored) { + } } + }); + } + + private void runDbus( + final boolean isDbusSystem, DbusRunner dbusRunner + ) throws UnexpectedErrorException { + DBusConnection.DBusBusType busType; + if (isDbusSystem) { + busType = DBusConnection.DBusBusType.SYSTEM; + } else { + busType = DBusConnection.DBusBusType.SESSION; + } + try { + var conn = DBusConnection.getConnection(busType); + dbusRunner.run(conn, DbusConfig.getObjectPath()); conn.requestBusName(DbusConfig.getBusname()); - logger.info("DBus daemon running in mulit-account mode"); - signalControl.run(); - } catch (DBusException | IOException e) { + logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname()); + } catch (DBusException e) { logger.error("Dbus command failed", e); throw new UnexpectedErrorException("Dbus command failed", e); } } - private Thread run( - DBusConnection conn, String objectPath, Manager m, OutputWriter outputWriter + private Thread exportMultiAccountManager( + final DBusConnection conn, final Manager m, final boolean noReceiveOnStart + ) { + try { + final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber()); + return exportDbusObject(conn, objectPath, m, noReceiveOnStart); + } catch (DBusException e) { + logger.error("Failed to export object", e); + return null; + } + } + + private Thread exportDbusObject( + final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart ) throws DBusException { - final var signal = new DbusSignalImpl(m, conn, objectPath); + final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart); conn.exportObject(signal); final var initThread = new Thread(signal::initObjects); initThread.start(); logger.debug("Exported dbus object: " + objectPath); - final var handler = outputWriter instanceof JsonWriter ? new JsonReceiveMessageHandler(m, - (JsonWriter) outputWriter) : new ReceiveMessageHandler(m, (PlainTextWriter) outputWriter); - m.addReceiveHandler(handler); + return initThread; + } - final var dbusMessageHandler = new DbusReceiveMessageHandler(m, conn, objectPath); - m.addReceiveHandler(dbusMessageHandler); + interface DbusRunner { - return initThread; + void run(DBusConnection connection, String objectPath) throws DBusException; } } diff --git a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java index 4090d128..f5a4d011 100644 --- a/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java +++ b/src/main/java/org/asamk/signal/commands/JsonRpcDispatcherCommand.java @@ -10,13 +10,11 @@ import org.asamk.signal.OutputWriter; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler; import org.asamk.signal.manager.Manager; +import org.asamk.signal.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; import java.io.InputStreamReader; -import java.io.Reader; import java.util.List; import java.util.function.Supplier; @@ -50,21 +48,9 @@ public class JsonRpcDispatcherCommand implements LocalCommand { m.setIgnoreAttachments(ignoreAttachments); final var jsonOutputWriter = (JsonWriter) outputWriter; - final Supplier lineSupplier = getLineSupplier(new InputStreamReader(System.in)); + final Supplier lineSupplier = IOUtils.getLineSupplier(new InputStreamReader(System.in)); - final var handler = new SignalJsonRpcDispatcherHandler(m, jsonOutputWriter, lineSupplier); - handler.handleConnection(); - } - - private Supplier getLineSupplier(final Reader reader) { - final var bufferedReader = new BufferedReader(reader); - return () -> { - try { - return bufferedReader.readLine(); - } catch (IOException e) { - logger.error("Error occurred while reading line", e); - return null; - } - }; + final var handler = new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, false); + handler.handleConnection(m); } } diff --git a/src/main/java/org/asamk/signal/commands/MultiLocalCommand.java b/src/main/java/org/asamk/signal/commands/MultiLocalCommand.java index 1c01a6ae..f333d3a5 100644 --- a/src/main/java/org/asamk/signal/commands/MultiLocalCommand.java +++ b/src/main/java/org/asamk/signal/commands/MultiLocalCommand.java @@ -4,20 +4,10 @@ import net.sourceforge.argparse4j.inf.Namespace; import org.asamk.signal.OutputWriter; import org.asamk.signal.commands.exceptions.CommandException; -import org.asamk.signal.manager.Manager; - -import java.util.List; public interface MultiLocalCommand extends LocalCommand { void handleCommand( - Namespace ns, List m, SignalCreator c, OutputWriter outputWriter + Namespace ns, SignalCreator c, OutputWriter outputWriter ) throws CommandException; - - @Override - default void handleCommand( - final Namespace ns, final Manager m, final OutputWriter outputWriter - ) throws CommandException { - handleCommand(ns, List.of(m), null, outputWriter); - } } diff --git a/src/main/java/org/asamk/signal/commands/ReceiveMode.java b/src/main/java/org/asamk/signal/commands/ReceiveMode.java new file mode 100644 index 00000000..bbe14312 --- /dev/null +++ b/src/main/java/org/asamk/signal/commands/ReceiveMode.java @@ -0,0 +1,22 @@ +package org.asamk.signal.commands; + +enum ReceiveMode { + ON_START { + @Override + public String toString() { + return "on-start"; + } + }, + ON_CONNECTION { + @Override + public String toString() { + return "on-connection"; + } + }, + MANUAL { + @Override + public String toString() { + return "manual"; + } + }, +} diff --git a/src/main/java/org/asamk/signal/commands/SignalCreator.java b/src/main/java/org/asamk/signal/commands/SignalCreator.java index 675d7f2a..46bbbfc0 100644 --- a/src/main/java/org/asamk/signal/commands/SignalCreator.java +++ b/src/main/java/org/asamk/signal/commands/SignalCreator.java @@ -1,12 +1,23 @@ package org.asamk.signal.commands; +import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.ProvisioningManager; import org.asamk.signal.manager.RegistrationManager; import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; public interface SignalCreator { + List getAccountNumbers(); + + void addManager(Manager m); + + void addOnManagerAddedHandler(Consumer handler); + + Manager getManager(String phoneNumber); + ProvisioningManager getNewProvisioningManager(); RegistrationManager getNewRegistrationManager(String username) throws IOException; diff --git a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java index c8dd1d45..cc346f7a 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusManagerImpl.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * This class implements the Manager interface using the DBus Signal interface, where possible. @@ -65,6 +66,7 @@ public class DbusManagerImpl implements Manager { private final Signal signal; private final DBusConnection connection; + private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); private DBusSigHandler dbusMsgHandler; private DBusSigHandler dbusRcptHandler; @@ -424,18 +426,23 @@ public class DbusManagerImpl implements Manager { } @Override - public void addReceiveHandler(final ReceiveMessageHandler handler) { + public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { synchronized (messageHandlers) { - if (messageHandlers.size() == 0) { - installMessageHandlers(); + if (isWeakListener) { + weakHandlers.add(handler); + } else { + if (messageHandlers.size() == 0) { + installMessageHandlers(); + } + messageHandlers.add(handler); } - messageHandlers.add(handler); } } @Override public void removeReceiveHandler(final ReceiveMessageHandler handler) { synchronized (messageHandlers) { + weakHandlers.remove(handler); messageHandlers.remove(handler); if (messageHandlers.size() == 0) { uninstallMessageHandlers(); @@ -582,8 +589,11 @@ public class DbusManagerImpl implements Manager { this.notify(); } synchronized (messageHandlers) { + if (messageHandlers.size() > 0) { + uninstallMessageHandlers(); + } + weakHandlers.clear(); messageHandlers.clear(); - uninstallMessageHandlers(); } } @@ -664,11 +674,7 @@ public class DbusManagerImpl implements Manager { List.of())), Optional.empty(), Optional.empty()); - synchronized (messageHandlers) { - for (final var messageHandler : messageHandlers) { - messageHandler.handleMessage(envelope, null); - } - } + notifyMessageHandlers(envelope); }; connection.addSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler); @@ -693,11 +699,7 @@ public class DbusManagerImpl implements Manager { Optional.empty(), Optional.empty(), Optional.empty()); - synchronized (messageHandlers) { - for (final var messageHandler : messageHandlers) { - messageHandler.handleMessage(envelope, null); - } - } + notifyMessageHandlers(envelope); }; connection.addSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler); @@ -747,20 +749,26 @@ public class DbusManagerImpl implements Manager { Optional.empty(), Optional.empty())), Optional.empty()); - synchronized (messageHandlers) { - for (final var messageHandler : messageHandlers) { - messageHandler.handleMessage(envelope, null); - } - } + notifyMessageHandlers(envelope); }; connection.addSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler); } catch (DBusException e) { e.printStackTrace(); } + signal.subscribeReceive(); + } + + private void notifyMessageHandlers(final MessageEnvelope envelope) { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + h.handleMessage(envelope, null); + }); + } } private void uninstallMessageHandlers() { try { + signal.unsubscribeReceive(); connection.removeSigHandler(Signal.MessageReceivedV2.class, signal, this.dbusMsgHandler); connection.removeSigHandler(Signal.ReceiptReceivedV2.class, signal, this.dbusRcptHandler); connection.removeSigHandler(Signal.SyncMessageReceivedV2.class, signal, this.dbusSyncHandler); diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java index e69bf059..e178ca15 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java @@ -10,74 +10,26 @@ import org.asamk.signal.manager.RegistrationManager; import org.asamk.signal.manager.UserAlreadyExists; import org.asamk.signal.manager.api.CaptchaRequiredException; import org.asamk.signal.manager.api.IncorrectPinException; -import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.PinLockedException; import org.freedesktop.dbus.DBusPath; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; -import java.util.function.Function; import java.util.stream.Collectors; public class DbusSignalControlImpl implements org.asamk.SignalControl { private final SignalCreator c; - private final Function newManagerRunner; - private final List> receiveThreads = new ArrayList<>(); - private final Object stopTrigger = new Object(); private final String objectPath; - public DbusSignalControlImpl( - final SignalCreator c, final Function newManagerRunner, final String objectPath - ) { + public DbusSignalControlImpl(final SignalCreator c, final String objectPath) { this.c = c; - this.newManagerRunner = newManagerRunner; this.objectPath = objectPath; } - public void addManager(Manager m) { - var thread = newManagerRunner.apply(m); - if (thread == null) { - return; - } - synchronized (receiveThreads) { - receiveThreads.add(new Pair<>(m, thread)); - } - } - - public void run() { - synchronized (stopTrigger) { - try { - stopTrigger.wait(); - } catch (InterruptedException ignored) { - } - } - - synchronized (receiveThreads) { - for (var t : receiveThreads) { - t.second().interrupt(); - } - } - while (true) { - final Thread thread; - synchronized (receiveThreads) { - if (receiveThreads.size() == 0) { - break; - } - var pair = receiveThreads.remove(0); - thread = pair.second(); - } - try { - thread.join(); - } catch (InterruptedException ignored) { - } - } - } - @Override public boolean isRemote() { return false; @@ -124,7 +76,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl { ) throws Error.Failure, Error.InvalidNumber { try (final RegistrationManager registrationManager = c.getNewRegistrationManager(number)) { final Manager manager = registrationManager.verifyAccount(verificationCode, pin); - addManager(manager); + c.addManager(manager); } catch (IOException | PinLockedException | IncorrectPinException e) { throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage()); } @@ -138,7 +90,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl { new Thread(() -> { try { final Manager manager = provisioningManager.finishDeviceLink(newDeviceName); - addManager(manager); + c.addManager(manager); } catch (IOException | TimeoutException | UserAlreadyExists e) { e.printStackTrace(); } @@ -156,12 +108,9 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl { @Override public List listAccounts() { - synchronized (receiveThreads) { - return receiveThreads.stream() - .map(Pair::first) - .map(Manager::getSelfNumber) - .map(u -> new DBusPath(DbusConfig.getObjectPath(u))) - .collect(Collectors.toList()); - } + return c.getAccountNumbers() + .stream() + .map(u -> new DBusPath(DbusConfig.getObjectPath(u))) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java index 9f6f1340..c9099c0b 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java @@ -2,6 +2,7 @@ package org.asamk.signal.dbus; import org.asamk.Signal; import org.asamk.signal.BaseConfig; +import org.asamk.signal.DbusReceiveMessageHandler; import org.asamk.signal.manager.AttachmentInvalidException; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.NotMasterDeviceException; @@ -60,26 +61,40 @@ public class DbusSignalImpl implements Signal { private final Manager m; private final DBusConnection connection; private final String objectPath; + private final boolean noReceiveOnStart; private DBusPath thisDevice; private final List devices = new ArrayList<>(); private final List groups = new ArrayList<>(); + private DbusReceiveMessageHandler dbusMessageHandler; + private int subscriberCount; private final static Logger logger = LoggerFactory.getLogger(DbusSignalImpl.class); - public DbusSignalImpl(final Manager m, DBusConnection connection, final String objectPath) { + public DbusSignalImpl( + final Manager m, DBusConnection connection, final String objectPath, final boolean noReceiveOnStart + ) { this.m = m; this.connection = connection; this.objectPath = objectPath; + this.noReceiveOnStart = noReceiveOnStart; } public void initObjects() { + if (!noReceiveOnStart) { + subscribeReceive(); + } + updateDevices(); updateGroups(); updateConfiguration(); } public void close() { + if (dbusMessageHandler != null) { + m.removeReceiveHandler(dbusMessageHandler); + dbusMessageHandler = null; + } unExportDevices(); unExportGroups(); unExportConfiguration(); @@ -95,6 +110,24 @@ public class DbusSignalImpl implements Signal { return m.getSelfNumber(); } + @Override + public void subscribeReceive() { + if (dbusMessageHandler == null) { + dbusMessageHandler = new DbusReceiveMessageHandler(m, connection, objectPath); + m.addReceiveHandler(dbusMessageHandler); + } + subscriberCount++; + } + + @Override + public void unsubscribeReceive() { + subscriberCount = Math.max(0, subscriberCount - 1); + if (subscriberCount == 0 && dbusMessageHandler != null) { + m.removeReceiveHandler(dbusMessageHandler); + dbusMessageHandler = null; + } + } + @Override public void submitRateLimitChallenge(String challenge, String captchaString) { final var captcha = captchaString == null ? null : captchaString.replace("signalcaptcha://", ""); diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index c2727d28..3bc7e701 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -34,9 +34,7 @@ public class JsonRpcReader { this.objectMapper = Util.createJsonObjectMapper(); } - public void readRequests( - final RequestHandler requestHandler, final Consumer responseHandler - ) { + public void readMessages(final RequestHandler requestHandler, final Consumer responseHandler) { while (!Thread.interrupted()) { JsonRpcMessage message = readMessage(); if (message == null) break; diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 9085609c..c33cd9a5 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -1,6 +1,7 @@ package org.asamk.signal.jsonrpc; import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -9,8 +10,10 @@ import com.fasterxml.jackson.databind.node.ContainerNode; import org.asamk.signal.JsonReceiveMessageHandler; import org.asamk.signal.JsonWriter; import org.asamk.signal.OutputWriter; +import org.asamk.signal.commands.Command; import org.asamk.signal.commands.Commands; import org.asamk.signal.commands.JsonRpcCommand; +import org.asamk.signal.commands.SignalCreator; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.IOErrorException; import org.asamk.signal.commands.exceptions.UntrustedKeyErrorException; @@ -21,7 +24,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; public class SignalJsonRpcDispatcherHandler { @@ -32,49 +37,143 @@ public class SignalJsonRpcDispatcherHandler { private static final int IO_ERROR = -3; private static final int UNTRUSTED_KEY_ERROR = -4; - private final Manager m; - private final JsonWriter outputWriter; - private final Supplier lineSupplier; + private final ObjectMapper objectMapper; + private final JsonRpcSender jsonRpcSender; + private final JsonRpcReader jsonRpcReader; + private final boolean noReceiveOnStart; + + private SignalCreator c; + private final Map receiveHandlers = new HashMap<>(); + + private Manager m; public SignalJsonRpcDispatcherHandler( - final Manager m, final JsonWriter outputWriter, final Supplier lineSupplier + final JsonWriter outputWriter, final Supplier lineSupplier, final boolean noReceiveOnStart ) { + this.noReceiveOnStart = noReceiveOnStart; + this.objectMapper = Util.createJsonObjectMapper(); + this.jsonRpcSender = new JsonRpcSender(outputWriter); + this.jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier); + } + + public void handleConnection(final SignalCreator c) { + this.c = c; + + if (!noReceiveOnStart) { + c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(this::subscribeReceive); + } + + handleConnection(); + } + + public void handleConnection(final Manager m) { this.m = m; - this.outputWriter = outputWriter; - this.lineSupplier = lineSupplier; + + if (!noReceiveOnStart) { + subscribeReceive(m); + } + + handleConnection(); } - public void handleConnection() { - final var objectMapper = Util.createJsonObjectMapper(); - final var jsonRpcSender = new JsonRpcSender(outputWriter); + private void subscribeReceive(final Manager m) { + if (receiveHandlers.containsKey(m)) { + return; + } final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", objectMapper.valueToTree(s), null))); - try { - m.addReceiveHandler(receiveMessageHandler); - - // Maybe this should be handled inside the Manager - while (!m.hasCaughtUpWithOldMessages()) { - try { - synchronized (m) { - m.wait(); - } - } catch (InterruptedException ignored) { + m.addReceiveHandler(receiveMessageHandler); + receiveHandlers.put(m, receiveMessageHandler); + + while (!m.hasCaughtUpWithOldMessages()) { + try { + synchronized (m) { + m.wait(); } + } catch (InterruptedException ignored) { } + } + } + + void unsubscribeReceive(final Manager m) { + final var receiveMessageHandler = receiveHandlers.remove(m); + if (receiveMessageHandler != null) { + m.removeReceiveHandler(receiveMessageHandler); + } + } - final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier); - jsonRpcReader.readRequests((method, params) -> handleRequest(m, objectMapper, method, params), + private void handleConnection() { + try { + jsonRpcReader.readMessages((method, params) -> handleRequest(objectMapper, method, params), response -> logger.debug("Received unexpected response for id {}", response.getId())); } finally { - m.removeReceiveHandler(receiveMessageHandler); + receiveHandlers.forEach(Manager::removeReceiveHandler); + receiveHandlers.clear(); } } private JsonNode handleRequest( - final Manager m, final ObjectMapper objectMapper, final String method, ContainerNode params + final ObjectMapper objectMapper, final String method, ContainerNode params + ) throws JsonRpcException { + var command = getCommand(method); + // TODO implement listAccounts, register, verify, link + if (command instanceof JsonRpcCommand jsonRpcCommand) { + if (m != null) { + return runCommand(objectMapper, params, new CommandRunnerImpl<>(m, jsonRpcCommand)); + } + + if (params.has("account")) { + Manager manager = c.getManager(params.get("account").asText()); + if (manager != null) { + return runCommand(objectMapper, params, new CommandRunnerImpl<>(manager, jsonRpcCommand)); + } + } else { + throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_PARAMS, + "Method requires valid account parameter", + null)); + } + } + + throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.METHOD_NOT_FOUND, + "Method not implemented", + null)); + } + + private Command getCommand(final String method) { + if ("subscribeReceive".equals(method)) { + return new SubscribeReceiveCommand(); + } + if ("unsubscribeReceive".equals(method)) { + return new UnsubscribeReceiveCommand(); + } + return Commands.getCommand(method); + } + + private record CommandRunnerImpl(Manager m, JsonRpcCommand command) implements CommandRunner { + + @Override + public void handleCommand(final T request, final OutputWriter outputWriter) throws CommandException { + command.handleCommand(request, m, outputWriter); + } + + @Override + public TypeReference getRequestType() { + return command.getRequestType(); + } + } + + interface CommandRunner { + + void handleCommand(T request, OutputWriter outputWriter) throws CommandException; + + TypeReference getRequestType(); + } + + private JsonNode runCommand( + final ObjectMapper objectMapper, final ContainerNode params, final CommandRunner command ) throws JsonRpcException { final Object[] result = {null}; final JsonWriter commandOutputWriter = s -> { @@ -85,15 +184,8 @@ public class SignalJsonRpcDispatcherHandler { result[0] = s; }; - var command = Commands.getCommand(method); - if (!(command instanceof JsonRpcCommand)) { - throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.METHOD_NOT_FOUND, - "Method not implemented", - null)); - } - try { - parseParamsAndRunCommand(m, objectMapper, params, commandOutputWriter, (JsonRpcCommand) command); + parseParamsAndRunCommand(objectMapper, params, commandOutputWriter, command); } catch (JsonMappingException e) { throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST, e.getMessage(), @@ -116,11 +208,10 @@ public class SignalJsonRpcDispatcherHandler { } private void parseParamsAndRunCommand( - final Manager m, final ObjectMapper objectMapper, final TreeNode params, final OutputWriter outputWriter, - final JsonRpcCommand command + final CommandRunner command ) throws CommandException, JsonMappingException { T requestParams = null; final var requestType = command.getRequestType(); @@ -133,6 +224,36 @@ public class SignalJsonRpcDispatcherHandler { throw new AssertionError(e); } } - command.handleCommand(requestParams, m, outputWriter); + command.handleCommand(requestParams, outputWriter); + } + + private class SubscribeReceiveCommand implements JsonRpcCommand { + + @Override + public String getName() { + return "subscribeReceive"; + } + + @Override + public void handleCommand( + final Void request, final Manager m, final OutputWriter outputWriter + ) throws CommandException { + subscribeReceive(m); + } + } + + private class UnsubscribeReceiveCommand implements JsonRpcCommand { + + @Override + public String getName() { + return "unsubscribeReceive"; + } + + @Override + public void handleCommand( + final Void request, final Manager m, final OutputWriter outputWriter + ) throws CommandException { + unsubscribeReceive(m); + } } } diff --git a/src/main/java/org/asamk/signal/util/IOUtils.java b/src/main/java/org/asamk/signal/util/IOUtils.java index 5505e518..b4c4c6dd 100644 --- a/src/main/java/org/asamk/signal/util/IOUtils.java +++ b/src/main/java/org/asamk/signal/util/IOUtils.java @@ -1,13 +1,41 @@ package org.asamk.signal.util; +import org.asamk.signal.commands.exceptions.IOErrorException; +import org.asamk.signal.commands.exceptions.UserErrorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.Reader; import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.StandardProtocolFamily; +import java.net.UnixDomainSocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.EnumSet; +import java.util.Set; +import java.util.function.Supplier; + +import jdk.net.ExtendedSocketOptions; +import jdk.net.UnixDomainPrincipal; + +import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE; +import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; +import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; public class IOUtils { + private final static Logger logger = LoggerFactory.getLogger(IOUtils.class); + private IOUtils() { } @@ -21,12 +49,101 @@ public class IOUtils { return output.toString(); } + public static void createPrivateDirectories(File file) throws IOException { + if (file.exists()) { + return; + } + + final var path = file.toPath(); + try { + Set perms = EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE); + Files.createDirectories(path, PosixFilePermissions.asFileAttribute(perms)); + } catch (UnsupportedOperationException e) { + Files.createDirectories(path); + } + } + public static File getDataHomeDir() { var dataHome = System.getenv("XDG_DATA_HOME"); if (dataHome != null) { return new File(dataHome); } + logger.debug("XDG_DATA_HOME not set, falling back to home dir"); return new File(new File(System.getProperty("user.home"), ".local"), "share"); } + + public static File getRuntimeDir() { + var runtimeDir = System.getenv("XDG_RUNTIME_DIR"); + if (runtimeDir != null) { + return new File(runtimeDir); + } + + logger.debug("XDG_RUNTIME_DIR not set, falling back to temp dir"); + return new File(System.getProperty("java.io.tmpdir")); + } + + public static Supplier getLineSupplier(final Reader reader) { + final var bufferedReader = new BufferedReader(reader); + return () -> { + try { + return bufferedReader.readLine(); + } catch (IOException e) { + logger.error("Error occurred while reading line", e); + return null; + } + }; + } + + public static InetSocketAddress parseInetSocketAddress(final String tcpAddress) throws UserErrorException { + final var colonIndex = tcpAddress.lastIndexOf(':'); + if (colonIndex < 0) { + throw new UserErrorException("Invalid tcp bind address: " + tcpAddress); + } + final String host = tcpAddress.substring(0, colonIndex); + final int port; + try { + port = Integer.parseInt(tcpAddress.substring(colonIndex + 1)); + } catch (NumberFormatException e) { + throw new UserErrorException("Invalid tcp bind address: " + tcpAddress, e); + } + return new InetSocketAddress(host, port); + } + + public static UnixDomainPrincipal getUnixDomainPrincipal(final SocketChannel channel) throws IOException { + UnixDomainPrincipal principal = null; + try { + principal = channel.getOption(ExtendedSocketOptions.SO_PEERCRED); + } catch (UnsupportedOperationException ignored) { + } + return principal; + } + + public static ServerSocketChannel bindSocket(final SocketAddress address) throws IOErrorException { + final ServerSocketChannel serverChannel; + try { + preBind(address); + serverChannel = address instanceof UnixDomainSocketAddress + ? ServerSocketChannel.open(StandardProtocolFamily.UNIX) + : ServerSocketChannel.open(); + serverChannel.bind(address); + logger.info("Listening on socket: " + address); + postBind(address); + } catch (IOException e) { + throw new IOErrorException("Failed to bind socket: " + e.getMessage(), e); + } + return serverChannel; + } + + private static void preBind(SocketAddress address) throws IOException { + if (address instanceof UnixDomainSocketAddress usa) { + createPrivateDirectories(usa.getPath().toFile().getParentFile()); + } + } + + private static void postBind(SocketAddress address) { + if (address instanceof UnixDomainSocketAddress usa) { + usa.getPath().toFile().deleteOnExit(); + } + } } -- 2.50.1