]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/commands/DaemonCommand.java
aee1330d0b0bb218405f4ca45978fea8fa42ee08
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
1 package org.asamk.signal.commands;
2
3 import net.sourceforge.argparse4j.impl.Arguments;
4 import net.sourceforge.argparse4j.inf.Namespace;
5 import net.sourceforge.argparse4j.inf.Subparser;
6
7 import org.asamk.signal.DbusConfig;
8 import org.asamk.signal.OutputType;
9 import org.asamk.signal.ReceiveMessageHandler;
10 import org.asamk.signal.commands.exceptions.CommandException;
11 import org.asamk.signal.commands.exceptions.IOErrorException;
12 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
13 import org.asamk.signal.commands.exceptions.UserErrorException;
14 import org.asamk.signal.dbus.DbusSignalControlImpl;
15 import org.asamk.signal.dbus.DbusSignalImpl;
16 import org.asamk.signal.http.HttpServerHandler;
17 import org.asamk.signal.json.JsonReceiveMessageHandler;
18 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
19 import org.asamk.signal.manager.Manager;
20 import org.asamk.signal.manager.MultiAccountManager;
21 import org.asamk.signal.output.JsonWriter;
22 import org.asamk.signal.output.JsonWriterImpl;
23 import org.asamk.signal.output.OutputWriter;
24 import org.asamk.signal.output.PlainTextWriter;
25 import org.asamk.signal.util.IOUtils;
26 import org.freedesktop.dbus.connections.impl.DBusConnection;
27 import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
28 import org.freedesktop.dbus.exceptions.DBusException;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.File;
33 import java.io.IOException;
34 import java.net.InetSocketAddress;
35 import java.net.UnixDomainSocketAddress;
36 import java.nio.channels.Channel;
37 import java.nio.channels.Channels;
38 import java.nio.channels.ServerSocketChannel;
39 import java.nio.channels.SocketChannel;
40 import java.nio.charset.StandardCharsets;
41 import java.util.List;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.function.Consumer;
45
46 import static org.asamk.signal.util.CommandUtil.getReceiveConfig;
47
48 public class DaemonCommand implements MultiLocalCommand, LocalCommand {
49
50 private final static Logger logger = LoggerFactory.getLogger(DaemonCommand.class);
51
52 @Override
53 public String getName() {
54 return "daemon";
55 }
56
57 @Override
58 public void attachToSubparser(final Subparser subparser) {
59 final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
60 subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
61 subparser.addArgument("--dbus")
62 .action(Arguments.storeTrue())
63 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
64 subparser.addArgument("--dbus-system", "--system")
65 .action(Arguments.storeTrue())
66 .help("Expose a DBus interface on the system bus.");
67 subparser.addArgument("--socket")
68 .nargs("?")
69 .type(File.class)
70 .setConst(defaultSocketPath)
71 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
72 subparser.addArgument("--tcp")
73 .nargs("?")
74 .setConst("localhost:7583")
75 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
76 subparser.addArgument("--http")
77 .nargs("?")
78 .setConst("localhost:8080")
79 .help("Expose a JSON-RPC interface as http endpoint (default localhost:8080).");
80 subparser.addArgument("--no-receive-stdout")
81 .help("Don’t print received messages to stdout.")
82 .action(Arguments.storeTrue());
83 subparser.addArgument("--receive-mode")
84 .help("Specify when to start receiving messages.")
85 .type(Arguments.enumStringType(ReceiveMode.class))
86 .setDefault(ReceiveMode.ON_START);
87 subparser.addArgument("--ignore-attachments")
88 .help("Don’t download attachments of received messages.")
89 .action(Arguments.storeTrue());
90 subparser.addArgument("--ignore-stories")
91 .help("Don’t receive story messages from the server.")
92 .action(Arguments.storeTrue());
93 subparser.addArgument("--send-read-receipts")
94 .help("Send read receipts for all incoming data messages (in addition to the default delivery receipts)")
95 .action(Arguments.storeTrue());
96 }
97
98 @Override
99 public List<OutputType> getSupportedOutputTypes() {
100 return List.of(OutputType.PLAIN_TEXT, OutputType.JSON);
101 }
102
103 @Override
104 public void handleCommand(
105 final Namespace ns, final Manager m, final OutputWriter outputWriter
106 ) throws CommandException {
107 logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
108 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
109 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
110 final var receiveConfig = getReceiveConfig(ns);
111
112 m.setReceiveConfig(receiveConfig);
113 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
114
115 try (final var daemonHandler = new SingleAccountDaemonHandler(m, receiveMode)) {
116 setup(ns, daemonHandler);
117
118 m.addClosedListener(() -> {
119 synchronized (this) {
120 notifyAll();
121 }
122 });
123
124 synchronized (this) {
125 try {
126 wait();
127 } catch (InterruptedException ignored) {
128 }
129 }
130 }
131 }
132
133 @Override
134 public void handleCommand(
135 final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
136 ) throws CommandException {
137 logger.info("Starting daemon in multi-account mode");
138 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
139 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
140 final var receiveConfig = getReceiveConfig(ns);
141 c.getManagers().forEach(m -> {
142 m.setReceiveConfig(receiveConfig);
143 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
144 });
145 c.addOnManagerAddedHandler(m -> {
146 m.setReceiveConfig(receiveConfig);
147 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
148 });
149
150 try (final var daemonHandler = new MultiAccountDaemonHandler(c, receiveMode)) {
151 setup(ns, daemonHandler);
152
153 synchronized (this) {
154 try {
155 wait();
156 } catch (InterruptedException ignored) {
157 }
158 }
159 }
160 }
161
162 private static void setup(final Namespace ns, final DaemonHandler daemonHandler) throws CommandException {
163 final Channel inheritedChannel;
164 try {
165 inheritedChannel = System.inheritedChannel();
166 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
167 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
168 daemonHandler.runSocket(serverChannel);
169 }
170 } catch (IOException e) {
171 throw new IOErrorException("Failed to use inherited socket", e);
172 }
173
174 final var socketFile = ns.<File>get("socket");
175 if (socketFile != null) {
176 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
177 final var serverChannel = IOUtils.bindSocket(address);
178 daemonHandler.runSocket(serverChannel);
179 }
180
181 final var tcpAddress = ns.getString("tcp");
182 if (tcpAddress != null) {
183 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
184 final var serverChannel = IOUtils.bindSocket(address);
185 daemonHandler.runSocket(serverChannel);
186 }
187
188 final var httpAddress = ns.getString("http");
189 if (httpAddress != null) {
190 final var address = IOUtils.parseInetSocketAddress(httpAddress);
191 daemonHandler.runHttp(address);
192 }
193
194 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
195 if (isDbusSystem) {
196 daemonHandler.runDbus(true);
197 }
198
199 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
200 if (isDbusSession || (
201 !isDbusSystem
202 && socketFile == null
203 && tcpAddress == null
204 && httpAddress == null
205 && !(inheritedChannel instanceof ServerSocketChannel)
206 )) {
207 daemonHandler.runDbus(false);
208 }
209 }
210
211 private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
212 final var handler = switch (outputWriter) {
213 case PlainTextWriter writer -> new ReceiveMessageHandler(m, writer);
214 case JsonWriter writer -> new JsonReceiveMessageHandler(m, writer);
215 case null -> Manager.ReceiveMessageHandler.EMPTY;
216 };
217 m.addReceiveHandler(handler, isWeakListener);
218 }
219
220 private static abstract class DaemonHandler implements AutoCloseable {
221
222 protected final ReceiveMode receiveMode;
223 private static final AtomicInteger threadNumber = new AtomicInteger(0);
224
225 public DaemonHandler(final ReceiveMode receiveMode) {
226 this.receiveMode = receiveMode;
227 }
228
229 public abstract void runSocket(ServerSocketChannel serverChannel) throws CommandException;
230
231 public abstract void runDbus(boolean isDbusSystem) throws CommandException;
232
233 public abstract void runHttp(InetSocketAddress address) throws CommandException;
234
235 protected void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
236 Thread.ofPlatform().name("daemon-listener").start(() -> {
237 try (final var executor = Executors.newCachedThreadPool()) {
238 while (true) {
239 final var connectionId = threadNumber.getAndIncrement();
240 final SocketChannel channel;
241 final String clientString;
242 try {
243 channel = serverChannel.accept();
244 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
245 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
246 } catch (IOException e) {
247 logger.error("Failed to accept new socket connection", e);
248 break;
249 }
250 executor.submit(() -> {
251 try (final var c = channel) {
252 socketHandler.accept(c);
253 } catch (IOException e) {
254 logger.warn("Failed to close channel", e);
255 } catch (Throwable e) {
256 logger.warn("Connection handler failed, closing connection", e);
257 }
258 logger.info("Connection {} closed: {}", connectionId, clientString);
259 });
260 }
261 }
262 });
263 }
264
265 protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
266 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
267 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
268
269 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter,
270 lineSupplier,
271 receiveMode == ReceiveMode.MANUAL);
272 }
273
274 protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) {
275 final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START);
276
277 return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
278 }
279
280 protected void runDbus(
281 final boolean isDbusSystem, MultiAccountDaemonHandler.DbusRunner dbusRunner
282 ) throws CommandException {
283 DBusConnection.DBusBusType busType;
284 if (isDbusSystem) {
285 busType = DBusConnection.DBusBusType.SYSTEM;
286 } else {
287 busType = DBusConnection.DBusBusType.SESSION;
288 }
289 DBusConnection conn;
290 try {
291 conn = DBusConnectionBuilder.forType(busType).build();
292 dbusRunner.run(conn, DbusConfig.getObjectPath());
293 } catch (DBusException e) {
294 throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
295 } catch (UnsupportedOperationException e) {
296 throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
297 }
298
299 try {
300 conn.requestBusName(DbusConfig.getBusname());
301 } catch (DBusException e) {
302 throw new UnexpectedErrorException(
303 "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(),
304 e);
305 }
306
307 logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
308 }
309
310 @Override
311 public void close() {
312 // TODO
313 }
314 }
315
316 private static final class SingleAccountDaemonHandler extends DaemonHandler {
317
318 private final Manager m;
319
320 private SingleAccountDaemonHandler(final Manager m, final ReceiveMode receiveMode) {
321 super(receiveMode);
322 this.m = m;
323 }
324
325 @Override
326 public void runSocket(final ServerSocketChannel serverChannel) {
327 runSocket(serverChannel, channel -> {
328 final var handler = getSignalJsonRpcDispatcherHandler(channel);
329 handler.handleConnection(m);
330 });
331 }
332
333 @Override
334 public void runDbus(final boolean isDbusSystem) throws CommandException {
335 runDbus(isDbusSystem, (conn, objectPath) -> {
336 try {
337 exportDbusObject(conn, objectPath, m).join();
338 } catch (InterruptedException ignored) {
339 }
340 });
341 }
342
343 @Override
344 public void runHttp(InetSocketAddress address) throws CommandException {
345 final var handler = new HttpServerHandler(address, m);
346 try {
347 handler.init();
348 } catch (IOException ex) {
349 throw new IOErrorException("Failed to initialize HTTP Server", ex);
350 }
351 }
352 }
353
354 private static final class MultiAccountDaemonHandler extends DaemonHandler {
355
356 private final MultiAccountManager c;
357
358 private MultiAccountDaemonHandler(final MultiAccountManager c, final ReceiveMode receiveMode) {
359 super(receiveMode);
360 this.c = c;
361 }
362
363 public void runSocket(final ServerSocketChannel serverChannel) {
364 runSocket(serverChannel, channel -> {
365 final var handler = getSignalJsonRpcDispatcherHandler(channel);
366 handler.handleConnection(c);
367 });
368 }
369
370 public void runDbus(final boolean isDbusSystem) throws CommandException {
371 runDbus(isDbusSystem, (connection, objectPath) -> {
372 final var signalControl = new DbusSignalControlImpl(c, objectPath);
373 connection.exportObject(signalControl);
374
375 c.addOnManagerAddedHandler(m -> {
376 final var thread = exportManager(connection, m);
377 try {
378 thread.join();
379 } catch (InterruptedException ignored) {
380 }
381 });
382 c.addOnManagerRemovedHandler(m -> {
383 final var path = DbusConfig.getObjectPath(m.getSelfNumber());
384 try {
385 final var object = connection.getExportedObject(null, path);
386 if (object instanceof DbusSignalImpl dbusSignal) {
387 dbusSignal.close();
388 }
389 } catch (DBusException ignored) {
390 }
391 });
392
393 final var initThreads = c.getManagers().stream().map(m -> exportManager(connection, m)).toList();
394
395 for (var t : initThreads) {
396 try {
397 t.join();
398 } catch (InterruptedException ignored) {
399 }
400 }
401 });
402 }
403
404 @Override
405 public void runHttp(final InetSocketAddress address) throws CommandException {
406 final var handler = new HttpServerHandler(address, c);
407 try {
408 handler.init();
409 } catch (IOException ex) {
410 throw new IOErrorException("Failed to initialize HTTP Server", ex);
411 }
412 }
413
414 private Thread exportManager(
415 final DBusConnection conn, final Manager m
416 ) {
417 final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
418 return exportDbusObject(conn, objectPath, m);
419 }
420
421 interface DbusRunner {
422
423 void run(DBusConnection connection, String objectPath) throws DBusException;
424 }
425 }
426 }