]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/commands/DaemonCommand.java
Reorder static final modifier
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
1 package org.asamk.signal.commands;
2
3 import net.sourceforge.argparse4j.impl.Arguments;
4 import net.sourceforge.argparse4j.inf.Namespace;
5 import net.sourceforge.argparse4j.inf.Subparser;
6
7 import org.asamk.signal.DbusConfig;
8 import org.asamk.signal.OutputType;
9 import org.asamk.signal.ReceiveMessageHandler;
10 import org.asamk.signal.Shutdown;
11 import org.asamk.signal.commands.exceptions.CommandException;
12 import org.asamk.signal.commands.exceptions.IOErrorException;
13 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
14 import org.asamk.signal.commands.exceptions.UserErrorException;
15 import org.asamk.signal.dbus.DbusSignalControlImpl;
16 import org.asamk.signal.dbus.DbusSignalImpl;
17 import org.asamk.signal.http.HttpServerHandler;
18 import org.asamk.signal.json.JsonReceiveMessageHandler;
19 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
20 import org.asamk.signal.manager.Manager;
21 import org.asamk.signal.manager.MultiAccountManager;
22 import org.asamk.signal.output.JsonWriter;
23 import org.asamk.signal.output.JsonWriterImpl;
24 import org.asamk.signal.output.OutputWriter;
25 import org.asamk.signal.output.PlainTextWriter;
26 import org.asamk.signal.util.IOUtils;
27 import org.freedesktop.dbus.connections.impl.DBusConnection;
28 import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
29 import org.freedesktop.dbus.exceptions.DBusException;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.io.File;
34 import java.io.IOException;
35 import java.net.InetSocketAddress;
36 import java.net.UnixDomainSocketAddress;
37 import java.nio.channels.Channel;
38 import java.nio.channels.Channels;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.ServerSocketChannel;
41 import java.nio.channels.SocketChannel;
42 import java.nio.charset.StandardCharsets;
43 import java.util.ArrayList;
44 import java.util.List;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.function.Consumer;
48
49 import static org.asamk.signal.util.CommandUtil.getReceiveConfig;
50
51 public class DaemonCommand implements MultiLocalCommand, LocalCommand {
52
53 private static final Logger logger = LoggerFactory.getLogger(DaemonCommand.class);
54
55 @Override
56 public String getName() {
57 return "daemon";
58 }
59
60 @Override
61 public void attachToSubparser(final Subparser subparser) {
62 final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
63 subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
64 subparser.addArgument("--dbus")
65 .action(Arguments.storeTrue())
66 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
67 subparser.addArgument("--dbus-system", "--system")
68 .action(Arguments.storeTrue())
69 .help("Expose a DBus interface on the system bus.");
70 subparser.addArgument("--socket")
71 .nargs("?")
72 .type(File.class)
73 .setConst(defaultSocketPath)
74 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
75 subparser.addArgument("--tcp")
76 .nargs("?")
77 .setConst("localhost:7583")
78 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
79 subparser.addArgument("--http")
80 .nargs("?")
81 .setConst("localhost:8080")
82 .help("Expose a JSON-RPC interface as http endpoint (default localhost:8080).");
83 subparser.addArgument("--no-receive-stdout")
84 .help("Don’t print received messages to stdout.")
85 .action(Arguments.storeTrue());
86 subparser.addArgument("--receive-mode")
87 .help("Specify when to start receiving messages.")
88 .type(Arguments.enumStringType(ReceiveMode.class))
89 .setDefault(ReceiveMode.ON_START);
90 subparser.addArgument("--ignore-attachments")
91 .help("Don’t download attachments of received messages.")
92 .action(Arguments.storeTrue());
93 subparser.addArgument("--ignore-stories")
94 .help("Don’t receive story messages from the server.")
95 .action(Arguments.storeTrue());
96 subparser.addArgument("--send-read-receipts")
97 .help("Send read receipts for all incoming data messages (in addition to the default delivery receipts)")
98 .action(Arguments.storeTrue());
99 }
100
101 @Override
102 public List<OutputType> getSupportedOutputTypes() {
103 return List.of(OutputType.PLAIN_TEXT, OutputType.JSON);
104 }
105
106 @Override
107 public void handleCommand(
108 final Namespace ns, final Manager m, final OutputWriter outputWriter
109 ) throws CommandException {
110 Shutdown.installHandler();
111 logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
112 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
113 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
114 final var receiveConfig = getReceiveConfig(ns);
115
116 m.setReceiveConfig(receiveConfig);
117 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
118
119 try (final var daemonHandler = new SingleAccountDaemonHandler(m, receiveMode)) {
120 setup(ns, daemonHandler);
121
122 m.addClosedListener(Shutdown::triggerShutdown);
123
124 try {
125 Shutdown.waitForShutdown();
126 } catch (InterruptedException ignored) {
127 }
128 }
129 }
130
131 @Override
132 public void handleCommand(
133 final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
134 ) throws CommandException {
135 Shutdown.installHandler();
136 logger.info("Starting daemon in multi-account mode");
137 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
138 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
139 final var receiveConfig = getReceiveConfig(ns);
140 c.getManagers().forEach(m -> {
141 m.setReceiveConfig(receiveConfig);
142 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
143 });
144 c.addOnManagerAddedHandler(m -> {
145 m.setReceiveConfig(receiveConfig);
146 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
147 });
148
149 try (final var daemonHandler = new MultiAccountDaemonHandler(c, receiveMode)) {
150 setup(ns, daemonHandler);
151
152 synchronized (this) {
153 try {
154 Shutdown.waitForShutdown();
155 } catch (InterruptedException ignored) {
156 }
157 }
158 }
159 }
160
161 private static void setup(final Namespace ns, final DaemonHandler daemonHandler) throws CommandException {
162 final Channel inheritedChannel;
163 try {
164 inheritedChannel = System.inheritedChannel();
165 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
166 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
167 daemonHandler.runSocket(serverChannel);
168 }
169 } catch (IOException e) {
170 throw new IOErrorException("Failed to use inherited socket", e);
171 }
172
173 final var socketFile = ns.<File>get("socket");
174 if (socketFile != null) {
175 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
176 final var serverChannel = IOUtils.bindSocket(address);
177 daemonHandler.runSocket(serverChannel);
178 }
179
180 final var tcpAddress = ns.getString("tcp");
181 if (tcpAddress != null) {
182 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
183 final var serverChannel = IOUtils.bindSocket(address);
184 daemonHandler.runSocket(serverChannel);
185 }
186
187 final var httpAddress = ns.getString("http");
188 if (httpAddress != null) {
189 final var address = IOUtils.parseInetSocketAddress(httpAddress);
190 daemonHandler.runHttp(address);
191 }
192
193 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
194 if (isDbusSystem) {
195 daemonHandler.runDbus(true);
196 }
197
198 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
199 if (isDbusSession || (
200 !isDbusSystem
201 && socketFile == null
202 && tcpAddress == null
203 && httpAddress == null
204 && !(inheritedChannel instanceof ServerSocketChannel)
205 )) {
206 daemonHandler.runDbus(false);
207 }
208 }
209
210 private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
211 final var handler = switch (outputWriter) {
212 case PlainTextWriter writer -> new ReceiveMessageHandler(m, writer);
213 case JsonWriter writer -> new JsonReceiveMessageHandler(m, writer);
214 case null -> Manager.ReceiveMessageHandler.EMPTY;
215 };
216 m.addReceiveHandler(handler, isWeakListener);
217 }
218
219 private static abstract class DaemonHandler implements AutoCloseable {
220
221 protected final ReceiveMode receiveMode;
222 protected final List<AutoCloseable> closeables = new ArrayList<>();
223
224 private static final AtomicInteger threadNumber = new AtomicInteger(0);
225
226 public DaemonHandler(final ReceiveMode receiveMode) {
227 this.receiveMode = receiveMode;
228 }
229
230 public abstract void runSocket(ServerSocketChannel serverChannel) throws CommandException;
231
232 public abstract void runDbus(boolean isDbusSystem) throws CommandException;
233
234 public abstract void runHttp(InetSocketAddress address) throws CommandException;
235
236 protected void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
237 final List<AutoCloseable> channels = new ArrayList<>();
238 final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> {
239 try (final var executor = Executors.newCachedThreadPool()) {
240 while (true) {
241 final var connectionId = threadNumber.getAndIncrement();
242 final SocketChannel channel;
243 final String clientString;
244 try {
245 channel = serverChannel.accept();
246 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
247 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
248 } catch (ClosedChannelException ignored) {
249 logger.trace("Listening socket has been closed");
250 break;
251 } catch (IOException e) {
252 logger.error("Failed to accept new socket connection", e);
253 break;
254 }
255 channels.add(channel);
256 executor.submit(() -> {
257 try (final var c = channel) {
258 socketHandler.accept(c);
259 } catch (IOException e) {
260 logger.warn("Failed to close channel", e);
261 } catch (Throwable e) {
262 logger.warn("Connection handler failed, closing connection", e);
263 }
264 logger.info("Connection {} closed: {}", connectionId, clientString);
265 channels.remove(channel);
266 });
267 }
268 }
269 });
270 closeables.add(() -> {
271 serverChannel.close();
272 for (final var c : new ArrayList<>(channels)) {
273 c.close();
274 }
275 thread.join();
276 });
277 }
278
279 protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
280 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
281 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
282
283 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter,
284 lineSupplier,
285 receiveMode == ReceiveMode.MANUAL);
286 }
287
288 protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) {
289 final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START);
290 closeables.add(signal);
291
292 return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
293 }
294
295 protected void runDbus(
296 final boolean isDbusSystem, MultiAccountDaemonHandler.DbusRunner dbusRunner
297 ) throws CommandException {
298 DBusConnection.DBusBusType busType;
299 if (isDbusSystem) {
300 busType = DBusConnection.DBusBusType.SYSTEM;
301 } else {
302 busType = DBusConnection.DBusBusType.SESSION;
303 }
304 DBusConnection conn;
305 try {
306 conn = DBusConnectionBuilder.forType(busType).build();
307 dbusRunner.run(conn, DbusConfig.getObjectPath());
308 } catch (DBusException e) {
309 throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
310 } catch (UnsupportedOperationException e) {
311 throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
312 }
313
314 try {
315 conn.requestBusName(DbusConfig.getBusname());
316 } catch (DBusException e) {
317 throw new UnexpectedErrorException(
318 "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(),
319 e);
320 }
321 closeables.add(conn);
322
323 logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
324 }
325
326 @Override
327 public void close() {
328 for (final var closeable : new ArrayList<>(closeables)) {
329 try {
330 closeable.close();
331 } catch (Exception e) {
332 logger.warn("Failed to close daemon handler", e);
333 }
334 }
335 closeables.clear();
336 }
337 }
338
339 private static final class SingleAccountDaemonHandler extends DaemonHandler {
340
341 private final Manager m;
342
343 private SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) {
344 super(receiveMode);
345 this.m = m;
346 }
347
348 @Override
349 public void runSocket(final ServerSocketChannel serverChannel) {
350 runSocket(serverChannel, channel -> {
351 final var handler = getSignalJsonRpcDispatcherHandler(channel);
352 handler.handleConnection(m);
353 });
354 }
355
356 @Override
357 public void runDbus(final boolean isDbusSystem) throws CommandException {
358 runDbus(isDbusSystem, (conn, objectPath) -> {
359 try {
360 exportDbusObject(conn, objectPath, m).join();
361 } catch (InterruptedException ignored) {
362 }
363 });
364 }
365
366 @Override
367 public void runHttp(InetSocketAddress address) throws CommandException {
368 final var handler = new HttpServerHandler(address, m);
369 try {
370 handler.init();
371 } catch (IOException ex) {
372 throw new IOErrorException("Failed to initialize HTTP Server", ex);
373 }
374 this.closeables.add(handler);
375 }
376 }
377
378 private static final class MultiAccountDaemonHandler extends DaemonHandler {
379
380 private final MultiAccountManager c;
381
382 private MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) {
383 super(receiveMode);
384 this.c = c;
385 }
386
387 public void runSocket(final ServerSocketChannel serverChannel) {
388 runSocket(serverChannel, channel -> {
389 final var handler = getSignalJsonRpcDispatcherHandler(channel);
390 handler.handleConnection(c);
391 });
392 }
393
394 public void runDbus(final boolean isDbusSystem) throws CommandException {
395 runDbus(isDbusSystem, (connection, objectPath) -> {
396 final var signalControl = new DbusSignalControlImpl(c, objectPath);
397 connection.exportObject(signalControl);
398
399 c.addOnManagerAddedHandler(m -> {
400 final var thread = exportManager(connection, m);
401 try {
402 thread.join();
403 } catch (InterruptedException ignored) {
404 }
405 });
406 c.addOnManagerRemovedHandler(m -> {
407 final var path = DbusConfig.getObjectPath(m.getSelfNumber());
408 try {
409 final var object = connection.getExportedObject(null, path);
410 if (object instanceof DbusSignalImpl dbusSignal) {
411 dbusSignal.close();
412 closeables.remove(dbusSignal);
413 }
414 } catch (DBusException ignored) {
415 }
416 });
417
418 final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList();
419
420 for (var t : initThreads) {
421 try {
422 t.join();
423 } catch (InterruptedException ignored) {
424 }
425 }
426 });
427 }
428
429 @Override
430 public void runHttp(final InetSocketAddress address) throws CommandException {
431 final var handler = new HttpServerHandler(address, c);
432 try {
433 handler.init();
434 } catch (IOException ex) {
435 throw new IOErrorException("Failed to initialize HTTP Server", ex);
436 }
437 this.closeables.add(handler);
438 }
439
440 private Thread exportManager(
441 final DBusConnection conn, final Manager m
442 ) {
443 final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
444 return exportDbusObject(conn, objectPath, m);
445 }
446
447 interface DbusRunner {
448
449 void run(DBusConnection connection, String objectPath) throws DBusException;
450 }
451 }
452 }