From: AsamK Date: Fri, 10 Nov 2023 14:18:06 +0000 (+0100) Subject: Refactor DaemonCommand X-Git-Tag: v0.13.0~95 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/7e9940be4ac1d3b7e19bcbd1e92b8db436415195?hp=c0aa338d7c8e40874dbc453b3fc3916701762029 Refactor DaemonCommand --- diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index a0683c34..561ee8c3 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -4,29 +4,21 @@ import net.sourceforge.argparse4j.impl.Arguments; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; -import org.asamk.signal.DbusConfig; import org.asamk.signal.OutputType; import org.asamk.signal.ReceiveMessageHandler; import org.asamk.signal.Shutdown; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.IOErrorException; -import org.asamk.signal.commands.exceptions.UnexpectedErrorException; -import org.asamk.signal.commands.exceptions.UserErrorException; -import org.asamk.signal.dbus.DbusSignalControlImpl; -import org.asamk.signal.dbus.DbusSignalImpl; +import org.asamk.signal.dbus.DbusHandler; import org.asamk.signal.http.HttpServerHandler; import org.asamk.signal.json.JsonReceiveMessageHandler; -import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler; +import org.asamk.signal.jsonrpc.SocketHandler; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.MultiAccountManager; import org.asamk.signal.output.JsonWriter; -import org.asamk.signal.output.JsonWriterImpl; import org.asamk.signal.output.OutputWriter; import org.asamk.signal.output.PlainTextWriter; import org.asamk.signal.util.IOUtils; -import org.freedesktop.dbus.connections.impl.DBusConnection; -import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder; -import org.freedesktop.dbus.exceptions.DBusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,16 +27,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.UnixDomainSocketAddress; import java.nio.channels.Channel; -import java.nio.channels.Channels; -import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import static org.asamk.signal.util.CommandUtil.getReceiveConfig; @@ -60,7 +45,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { @Override public void attachToSubparser(final Subparser subparser) { final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket"); - subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface."); + subparser.help("Run in daemon mode and provide a JSON-RPC or an experimental dbus interface."); subparser.addArgument("--dbus") .action(Arguments.storeTrue()) .help("Expose a DBus interface on the user bus (the default, if no other options are given)."); @@ -161,10 +146,12 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { private static void setup(final Namespace ns, final DaemonHandler daemonHandler) throws CommandException { final Channel inheritedChannel; try { - inheritedChannel = System.inheritedChannel(); - if (inheritedChannel instanceof ServerSocketChannel serverChannel) { + if (System.inheritedChannel() instanceof ServerSocketChannel serverChannel) { + inheritedChannel = serverChannel; logger.info("Using inherited socket: " + serverChannel.getLocalAddress()); daemonHandler.runSocket(serverChannel); + } else { + inheritedChannel = null; } } catch (IOException e) { throw new IOErrorException("Failed to use inherited socket", e); @@ -201,7 +188,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { && socketFile == null && tcpAddress == null && httpAddress == null - && !(inheritedChannel instanceof ServerSocketChannel) + && inheritedChannel == null )) { daemonHandler.runDbus(false); } @@ -221,9 +208,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { protected final ReceiveMode receiveMode; protected final List closeables = new ArrayList<>(); - private static final AtomicInteger threadNumber = new AtomicInteger(0); - - public DaemonHandler(final ReceiveMode receiveMode) { + protected DaemonHandler(final ReceiveMode receiveMode) { this.receiveMode = receiveMode; } @@ -233,106 +218,37 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { public abstract void runHttp(InetSocketAddress address) throws CommandException; - protected void runSocket(final ServerSocketChannel serverChannel, Consumer socketHandler) { - final List channels = new ArrayList<>(); - final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> { - try (final var executor = Executors.newCachedThreadPool()) { - while (true) { - final var connectionId = threadNumber.getAndIncrement(); - final SocketChannel channel; - final String clientString; - try { - channel = serverChannel.accept(); - clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); - logger.info("Accepted new client connection {}: {}", connectionId, clientString); - } catch (ClosedChannelException ignored) { - logger.trace("Listening socket has been closed"); - break; - } catch (IOException e) { - logger.error("Failed to accept new socket connection", e); - break; - } - channels.add(channel); - executor.submit(() -> { - try (final var c = channel) { - socketHandler.accept(c); - } catch (IOException e) { - logger.warn("Failed to close channel", e); - } catch (Throwable e) { - logger.warn("Connection handler failed, closing connection", e); - } - logger.info("Connection {} closed: {}", connectionId, clientString); - channels.remove(channel); - }); - } - } - }); - closeables.add(() -> { - serverChannel.close(); - for (final var c : new ArrayList<>(channels)) { - c.close(); - } - thread.join(); - }); + protected final void runSocket(final SocketHandler socketHandler) { + socketHandler.init(); + this.closeables.add(socketHandler); } - protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) { - final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8)); - final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8)); - - return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, - lineSupplier, - receiveMode == ReceiveMode.MANUAL); - } - - protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) { - final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START); - closeables.add(signal); - - return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects); - } - - protected void runDbus( - final boolean isDbusSystem, MultiAccountDaemonHandler.DbusRunner dbusRunner + protected final void runDbus( + DbusHandler dbusHandler ) throws CommandException { - DBusConnection.DBusBusType busType; - if (isDbusSystem) { - busType = DBusConnection.DBusBusType.SYSTEM; - } else { - busType = DBusConnection.DBusBusType.SESSION; - } - DBusConnection conn; - try { - conn = DBusConnectionBuilder.forType(busType).build(); - dbusRunner.run(conn, DbusConfig.getObjectPath()); - } catch (DBusException e) { - throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e); - } catch (UnsupportedOperationException e) { - throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e); - } + dbusHandler.init(); + this.closeables.add(dbusHandler); + } + protected final void runHttp(final HttpServerHandler handler) throws CommandException { try { - conn.requestBusName(DbusConfig.getBusname()); - } catch (DBusException e) { - throw new UnexpectedErrorException( - "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(), - e); + handler.init(); + } catch (IOException ex) { + throw new IOErrorException("Failed to initialize HTTP Server", ex); } - closeables.add(conn); - - logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname()); + this.closeables.add(handler); } @Override public void close() { - for (final var closeable : new ArrayList<>(closeables)) { + for (final var closeable : new ArrayList<>(this.closeables)) { try { closeable.close(); } catch (Exception e) { logger.warn("Failed to close daemon handler", e); } } - closeables.clear(); + this.closeables.clear(); } } @@ -340,38 +256,24 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { private final Manager m; - private SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) { + public SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) { super(receiveMode); this.m = m; } @Override public void runSocket(final ServerSocketChannel serverChannel) { - runSocket(serverChannel, channel -> { - final var handler = getSignalJsonRpcDispatcherHandler(channel); - handler.handleConnection(m); - }); + runSocket(new SocketHandler(serverChannel, m, receiveMode == ReceiveMode.MANUAL)); } @Override public void runDbus(final boolean isDbusSystem) throws CommandException { - runDbus(isDbusSystem, (conn, objectPath) -> { - try { - exportDbusObject(conn, objectPath, m).join(); - } catch (InterruptedException ignored) { - } - }); + runDbus(new DbusHandler(isDbusSystem, m, receiveMode != ReceiveMode.ON_START)); } @Override public void runHttp(InetSocketAddress address) throws CommandException { - final var handler = new HttpServerHandler(address, m); - try { - handler.init(); - } catch (IOException ex) { - throw new IOErrorException("Failed to initialize HTTP Server", ex); - } - this.closeables.add(handler); + runHttp(new HttpServerHandler(address, m)); } } @@ -379,74 +281,24 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { private final MultiAccountManager c; - private MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) { + public MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) { super(receiveMode); this.c = c; } + @Override public void runSocket(final ServerSocketChannel serverChannel) { - runSocket(serverChannel, channel -> { - final var handler = getSignalJsonRpcDispatcherHandler(channel); - handler.handleConnection(c); - }); + runSocket(new SocketHandler(serverChannel, c, receiveMode == ReceiveMode.MANUAL)); } + @Override public void runDbus(final boolean isDbusSystem) throws CommandException { - runDbus(isDbusSystem, (connection, objectPath) -> { - final var signalControl = new DbusSignalControlImpl(c, objectPath); - connection.exportObject(signalControl); - - c.addOnManagerAddedHandler(m -> { - final var thread = exportManager(connection, m); - try { - thread.join(); - } catch (InterruptedException ignored) { - } - }); - c.addOnManagerRemovedHandler(m -> { - final var path = DbusConfig.getObjectPath(m.getSelfNumber()); - try { - final var object = connection.getExportedObject(null, path); - if (object instanceof DbusSignalImpl dbusSignal) { - dbusSignal.close(); - closeables.remove(dbusSignal); - } - } catch (DBusException ignored) { - } - }); - - final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList(); - - for (var t : initThreads) { - try { - t.join(); - } catch (InterruptedException ignored) { - } - } - }); + runDbus(new DbusHandler(isDbusSystem, c, receiveMode != ReceiveMode.ON_START)); } @Override public void runHttp(final InetSocketAddress address) throws CommandException { - final var handler = new HttpServerHandler(address, c); - try { - handler.init(); - } catch (IOException ex) { - throw new IOErrorException("Failed to initialize HTTP Server", ex); - } - this.closeables.add(handler); - } - - private Thread exportManager( - final DBusConnection conn, final Manager m - ) { - final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber()); - return exportDbusObject(conn, objectPath, m); - } - - interface DbusRunner { - - void run(DBusConnection connection, String objectPath) throws DBusException; + runHttp(new HttpServerHandler(address, c)); } } } diff --git a/src/main/java/org/asamk/signal/dbus/DbusHandler.java b/src/main/java/org/asamk/signal/dbus/DbusHandler.java new file mode 100644 index 00000000..8b1dd61e --- /dev/null +++ b/src/main/java/org/asamk/signal/dbus/DbusHandler.java @@ -0,0 +1,131 @@ +package org.asamk.signal.dbus; + +import org.asamk.signal.DbusConfig; +import org.asamk.signal.commands.exceptions.CommandException; +import org.asamk.signal.commands.exceptions.UnexpectedErrorException; +import org.asamk.signal.commands.exceptions.UserErrorException; +import org.asamk.signal.manager.Manager; +import org.asamk.signal.manager.MultiAccountManager; +import org.freedesktop.dbus.connections.impl.DBusConnection; +import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder; +import org.freedesktop.dbus.exceptions.DBusException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class DbusHandler implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(DbusHandler.class); + + private final boolean isDbusSystem; + private DBusConnection dBusConnection; + + private final List closeables = new ArrayList<>(); + private final DbusRunner dbusRunner; + private final boolean noReceiveOnStart; + + public DbusHandler(final boolean isDbusSystem, final Manager m, final boolean noReceiveOnStart) { + this.isDbusSystem = isDbusSystem; + this.dbusRunner = (connection) -> { + try { + exportDbusObject(connection, DbusConfig.getObjectPath(), m).join(); + } catch (InterruptedException ignored) { + } + }; + this.noReceiveOnStart = noReceiveOnStart; + } + + public DbusHandler(final boolean isDbusSystem, final MultiAccountManager c, final boolean noReceiveOnStart) { + this.isDbusSystem = isDbusSystem; + this.dbusRunner = (connection) -> { + final var signalControl = new DbusSignalControlImpl(c, DbusConfig.getObjectPath()); + connection.exportObject(signalControl); + + c.addOnManagerAddedHandler(m -> { + final var thread = exportManager(connection, m); + try { + thread.join(); + } catch (InterruptedException ignored) { + } + }); + c.addOnManagerRemovedHandler(m -> { + final var path = DbusConfig.getObjectPath(m.getSelfNumber()); + try { + final var object = connection.getExportedObject(null, path); + if (object instanceof DbusSignalImpl dbusSignal) { + dbusSignal.close(); + closeables.remove(dbusSignal); + } + } catch (DBusException ignored) { + } + }); + + final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList(); + + for (var t : initThreads) { + try { + t.join(); + } catch (InterruptedException ignored) { + } + } + }; + this.noReceiveOnStart = noReceiveOnStart; + } + + public void init() throws CommandException { + if (dBusConnection != null) { + throw new AssertionError("DbusHandler already initialized"); + } + final var busType = isDbusSystem ? DBusConnection.DBusBusType.SYSTEM : DBusConnection.DBusBusType.SESSION; + logger.debug("Starting DBus server on {} bus: {}", busType, DbusConfig.getBusname()); + try { + dBusConnection = DBusConnectionBuilder.forType(busType).build(); + dbusRunner.run(dBusConnection); + } catch (DBusException e) { + throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e); + } catch (UnsupportedOperationException e) { + throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e); + } + + try { + dBusConnection.requestBusName(DbusConfig.getBusname()); + } catch (DBusException e) { + throw new UnexpectedErrorException("Dbus command failed, maybe signal-cli dbus daemon is already running: " + + e.getMessage(), e); + } + + logger.info("Started DBus server on {} bus: {}", busType, DbusConfig.getBusname()); + } + + @Override + public void close() throws Exception { + if (dBusConnection == null) { + return; + } + dBusConnection.close(); + for (final var c : new ArrayList<>(closeables)) { + c.close(); + } + closeables.clear(); + dBusConnection = null; + } + + private Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) { + final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart); + closeables.add(signal); + + return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects); + } + + private Thread exportManager(final DBusConnection conn, final Manager m) { + final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber()); + return exportDbusObject(conn, objectPath, m); + } + + private interface DbusRunner { + + void run(DBusConnection connection) throws DBusException; + } +} diff --git a/src/main/java/org/asamk/signal/http/HttpServerHandler.java b/src/main/java/org/asamk/signal/http/HttpServerHandler.java index 6f8b1826..c9c8fa16 100644 --- a/src/main/java/org/asamk/signal/http/HttpServerHandler.java +++ b/src/main/java/org/asamk/signal/http/HttpServerHandler.java @@ -56,7 +56,7 @@ public class HttpServerHandler implements AutoCloseable { if (server != null) { throw new AssertionError("HttpServerHandler already initialized"); } - logger.info("Starting server on " + address.toString()); + logger.debug("Starting HTTP server on {}", address); server = HttpServer.create(address, 0); server.setExecutor(Executors.newCachedThreadPool()); @@ -66,6 +66,21 @@ public class HttpServerHandler implements AutoCloseable { server.createContext("/api/v1/check", this::handleCheckEndpoint); server.start(); + logger.info("Started HTTP server on {}", address); + } + + @Override + public void close() { + if (server != null) { + shutdown.set(true); + synchronized (this) { + this.notifyAll(); + } + // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed + server.stop(2); + server = null; + shutdown.set(false); + } } private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException { @@ -221,7 +236,7 @@ public class HttpServerHandler implements AutoCloseable { return List.of(manager); } } - return List.of(); + throw new AssertionError("Unreachable state"); } private List> subscribeReceiveHandlers( @@ -246,20 +261,6 @@ public class HttpServerHandler implements AutoCloseable { m.removeReceiveHandler(handler); } - @Override - public void close() { - if (server != null) { - shutdown.set(true); - synchronized (this) { - this.notifyAll(); - } - // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed - server.stop(2); - server = null; - shutdown.set(false); - } - } - private interface Callable { void call(); diff --git a/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java new file mode 100644 index 00000000..dc620bc8 --- /dev/null +++ b/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java @@ -0,0 +1,118 @@ +package org.asamk.signal.jsonrpc; + +import org.asamk.signal.manager.Manager; +import org.asamk.signal.manager.MultiAccountManager; +import org.asamk.signal.output.JsonWriterImpl; +import org.asamk.signal.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +public class SocketHandler implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class); + private static final AtomicInteger threadNumber = new AtomicInteger(0); + + private final ServerSocketChannel serverChannel; + + private Thread listenerThread; + private final List channels = new ArrayList<>(); + private final Consumer socketHandler; + private final boolean noReceiveOnStart; + + public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) { + this.serverChannel = serverChannel; + this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m); + this.noReceiveOnStart = noReceiveOnStart; + } + + public SocketHandler( + final ServerSocketChannel serverChannel, final MultiAccountManager c, final boolean noReceiveOnStart + ) { + this.serverChannel = serverChannel; + this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c); + this.noReceiveOnStart = noReceiveOnStart; + } + + public void init() { + if (listenerThread != null) { + throw new AssertionError("SocketHandler already initialized"); + } + SocketAddress socketAddress; + try { + socketAddress = serverChannel.getLocalAddress(); + } catch (IOException e) { + logger.debug("Failed to get socket address: {}", e.getMessage()); + socketAddress = null; + } + final var address = socketAddress == null ? "" : socketAddress; + logger.debug("Starting JSON-RPC server on {}", address); + + listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> { + try (final var executor = Executors.newCachedThreadPool()) { + logger.info("Started JSON-RPC server on {}", address); + while (true) { + final var connectionId = threadNumber.getAndIncrement(); + final SocketChannel channel; + final String clientString; + try { + channel = serverChannel.accept(); + clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); + logger.info("Accepted new client connection {}: {}", connectionId, clientString); + } catch (ClosedChannelException ignored) { + logger.trace("Listening socket has been closed"); + break; + } catch (IOException e) { + logger.error("Failed to accept new socket connection", e); + break; + } + channels.add(channel); + executor.submit(() -> { + try (final var c = channel) { + socketHandler.accept(c); + } catch (IOException e) { + logger.warn("Failed to close channel", e); + } catch (Throwable e) { + logger.warn("Connection handler failed, closing connection", e); + } + logger.info("Connection {} closed: {}", connectionId, clientString); + channels.remove(channel); + }); + } + } + }); + } + + @Override + public void close() throws Exception { + if (listenerThread == null) { + return; + } + serverChannel.close(); + for (final var c : new ArrayList<>(channels)) { + c.close(); + } + listenerThread.join(); + channels.clear(); + listenerThread = null; + } + + private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) { + final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8)); + final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8)); + + return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart); + } +} diff --git a/src/main/java/org/asamk/signal/util/IOUtils.java b/src/main/java/org/asamk/signal/util/IOUtils.java index aea49e09..7f93417f 100644 --- a/src/main/java/org/asamk/signal/util/IOUtils.java +++ b/src/main/java/org/asamk/signal/util/IOUtils.java @@ -142,7 +142,7 @@ public class IOUtils { ? ServerSocketChannel.open(StandardProtocolFamily.UNIX) : ServerSocketChannel.open(); serverChannel.bind(address); - logger.info("Listening on socket: " + address); + logger.debug("Listening on socket: " + address); postBind(address); } catch (IOException e) { throw new IOErrorException("Failed to bind socket " + address + ": " + e.getMessage(), e);