1 package org
.asamk
.signal
.commands
;
3 import net
.sourceforge
.argparse4j
.impl
.Arguments
;
4 import net
.sourceforge
.argparse4j
.inf
.Namespace
;
5 import net
.sourceforge
.argparse4j
.inf
.Subparser
;
7 import org
.asamk
.signal
.DbusConfig
;
8 import org
.asamk
.signal
.OutputType
;
9 import org
.asamk
.signal
.ReceiveMessageHandler
;
10 import org
.asamk
.signal
.Shutdown
;
11 import org
.asamk
.signal
.commands
.exceptions
.CommandException
;
12 import org
.asamk
.signal
.commands
.exceptions
.IOErrorException
;
13 import org
.asamk
.signal
.commands
.exceptions
.UnexpectedErrorException
;
14 import org
.asamk
.signal
.commands
.exceptions
.UserErrorException
;
15 import org
.asamk
.signal
.dbus
.DbusSignalControlImpl
;
16 import org
.asamk
.signal
.dbus
.DbusSignalImpl
;
17 import org
.asamk
.signal
.http
.HttpServerHandler
;
18 import org
.asamk
.signal
.json
.JsonReceiveMessageHandler
;
19 import org
.asamk
.signal
.jsonrpc
.SignalJsonRpcDispatcherHandler
;
20 import org
.asamk
.signal
.manager
.Manager
;
21 import org
.asamk
.signal
.manager
.MultiAccountManager
;
22 import org
.asamk
.signal
.output
.JsonWriter
;
23 import org
.asamk
.signal
.output
.JsonWriterImpl
;
24 import org
.asamk
.signal
.output
.OutputWriter
;
25 import org
.asamk
.signal
.output
.PlainTextWriter
;
26 import org
.asamk
.signal
.util
.IOUtils
;
27 import org
.freedesktop
.dbus
.connections
.impl
.DBusConnection
;
28 import org
.freedesktop
.dbus
.connections
.impl
.DBusConnectionBuilder
;
29 import org
.freedesktop
.dbus
.exceptions
.DBusException
;
30 import org
.slf4j
.Logger
;
31 import org
.slf4j
.LoggerFactory
;
34 import java
.io
.IOException
;
35 import java
.net
.InetSocketAddress
;
36 import java
.net
.UnixDomainSocketAddress
;
37 import java
.nio
.channels
.Channel
;
38 import java
.nio
.channels
.Channels
;
39 import java
.nio
.channels
.ClosedChannelException
;
40 import java
.nio
.channels
.ServerSocketChannel
;
41 import java
.nio
.channels
.SocketChannel
;
42 import java
.nio
.charset
.StandardCharsets
;
43 import java
.util
.ArrayList
;
44 import java
.util
.List
;
45 import java
.util
.concurrent
.Executors
;
46 import java
.util
.concurrent
.atomic
.AtomicInteger
;
47 import java
.util
.function
.Consumer
;
49 import static org
.asamk
.signal
.util
.CommandUtil
.getReceiveConfig
;
51 public class DaemonCommand
implements MultiLocalCommand
, LocalCommand
{
53 private final static Logger logger
= LoggerFactory
.getLogger(DaemonCommand
.class);
// Accessor returning a String, presumably this sub-command's name.
// NOTE(review): the method body is not included in this excerpt — confirm the returned value.
56 public String
getName() {
/**
 * Registers the daemon sub-command's help text and command-line options on the given sub-parser.
 * Options added here: --dbus, --dbus-system/--system, --socket, --tcp, --http, --no-receive-stdout,
 * --receive-mode, --ignore-attachments, --ignore-stories and --send-read-receipts.
 *
 * @param subparser the argparse4j sub-parser to configure
 */
61 public void attachToSubparser(final Subparser subparser
) {
// Constant value for --socket: <runtime dir>/signal-cli/socket.
62 final var defaultSocketPath
= new File(new File(IOUtils
.getRuntimeDir(), "signal-cli"), "socket");
63 subparser
.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
// Boolean flags selecting the DBus user (session) bus and the system bus.
64 subparser
.addArgument("--dbus")
65 .action(Arguments
.storeTrue())
66 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
67 subparser
.addArgument("--dbus-system", "--system")
68 .action(Arguments
.storeTrue())
69 .help("Expose a DBus interface on the system bus.");
// NOTE(review): for --socket, --tcp and --http the original numbering skips lines between
// addArgument(...) and setConst(...); those calls are not visible in this excerpt — confirm
// how the constant value is selected (e.g. an nargs/type setting).
70 subparser
.addArgument("--socket")
73 .setConst(defaultSocketPath
)
74 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
75 subparser
.addArgument("--tcp")
77 .setConst("localhost:7583")
78 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
79 subparser
.addArgument("--http")
81 .setConst("localhost:8080")
82 .help("Expose a JSON-RPC interface as http endpoint (default localhost:8080).");
// Receive-related options; the flags below are all store-true booleans.
83 subparser
.addArgument("--no-receive-stdout")
84 .help("Don’t print received messages to stdout.")
85 .action(Arguments
.storeTrue());
// Parsed into the ReceiveMode enum; defaults to ReceiveMode.ON_START.
86 subparser
.addArgument("--receive-mode")
87 .help("Specify when to start receiving messages.")
88 .type(Arguments
.enumStringType(ReceiveMode
.class))
89 .setDefault(ReceiveMode
.ON_START
);
90 subparser
.addArgument("--ignore-attachments")
91 .help("Don’t download attachments of received messages.")
92 .action(Arguments
.storeTrue());
93 subparser
.addArgument("--ignore-stories")
94 .help("Don’t receive story messages from the server.")
95 .action(Arguments
.storeTrue());
96 subparser
.addArgument("--send-read-receipts")
97 .help("Send read receipts for all incoming data messages (in addition to the default delivery receipts)")
98 .action(Arguments
.storeTrue());
102 public List
<OutputType
> getSupportedOutputTypes() {
103 return List
.of(OutputType
.PLAIN_TEXT
, OutputType
.JSON
);
107 public void handleCommand(
108 final Namespace ns
, final Manager m
, final OutputWriter outputWriter
109 ) throws CommandException
{
110 Shutdown
.installHandler();
111 logger
.info("Starting daemon in single-account mode for " + m
.getSelfNumber());
112 final var noReceiveStdOut
= Boolean
.TRUE
.equals(ns
.getBoolean("no-receive-stdout"));
113 final var receiveMode
= ns
.<ReceiveMode
>get("receive-mode");
114 final var receiveConfig
= getReceiveConfig(ns
);
116 m
.setReceiveConfig(receiveConfig
);
117 addDefaultReceiveHandler(m
, noReceiveStdOut ?
null : outputWriter
, receiveMode
!= ReceiveMode
.ON_START
);
119 try (final var daemonHandler
= new SingleAccountDaemonHandler(m
, receiveMode
)) {
120 setup(ns
, daemonHandler
);
122 m
.addClosedListener(Shutdown
::triggerShutdown
);
125 Shutdown
.waitForShutdown();
126 } catch (InterruptedException ignored
) {
132 public void handleCommand(
133 final Namespace ns
, final MultiAccountManager c
, final OutputWriter outputWriter
134 ) throws CommandException
{
135 Shutdown
.installHandler();
136 logger
.info("Starting daemon in multi-account mode");
137 final var noReceiveStdOut
= Boolean
.TRUE
.equals(ns
.getBoolean("no-receive-stdout"));
138 final var receiveMode
= ns
.<ReceiveMode
>get("receive-mode");
139 final var receiveConfig
= getReceiveConfig(ns
);
140 c
.getManagers().forEach(m
-> {
141 m
.setReceiveConfig(receiveConfig
);
142 addDefaultReceiveHandler(m
, noReceiveStdOut ?
null : outputWriter
, receiveMode
!= ReceiveMode
.ON_START
);
144 c
.addOnManagerAddedHandler(m
-> {
145 m
.setReceiveConfig(receiveConfig
);
146 addDefaultReceiveHandler(m
, noReceiveStdOut ?
null : outputWriter
, receiveMode
!= ReceiveMode
.ON_START
);
149 try (final var daemonHandler
= new MultiAccountDaemonHandler(c
, receiveMode
)) {
150 setup(ns
, daemonHandler
);
152 synchronized (this) {
154 Shutdown
.waitForShutdown();
155 } catch (InterruptedException ignored
) {
/**
 * Starts the interfaces requested on the command line on the given handler: an inherited server
 * socket (when System.inheritedChannel() yields a ServerSocketChannel), a UNIX socket (--socket),
 * a TCP socket (--tcp), an HTTP endpoint (--http) and DBus on the system and/or session bus.
 *
 * @param ns            parsed command-line arguments
 * @param daemonHandler handler that runs the individual interfaces
 * @throws CommandException declared by the handler's run methods; an IOException raised while
 *                          using the inherited channel is rethrown as IOErrorException
 */
161 private static void setup(final Namespace ns
, final DaemonHandler daemonHandler
) throws CommandException
{
162 final Channel inheritedChannel
;
// Use a server socket handed down by the parent process, if there is one.
164 inheritedChannel
= System
.inheritedChannel();
165 if (inheritedChannel
instanceof ServerSocketChannel serverChannel
) {
166 logger
.info("Using inherited socket: " + serverChannel
.getLocalAddress());
167 daemonHandler
.runSocket(serverChannel
);
169 } catch (IOException e
) {
170 throw new IOErrorException("Failed to use inherited socket", e
);
// JSON-RPC over a UNIX domain socket bound to the --socket path.
173 final var socketFile
= ns
.<File
>get("socket");
174 if (socketFile
!= null) {
175 final var address
= UnixDomainSocketAddress
.of(socketFile
.toPath());
176 final var serverChannel
= IOUtils
.bindSocket(address
);
177 daemonHandler
.runSocket(serverChannel
);
// JSON-RPC over a TCP socket bound to the --tcp address.
180 final var tcpAddress
= ns
.getString("tcp");
181 if (tcpAddress
!= null) {
182 final var address
= IOUtils
.parseInetSocketAddress(tcpAddress
);
183 final var serverChannel
= IOUtils
.bindSocket(address
);
184 daemonHandler
.runSocket(serverChannel
);
// JSON-RPC over HTTP on the --http address.
187 final var httpAddress
= ns
.getString("http");
188 if (httpAddress
!= null) {
189 final var address
= IOUtils
.parseInetSocketAddress(httpAddress
);
190 daemonHandler
.runHttp(address
);
193 final var isDbusSystem
= Boolean
.TRUE
.equals(ns
.getBoolean("dbus-system"));
// NOTE(review): isDbusSystem is computed just above, but the condition guarding this call is
// not visible in this excerpt — presumably it runs only when the flag is set; confirm.
195 daemonHandler
.runDbus(true);
198 final var isDbusSession
= Boolean
.TRUE
.equals(ns
.getBoolean("dbus"));
// Session bus: requested explicitly via --dbus, or the parenthesised fallback holds (no socket
// file, no TCP address, no HTTP address and no inherited server socket).
// NOTE(review): the first operand of the && chain is missing from this excerpt — confirm.
199 if (isDbusSession
|| (
201 && socketFile
== null
202 && tcpAddress
== null
203 && httpAddress
== null
204 && !(inheritedChannel
instanceof ServerSocketChannel
)
206 daemonHandler
.runDbus(false);
210 private void addDefaultReceiveHandler(Manager m
, OutputWriter outputWriter
, final boolean isWeakListener
) {
211 final var handler
= switch (outputWriter
) {
212 case PlainTextWriter writer
-> new ReceiveMessageHandler(m
, writer
);
213 case JsonWriter writer
-> new JsonReceiveMessageHandler(m
, writer
);
214 case null -> Manager
.ReceiveMessageHandler
.EMPTY
;
216 m
.addReceiveHandler(handler
, isWeakListener
);
/**
 * Common base of the single- and multi-account daemon handlers. It stores the receive mode,
 * collects the AutoCloseable resources created by the run* methods in {@code closeables}, and
 * provides the shared socket-accept and DBus-connection plumbing used by the subclasses.
 */
219 private static abstract class DaemonHandler
implements AutoCloseable
{
221 protected final ReceiveMode receiveMode
;
// Resources registered by the run* methods; close() iterates over a copy of this list.
222 protected final List
<AutoCloseable
> closeables
= new ArrayList
<>();
// Static counter providing the connection ids used in the accept/close log messages.
224 private static final AtomicInteger threadNumber
= new AtomicInteger(0);
226 public DaemonHandler(final ReceiveMode receiveMode
) {
227 this.receiveMode
= receiveMode
;
// Interface entry points implemented by the account-specific subclasses.
230 public abstract void runSocket(ServerSocketChannel serverChannel
) throws CommandException
;
232 public abstract void runDbus(boolean isDbusSystem
) throws CommandException
;
234 public abstract void runHttp(InetSocketAddress address
) throws CommandException
;
/**
 * Starts a platform thread named "daemon-listener" that accepts connections on serverChannel and
 * submits each accepted channel to a cached thread pool, where socketHandler processes it inside
 * a try-with-resources on the channel. Connections are logged on accept and on close and are
 * tracked in a local list while active. A closeable is registered that closes serverChannel and
 * iterates over a copy of the tracked channels.
 *
 * NOTE(review): the loop construct, the try opener around accept() and the statements following
 * the log calls in the two accept catch blocks are not visible in this excerpt.
 * NOTE(review): channels is a plain ArrayList that is added to on the listener thread and removed
 * from inside executor tasks — confirm whether this needs synchronization.
 */
236 protected void runSocket(final ServerSocketChannel serverChannel
, Consumer
<SocketChannel
> socketHandler
) {
237 final List
<AutoCloseable
> channels
= new ArrayList
<>();
238 final var thread
= Thread
.ofPlatform().name("daemon-listener").start(() -> {
239 try (final var executor
= Executors
.newCachedThreadPool()) {
241 final var connectionId
= threadNumber
.getAndIncrement();
242 final SocketChannel channel
;
243 final String clientString
;
245 channel
= serverChannel
.accept();
246 clientString
= channel
.getRemoteAddress() + " " + IOUtils
.getUnixDomainPrincipal(channel
);
247 logger
.info("Accepted new client connection {}: {}", connectionId
, clientString
);
248 } catch (ClosedChannelException ignored
) {
249 logger
.trace("Listening socket has been closed");
251 } catch (IOException e
) {
252 logger
.error("Failed to accept new socket connection", e
);
255 channels
.add(channel
);
256 executor
.submit(() -> {
// try-with-resources closes the client channel once the handler returns or throws.
257 try (final var c
= channel
) {
258 socketHandler
.accept(c
);
259 } catch (IOException e
) {
260 logger
.warn("Failed to close channel", e
);
261 } catch (Throwable e
) {
262 logger
.warn("Connection handler failed, closing connection", e
);
264 logger
.info("Connection {} closed: {}", connectionId
, clientString
);
265 channels
.remove(channel
);
// Shutdown hook for this listener: close the server channel, then walk the open channels.
// NOTE(review): the loop body is not visible in this excerpt.
270 closeables
.add(() -> {
271 serverChannel
.close();
272 for (final var c
: new ArrayList
<>(channels
)) {
/**
 * Builds the JSON-RPC dispatcher handler for one client channel, reading from and writing to the
 * channel as UTF-8. The last constructor argument is true when the receive mode is MANUAL.
 * NOTE(review): a constructor argument between the writer and the flag is missing from this excerpt.
 */
279 protected SignalJsonRpcDispatcherHandler
getSignalJsonRpcDispatcherHandler(final SocketChannel c
) {
280 final var lineSupplier
= IOUtils
.getLineSupplier(Channels
.newReader(c
, StandardCharsets
.UTF_8
));
281 final var jsonOutputWriter
= new JsonWriterImpl(Channels
.newWriter(c
, StandardCharsets
.UTF_8
));
283 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter
,
285 receiveMode
== ReceiveMode
.MANUAL
);
/**
 * Creates the DBus object for a manager, registers it in closeables and starts a platform thread
 * named "dbus-init-<self number>" that runs the object's initObjects method.
 *
 * @return the started initialization thread, so callers can join it
 */
288 protected Thread
exportDbusObject(final DBusConnection conn
, final String objectPath
, final Manager m
) {
289 final var signal
= new DbusSignalImpl(m
, conn
, objectPath
, receiveMode
!= ReceiveMode
.ON_START
);
290 closeables
.add(signal
);
292 return Thread
.ofPlatform().name("dbus-init-" + m
.getSelfNumber()).start(signal
::initObjects
);
/**
 * Connects to the DBus system or session bus, lets dbusRunner export its objects under
 * DbusConfig.getObjectPath(), requests the bus name DbusConfig.getBusname(), registers the
 * connection in closeables and logs the bus type and name.
 *
 * A DBusException from building the connection or from the runner becomes an
 * UnexpectedErrorException, an UnsupportedOperationException becomes a UserErrorException, and
 * a DBusException from requesting the bus name becomes an UnexpectedErrorException.
 *
 * NOTE(review): the condition choosing between SYSTEM and SESSION (presumably isDbusSystem) and
 * the declaration of conn are not visible in this excerpt.
 */
295 protected void runDbus(
296 final boolean isDbusSystem
, MultiAccountDaemonHandler
.DbusRunner dbusRunner
297 ) throws CommandException
{
298 DBusConnection
.DBusBusType busType
;
300 busType
= DBusConnection
.DBusBusType
.SYSTEM
;
302 busType
= DBusConnection
.DBusBusType
.SESSION
;
306 conn
= DBusConnectionBuilder
.forType(busType
).build();
307 dbusRunner
.run(conn
, DbusConfig
.getObjectPath());
308 } catch (DBusException e
) {
309 throw new UnexpectedErrorException("Dbus command failed: " + e
.getMessage(), e
);
310 } catch (UnsupportedOperationException e
) {
311 throw new UserErrorException("Failed to connect to Dbus: " + e
.getMessage(), e
);
315 conn
.requestBusName(DbusConfig
.getBusname());
316 } catch (DBusException e
) {
317 throw new UnexpectedErrorException(
318 "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e
.getMessage(),
321 closeables
.add(conn
);
323 logger
.info("DBus daemon running on {} bus: {}", busType
, DbusConfig
.getBusname());
/**
 * Walks a snapshot of closeables; an Exception raised for an entry is logged as a warning.
 * NOTE(review): the statement inside the try (presumably closeable.close()) is not visible here.
 */
327 public void close() {
328 for (final var closeable
: new ArrayList
<>(closeables
)) {
331 } catch (Exception e
) {
332 logger
.warn("Failed to close daemon handler", e
);
/**
 * Daemon handler serving exactly one account: every interface it starts operates on the single
 * Manager held in {@code m}.
 */
339 private static final class SingleAccountDaemonHandler
extends DaemonHandler
{
341 private final Manager m
;
// NOTE(review): the constructor body is not visible in this excerpt.
343 private SingleAccountDaemonHandler(final Manager m
, final ReceiveMode receiveMode
) {
// Serves JSON-RPC on the socket: each accepted channel gets its own dispatcher handler, which
// handles the connection for m.
349 public void runSocket(final ServerSocketChannel serverChannel
) {
350 runSocket(serverChannel
, channel
-> {
351 final var handler
= getSignalJsonRpcDispatcherHandler(channel
);
352 handler
.handleConnection(m
);
// Exports the DBus object for m and joins the returned initialization thread before the runner
// returns; an InterruptedException during the join is caught (handling not visible here).
357 public void runDbus(final boolean isDbusSystem
) throws CommandException
{
358 runDbus(isDbusSystem
, (conn
, objectPath
) -> {
360 exportDbusObject(conn
, objectPath
, m
).join();
361 } catch (InterruptedException ignored
) {
// Creates the HTTP handler for m and registers it in closeables; an IOException is rethrown as
// IOErrorException. NOTE(review): the statement inside the try is not visible in this excerpt.
367 public void runHttp(InetSocketAddress address
) throws CommandException
{
368 final var handler
= new HttpServerHandler(address
, m
);
371 } catch (IOException ex
) {
372 throw new IOErrorException("Failed to initialize HTTP Server", ex
);
374 this.closeables
.add(handler
);
/**
 * Daemon handler serving all accounts of a MultiAccountManager held in {@code c}, including
 * accounts that are added or removed while the daemon is running.
 */
378 private static final class MultiAccountDaemonHandler
extends DaemonHandler
{
380 private final MultiAccountManager c
;
// NOTE(review): the constructor body is not visible in this excerpt.
382 private MultiAccountDaemonHandler(final MultiAccountManager c
, final ReceiveMode receiveMode
) {
// Serves JSON-RPC on the socket: each accepted channel gets its own dispatcher handler, which
// handles the connection for the multi-account manager.
387 public void runSocket(final ServerSocketChannel serverChannel
) {
388 runSocket(serverChannel
, channel
-> {
389 final var handler
= getSignalJsonRpcDispatcherHandler(channel
);
390 handler
.handleConnection(c
);
/**
 * Exports the control object for the multi-account manager on the connection, registers
 * callbacks that export a manager when it is added and look up its exported object when it is
 * removed, and finally exports every manager that already exists.
 */
394 public void runDbus(final boolean isDbusSystem
) throws CommandException
{
395 runDbus(isDbusSystem
, (connection
, objectPath
) -> {
396 final var signalControl
= new DbusSignalControlImpl(c
, objectPath
);
397 connection
.exportObject(signalControl
);
// Manager added later: export it too.
// NOTE(review): what is done with the returned thread (presumably a join) is not visible here.
399 c
.addOnManagerAddedHandler(m
-> {
400 final var thread
= exportManager(connection
, m
);
403 } catch (InterruptedException ignored
) {
// Manager removed: find its exported object by path and, if it is a DbusSignalImpl, drop it
// from closeables. NOTE(review): further cleanup steps are not visible in this excerpt.
406 c
.addOnManagerRemovedHandler(m
-> {
407 final var path
= DbusConfig
.getObjectPath(m
.getSelfNumber());
409 final var object
= connection
.getExportedObject(null, path
);
410 if (object
instanceof DbusSignalImpl dbusSignal
) {
412 closeables
.remove(dbusSignal
);
414 } catch (DBusException ignored
) {
// Export all current managers first, collecting their initialization threads, then iterate
// over the threads. NOTE(review): the loop body (presumably a join) is not visible here.
418 final var initThreads
= c
.getManagers().stream().map(m
-> exportManager(connection
, m
)).toList();
420 for (var t
: initThreads
) {
423 } catch (InterruptedException ignored
) {
// Creates the HTTP handler for the multi-account manager and registers it in closeables; an
// IOException is rethrown as IOErrorException.
// NOTE(review): the statement inside the try is not visible in this excerpt.
430 public void runHttp(final InetSocketAddress address
) throws CommandException
{
431 final var handler
= new HttpServerHandler(address
, c
);
434 } catch (IOException ex
) {
435 throw new IOErrorException("Failed to initialize HTTP Server", ex
);
437 this.closeables
.add(handler
);
// Exports one manager under the object path derived from its self number and returns the
// initialization thread started by exportDbusObject.
440 private Thread
exportManager(
441 final DBusConnection conn
, final Manager m
443 final var objectPath
= DbusConfig
.getObjectPath(m
.getSelfNumber());
444 return exportDbusObject(conn
, objectPath
, m
);
// Callback handed to DaemonHandler.runDbus; it receives the connection and the base object path
// and may throw DBusException.
447 interface DbusRunner
{
449 void run(DBusConnection connection
, String objectPath
) throws DBusException
;