nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/commands/DaemonCommand.java
Improve behavior with synchronous and asynchronous receivers
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
1 package org.asamk.signal.commands;
2
3 import net.sourceforge.argparse4j.impl.Arguments;
4 import net.sourceforge.argparse4j.inf.Namespace;
5 import net.sourceforge.argparse4j.inf.Subparser;
6
7 import org.asamk.signal.DbusConfig;
8 import org.asamk.signal.OutputType;
9 import org.asamk.signal.ReceiveMessageHandler;
10 import org.asamk.signal.commands.exceptions.CommandException;
11 import org.asamk.signal.commands.exceptions.IOErrorException;
12 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
13 import org.asamk.signal.commands.exceptions.UserErrorException;
14 import org.asamk.signal.dbus.DbusSignalControlImpl;
15 import org.asamk.signal.dbus.DbusSignalImpl;
16 import org.asamk.signal.json.JsonReceiveMessageHandler;
17 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
18 import org.asamk.signal.manager.Manager;
19 import org.asamk.signal.manager.MultiAccountManager;
20 import org.asamk.signal.manager.api.ReceiveConfig;
21 import org.asamk.signal.output.JsonWriter;
22 import org.asamk.signal.output.JsonWriterImpl;
23 import org.asamk.signal.output.OutputWriter;
24 import org.asamk.signal.output.PlainTextWriter;
25 import org.asamk.signal.util.IOUtils;
26 import org.freedesktop.dbus.connections.impl.DBusConnection;
27 import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
28 import org.freedesktop.dbus.exceptions.DBusException;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.File;
33 import java.io.IOException;
34 import java.net.UnixDomainSocketAddress;
35 import java.nio.channels.Channel;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ServerSocketChannel;
38 import java.nio.channels.SocketChannel;
39 import java.nio.charset.StandardCharsets;
40 import java.util.List;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Consumer;
43
44 public class DaemonCommand implements MultiLocalCommand, LocalCommand {
45
46 private final static Logger logger = LoggerFactory.getLogger(DaemonCommand.class);
47
48 @Override
49 public String getName() {
50 return "daemon";
51 }
52
53 @Override
54 public void attachToSubparser(final Subparser subparser) {
55 final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
56 subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
57 subparser.addArgument("--dbus")
58 .action(Arguments.storeTrue())
59 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
60 subparser.addArgument("--dbus-system", "--system")
61 .action(Arguments.storeTrue())
62 .help("Expose a DBus interface on the system bus.");
63 subparser.addArgument("--socket")
64 .nargs("?")
65 .type(File.class)
66 .setConst(defaultSocketPath)
67 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
68 subparser.addArgument("--tcp")
69 .nargs("?")
70 .setConst("localhost:7583")
71 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
72 subparser.addArgument("--no-receive-stdout")
73 .help("Don’t print received messages to stdout.")
74 .action(Arguments.storeTrue());
75 subparser.addArgument("--receive-mode")
76 .help("Specify when to start receiving messages.")
77 .type(Arguments.enumStringType(ReceiveMode.class))
78 .setDefault(ReceiveMode.ON_START);
79 subparser.addArgument("--ignore-attachments")
80 .help("Don’t download attachments of received messages.")
81 .action(Arguments.storeTrue());
82 subparser.addArgument("--ignore-stories")
83 .help("Don’t receive story messages from the server.")
84 .action(Arguments.storeTrue());
85 subparser.addArgument("--send-read-receipts")
86 .help("Send read receipts for all incoming data messages (in addition to the default delivery receipts)")
87 .action(Arguments.storeTrue());
88 }
89
90 @Override
91 public List<OutputType> getSupportedOutputTypes() {
92 return List.of(OutputType.PLAIN_TEXT, OutputType.JSON);
93 }
94
95 @Override
96 public void handleCommand(
97 final Namespace ns, final Manager m, final OutputWriter outputWriter
98 ) throws CommandException {
99 logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
100 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
101 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
102 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
103 final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
104 final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
105
106 m.setReceiveConfig(new ReceiveConfig(ignoreAttachments, ignoreStories, sendReadReceipts));
107 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
108
109 final Channel inheritedChannel;
110 try {
111 inheritedChannel = System.inheritedChannel();
112 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
113 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
114 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
115 }
116 } catch (IOException e) {
117 throw new IOErrorException("Failed to use inherited socket", e);
118 }
119 final var socketFile = ns.<File>get("socket");
120 if (socketFile != null) {
121 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
122 final var serverChannel = IOUtils.bindSocket(address);
123 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
124 }
125 final var tcpAddress = ns.getString("tcp");
126 if (tcpAddress != null) {
127 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
128 final var serverChannel = IOUtils.bindSocket(address);
129 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
130 }
131 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
132 if (isDbusSystem) {
133 runDbusSingleAccount(m, true, receiveMode != ReceiveMode.ON_START);
134 }
135 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
136 if (isDbusSession || (
137 !isDbusSystem
138 && socketFile == null
139 && tcpAddress == null
140 && !(inheritedChannel instanceof ServerSocketChannel)
141 )) {
142 runDbusSingleAccount(m, false, receiveMode != ReceiveMode.ON_START);
143 }
144
145 m.addClosedListener(() -> {
146 synchronized (this) {
147 notifyAll();
148 }
149 });
150
151 synchronized (this) {
152 try {
153 wait();
154 } catch (InterruptedException ignored) {
155 }
156 }
157 }
158
159 @Override
160 public void handleCommand(
161 final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
162 ) throws CommandException {
163 logger.info("Starting daemon in multi-account mode");
164 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
165 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
166 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
167 final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
168 final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
169
170 final var receiveConfig = new ReceiveConfig(ignoreAttachments, ignoreStories, sendReadReceipts);
171 c.getManagers().forEach(m -> {
172 m.setReceiveConfig(receiveConfig);
173 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
174 });
175 c.addOnManagerAddedHandler(m -> {
176 m.setReceiveConfig(receiveConfig);
177 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
178 });
179
180 final Channel inheritedChannel;
181 try {
182 inheritedChannel = System.inheritedChannel();
183 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
184 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
185 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
186 }
187 } catch (IOException e) {
188 throw new IOErrorException("Failed to use inherited socket", e);
189 }
190 final var socketFile = ns.<File>get("socket");
191 if (socketFile != null) {
192 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
193 final var serverChannel = IOUtils.bindSocket(address);
194 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
195 }
196 final var tcpAddress = ns.getString("tcp");
197 if (tcpAddress != null) {
198 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
199 final var serverChannel = IOUtils.bindSocket(address);
200 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
201 }
202 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
203 if (isDbusSystem) {
204 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, true);
205 }
206 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
207 if (isDbusSession || (
208 !isDbusSystem
209 && socketFile == null
210 && tcpAddress == null
211 && !(inheritedChannel instanceof ServerSocketChannel)
212 )) {
213 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, false);
214 }
215
216 synchronized (this) {
217 try {
218 wait();
219 } catch (InterruptedException ignored) {
220 }
221 }
222 }
223
224 private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
225 final var handler = outputWriter instanceof JsonWriter o
226 ? new JsonReceiveMessageHandler(m, o)
227 : outputWriter instanceof PlainTextWriter o
228 ? new ReceiveMessageHandler(m, o)
229 : Manager.ReceiveMessageHandler.EMPTY;
230 m.addReceiveHandler(handler, isWeakListener);
231 }
232
233 private void runSocketSingleAccount(
234 final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
235 ) {
236 runSocket(serverChannel, channel -> {
237 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
238 handler.handleConnection(m);
239 });
240 }
241
242 private void runSocketMultiAccount(
243 final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
244 ) {
245 runSocket(serverChannel, channel -> {
246 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
247 handler.handleConnection(c);
248 });
249 }
250
251 private static final AtomicInteger threadNumber = new AtomicInteger(0);
252
253 private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
254 final var thread = new Thread(() -> {
255 while (true) {
256 final var connectionId = threadNumber.getAndIncrement();
257 final SocketChannel channel;
258 final String clientString;
259 try {
260 channel = serverChannel.accept();
261 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
262 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
263 } catch (IOException e) {
264 logger.error("Failed to accept new socket connection", e);
265 synchronized (this) {
266 notifyAll();
267 }
268 break;
269 }
270 final var connectionThread = new Thread(() -> {
271 try (final var c = channel) {
272 socketHandler.accept(c);
273 } catch (IOException e) {
274 logger.warn("Failed to close channel", e);
275 } catch (Throwable e) {
276 logger.warn("Connection handler failed, closing connection", e);
277 }
278 logger.info("Connection {} closed: {}", connectionId, clientString);
279 });
280 connectionThread.setName("daemon-connection-" + connectionId);
281 connectionThread.start();
282 }
283 });
284 thread.setName("daemon-listener");
285 thread.start();
286 }
287
288 private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
289 final SocketChannel c, final boolean noReceiveOnStart
290 ) {
291 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
292 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
293
294 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
295 }
296
297 private void runDbusSingleAccount(
298 final Manager m, final boolean isDbusSystem, final boolean noReceiveOnStart
299 ) throws CommandException {
300 runDbus(isDbusSystem, (conn, objectPath) -> {
301 try {
302 exportDbusObject(conn, objectPath, m, noReceiveOnStart).join();
303 } catch (InterruptedException ignored) {
304 }
305 });
306 }
307
308 private void runDbusMultiAccount(
309 final MultiAccountManager c, final boolean noReceiveOnStart, final boolean isDbusSystem
310 ) throws CommandException {
311 runDbus(isDbusSystem, (connection, objectPath) -> {
312 final var signalControl = new DbusSignalControlImpl(c, objectPath);
313 connection.exportObject(signalControl);
314
315 c.addOnManagerAddedHandler(m -> {
316 final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart);
317 try {
318 thread.join();
319 } catch (InterruptedException ignored) {
320 }
321 });
322 c.addOnManagerRemovedHandler(m -> {
323 final var path = DbusConfig.getObjectPath(m.getSelfNumber());
324 try {
325 final var object = connection.getExportedObject(null, path);
326 if (object instanceof DbusSignalImpl dbusSignal) {
327 dbusSignal.close();
328 }
329 } catch (DBusException ignored) {
330 }
331 });
332
333 final var initThreads = c.getManagers()
334 .stream()
335 .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
336 .toList();
337
338 for (var t : initThreads) {
339 try {
340 t.join();
341 } catch (InterruptedException ignored) {
342 }
343 }
344 });
345 }
346
347 private void runDbus(
348 final boolean isDbusSystem, DbusRunner dbusRunner
349 ) throws CommandException {
350 DBusConnection.DBusBusType busType;
351 if (isDbusSystem) {
352 busType = DBusConnection.DBusBusType.SYSTEM;
353 } else {
354 busType = DBusConnection.DBusBusType.SESSION;
355 }
356 DBusConnection conn;
357 try {
358 conn = DBusConnectionBuilder.forType(busType).build();
359 dbusRunner.run(conn, DbusConfig.getObjectPath());
360 } catch (DBusException e) {
361 throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
362 } catch (UnsupportedOperationException e) {
363 throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
364 }
365
366 try {
367 conn.requestBusName(DbusConfig.getBusname());
368 } catch (DBusException e) {
369 throw new UnexpectedErrorException("Dbus command failed, maybe signal-cli dbus daemon is already running: "
370 + e.getMessage(), e);
371 }
372
373 logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
374 }
375
376 private Thread exportMultiAccountManager(
377 final DBusConnection conn, final Manager m, final boolean noReceiveOnStart
378 ) {
379 final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
380 return exportDbusObject(conn, objectPath, m, noReceiveOnStart);
381 }
382
383 private Thread exportDbusObject(
384 final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
385 ) {
386 final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
387 final var initThread = new Thread(signal::initObjects);
388 initThread.setName("dbus-init");
389 initThread.start();
390
391 return initThread;
392 }
393
394 interface DbusRunner {
395
396 void run(DBusConnection connection, String objectPath) throws DBusException;
397 }
398 }