]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/commands/DaemonCommand.java
Prevent a stale jsonrpc connection from interfering with message receiving
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
1 package org.asamk.signal.commands;
2
3 import net.sourceforge.argparse4j.impl.Arguments;
4 import net.sourceforge.argparse4j.inf.Namespace;
5 import net.sourceforge.argparse4j.inf.Subparser;
6
7 import org.asamk.signal.DbusConfig;
8 import org.asamk.signal.OutputType;
9 import org.asamk.signal.ReceiveMessageHandler;
10 import org.asamk.signal.commands.exceptions.CommandException;
11 import org.asamk.signal.commands.exceptions.IOErrorException;
12 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
13 import org.asamk.signal.dbus.DbusSignalControlImpl;
14 import org.asamk.signal.dbus.DbusSignalImpl;
15 import org.asamk.signal.json.JsonReceiveMessageHandler;
16 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
17 import org.asamk.signal.manager.Manager;
18 import org.asamk.signal.manager.MultiAccountManager;
19 import org.asamk.signal.output.JsonWriter;
20 import org.asamk.signal.output.JsonWriterImpl;
21 import org.asamk.signal.output.OutputWriter;
22 import org.asamk.signal.output.PlainTextWriter;
23 import org.asamk.signal.util.IOUtils;
24 import org.freedesktop.dbus.connections.impl.DBusConnection;
25 import org.freedesktop.dbus.exceptions.DBusException;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.io.File;
30 import java.io.IOException;
31 import java.net.UnixDomainSocketAddress;
32 import java.nio.channels.Channel;
33 import java.nio.channels.Channels;
34 import java.nio.channels.ServerSocketChannel;
35 import java.nio.channels.SocketChannel;
36 import java.nio.charset.StandardCharsets;
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.Consumer;
40
41 public class DaemonCommand implements MultiLocalCommand, LocalCommand {
42
43 private final static Logger logger = LoggerFactory.getLogger(DaemonCommand.class);
44
45 @Override
46 public String getName() {
47 return "daemon";
48 }
49
50 @Override
51 public void attachToSubparser(final Subparser subparser) {
52 final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
53 subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
54 subparser.addArgument("--dbus")
55 .action(Arguments.storeTrue())
56 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
57 subparser.addArgument("--dbus-system", "--system")
58 .action(Arguments.storeTrue())
59 .help("Expose a DBus interface on the system bus.");
60 subparser.addArgument("--socket")
61 .nargs("?")
62 .type(File.class)
63 .setConst(defaultSocketPath)
64 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
65 subparser.addArgument("--tcp")
66 .nargs("?")
67 .setConst("localhost:7583")
68 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
69 subparser.addArgument("--no-receive-stdout")
70 .help("Don’t print received messages to stdout.")
71 .action(Arguments.storeTrue());
72 subparser.addArgument("--receive-mode")
73 .help("Specify when to start receiving messages.")
74 .type(Arguments.enumStringType(ReceiveMode.class))
75 .setDefault(ReceiveMode.ON_START);
76 subparser.addArgument("--ignore-attachments")
77 .help("Don’t download attachments of received messages.")
78 .action(Arguments.storeTrue());
79 }
80
81 @Override
82 public List<OutputType> getSupportedOutputTypes() {
83 return List.of(OutputType.PLAIN_TEXT, OutputType.JSON);
84 }
85
86 @Override
87 public void handleCommand(
88 final Namespace ns, final Manager m, final OutputWriter outputWriter
89 ) throws CommandException {
90 logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
91 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
92 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
93 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
94
95 m.setIgnoreAttachments(ignoreAttachments);
96 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
97
98 final Channel inheritedChannel;
99 try {
100 inheritedChannel = System.inheritedChannel();
101 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
102 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
103 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
104 }
105 } catch (IOException e) {
106 throw new IOErrorException("Failed to use inherited socket", e);
107 }
108 final var socketFile = ns.<File>get("socket");
109 if (socketFile != null) {
110 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
111 final var serverChannel = IOUtils.bindSocket(address);
112 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
113 }
114 final var tcpAddress = ns.getString("tcp");
115 if (tcpAddress != null) {
116 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
117 final var serverChannel = IOUtils.bindSocket(address);
118 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
119 }
120 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
121 if (isDbusSystem) {
122 runDbusSingleAccount(m, true, receiveMode != ReceiveMode.ON_START);
123 }
124 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
125 if (isDbusSession || (
126 !isDbusSystem
127 && socketFile == null
128 && tcpAddress == null
129 && !(inheritedChannel instanceof ServerSocketChannel)
130 )) {
131 runDbusSingleAccount(m, false, receiveMode != ReceiveMode.ON_START);
132 }
133
134 m.addClosedListener(() -> {
135 synchronized (this) {
136 notifyAll();
137 }
138 });
139
140 synchronized (this) {
141 try {
142 wait();
143 } catch (InterruptedException ignored) {
144 }
145 }
146 }
147
148 @Override
149 public void handleCommand(
150 final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
151 ) throws CommandException {
152 logger.info("Starting daemon in multi-account mode");
153 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
154 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
155 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
156
157 c.getManagers().forEach(m -> {
158 m.setIgnoreAttachments(ignoreAttachments);
159 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
160 });
161 c.addOnManagerAddedHandler(m -> {
162 m.setIgnoreAttachments(ignoreAttachments);
163 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
164 });
165
166 final Channel inheritedChannel;
167 try {
168 inheritedChannel = System.inheritedChannel();
169 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
170 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
171 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
172 }
173 } catch (IOException e) {
174 throw new IOErrorException("Failed to use inherited socket", e);
175 }
176 final var socketFile = ns.<File>get("socket");
177 if (socketFile != null) {
178 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
179 final var serverChannel = IOUtils.bindSocket(address);
180 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
181 }
182 final var tcpAddress = ns.getString("tcp");
183 if (tcpAddress != null) {
184 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
185 final var serverChannel = IOUtils.bindSocket(address);
186 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
187 }
188 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
189 if (isDbusSystem) {
190 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, true);
191 }
192 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
193 if (isDbusSession || (
194 !isDbusSystem
195 && socketFile == null
196 && tcpAddress == null
197 && !(inheritedChannel instanceof ServerSocketChannel)
198 )) {
199 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, false);
200 }
201
202 synchronized (this) {
203 try {
204 wait();
205 } catch (InterruptedException ignored) {
206 }
207 }
208 }
209
210 private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
211 final var handler = outputWriter instanceof JsonWriter o
212 ? new JsonReceiveMessageHandler(m, o)
213 : outputWriter instanceof PlainTextWriter o
214 ? new ReceiveMessageHandler(m, o)
215 : Manager.ReceiveMessageHandler.EMPTY;
216 m.addReceiveHandler(handler, isWeakListener);
217 }
218
219 private void runSocketSingleAccount(
220 final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
221 ) {
222 runSocket(serverChannel, channel -> {
223 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
224 handler.handleConnection(m);
225 });
226 }
227
228 private void runSocketMultiAccount(
229 final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
230 ) {
231 runSocket(serverChannel, channel -> {
232 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
233 handler.handleConnection(c);
234 });
235 }
236
237 private static final AtomicInteger threadNumber = new AtomicInteger(0);
238
239 private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
240 final var thread = new Thread(() -> {
241 while (true) {
242 final var connectionId = threadNumber.getAndIncrement();
243 final SocketChannel channel;
244 final String clientString;
245 try {
246 channel = serverChannel.accept();
247 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
248 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
249 } catch (IOException e) {
250 logger.error("Failed to accept new socket connection", e);
251 synchronized (this) {
252 notifyAll();
253 }
254 break;
255 }
256 final var connectionThread = new Thread(() -> {
257 try (final var c = channel) {
258 socketHandler.accept(c);
259 } catch (IOException e) {
260 logger.warn("Failed to close channel", e);
261 } catch (Throwable e) {
262 logger.warn("Connection handler failed, closing connection", e);
263 }
264 logger.info("Connection {} closed: {}", connectionId, clientString);
265 });
266 connectionThread.setName("daemon-connection-" + connectionId);
267 connectionThread.start();
268 }
269 });
270 thread.setName("daemon-listener");
271 thread.start();
272 }
273
274 private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
275 final SocketChannel c, final boolean noReceiveOnStart
276 ) {
277 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
278 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
279
280 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
281 }
282
283 private void runDbusSingleAccount(
284 final Manager m, final boolean isDbusSystem, final boolean noReceiveOnStart
285 ) throws UnexpectedErrorException {
286 runDbus(isDbusSystem, (conn, objectPath) -> {
287 try {
288 exportDbusObject(conn, objectPath, m, noReceiveOnStart).join();
289 } catch (InterruptedException ignored) {
290 }
291 });
292 }
293
294 private void runDbusMultiAccount(
295 final MultiAccountManager c, final boolean noReceiveOnStart, final boolean isDbusSystem
296 ) throws UnexpectedErrorException {
297 runDbus(isDbusSystem, (connection, objectPath) -> {
298 final var signalControl = new DbusSignalControlImpl(c, objectPath);
299 connection.exportObject(signalControl);
300
301 c.addOnManagerAddedHandler(m -> {
302 final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart);
303 try {
304 thread.join();
305 } catch (InterruptedException ignored) {
306 }
307 });
308 c.addOnManagerRemovedHandler(m -> {
309 final var path = DbusConfig.getObjectPath(m.getSelfNumber());
310 try {
311 final var object = connection.getExportedObject(null, path);
312 if (object instanceof DbusSignalImpl dbusSignal) {
313 dbusSignal.close();
314 }
315 } catch (DBusException ignored) {
316 }
317 });
318
319 final var initThreads = c.getManagers()
320 .stream()
321 .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
322 .toList();
323
324 for (var t : initThreads) {
325 try {
326 t.join();
327 } catch (InterruptedException ignored) {
328 }
329 }
330 });
331 }
332
333 private void runDbus(
334 final boolean isDbusSystem, DbusRunner dbusRunner
335 ) throws UnexpectedErrorException {
336 DBusConnection.DBusBusType busType;
337 if (isDbusSystem) {
338 busType = DBusConnection.DBusBusType.SYSTEM;
339 } else {
340 busType = DBusConnection.DBusBusType.SESSION;
341 }
342 DBusConnection conn;
343 try {
344 conn = DBusConnection.getConnection(busType);
345 dbusRunner.run(conn, DbusConfig.getObjectPath());
346 } catch (DBusException e) {
347 throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
348 }
349
350 try {
351 conn.requestBusName(DbusConfig.getBusname());
352 } catch (DBusException e) {
353 throw new UnexpectedErrorException("Dbus command failed, maybe signal-cli dbus daemon is already running: "
354 + e.getMessage(), e);
355 }
356
357 logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
358 }
359
360 private Thread exportMultiAccountManager(
361 final DBusConnection conn, final Manager m, final boolean noReceiveOnStart
362 ) {
363 final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
364 return exportDbusObject(conn, objectPath, m, noReceiveOnStart);
365 }
366
367 private Thread exportDbusObject(
368 final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
369 ) {
370 final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
371 final var initThread = new Thread(signal::initObjects);
372 initThread.setName("dbus-init");
373 initThread.start();
374
375 return initThread;
376 }
377
378 interface DbusRunner {
379
380 void run(DBusConnection connection, String objectPath) throws DBusException;
381 }
382 }