]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/commands/DaemonCommand.java
Handle rate limit error in JSON-RPC mode
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
1 package org.asamk.signal.commands;
2
3 import net.sourceforge.argparse4j.impl.Arguments;
4 import net.sourceforge.argparse4j.inf.Namespace;
5 import net.sourceforge.argparse4j.inf.Subparser;
6
7 import org.asamk.signal.DbusConfig;
8 import org.asamk.signal.OutputType;
9 import org.asamk.signal.ReceiveMessageHandler;
10 import org.asamk.signal.commands.exceptions.CommandException;
11 import org.asamk.signal.commands.exceptions.IOErrorException;
12 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
13 import org.asamk.signal.commands.exceptions.UserErrorException;
14 import org.asamk.signal.dbus.DbusSignalControlImpl;
15 import org.asamk.signal.dbus.DbusSignalImpl;
16 import org.asamk.signal.http.HttpServerHandler;
17 import org.asamk.signal.json.JsonReceiveMessageHandler;
18 import org.asamk.signal.jsonrpc.SignalJsonRpcDispatcherHandler;
19 import org.asamk.signal.manager.Manager;
20 import org.asamk.signal.manager.MultiAccountManager;
21 import org.asamk.signal.manager.api.ReceiveConfig;
22 import org.asamk.signal.output.JsonWriter;
23 import org.asamk.signal.output.JsonWriterImpl;
24 import org.asamk.signal.output.OutputWriter;
25 import org.asamk.signal.output.PlainTextWriter;
26 import org.asamk.signal.util.IOUtils;
27 import org.freedesktop.dbus.connections.impl.DBusConnection;
28 import org.freedesktop.dbus.connections.impl.DBusConnectionBuilder;
29 import org.freedesktop.dbus.exceptions.DBusException;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.io.File;
34 import java.io.IOException;
35 import java.net.UnixDomainSocketAddress;
36 import java.nio.channels.Channel;
37 import java.nio.channels.Channels;
38 import java.nio.channels.ServerSocketChannel;
39 import java.nio.channels.SocketChannel;
40 import java.nio.charset.StandardCharsets;
41 import java.util.List;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.function.Consumer;
45
46 public class DaemonCommand implements MultiLocalCommand, LocalCommand {
47
48 private final static Logger logger = LoggerFactory.getLogger(DaemonCommand.class);
49
50 @Override
51 public String getName() {
52 return "daemon";
53 }
54
55 @Override
56 public void attachToSubparser(final Subparser subparser) {
57 final var defaultSocketPath = new File(new File(IOUtils.getRuntimeDir(), "signal-cli"), "socket");
58 subparser.help("Run in daemon mode and provide an experimental dbus or JSON-RPC interface.");
59 subparser.addArgument("--dbus")
60 .action(Arguments.storeTrue())
61 .help("Expose a DBus interface on the user bus (the default, if no other options are given).");
62 subparser.addArgument("--dbus-system", "--system")
63 .action(Arguments.storeTrue())
64 .help("Expose a DBus interface on the system bus.");
65 subparser.addArgument("--socket")
66 .nargs("?")
67 .type(File.class)
68 .setConst(defaultSocketPath)
69 .help("Expose a JSON-RPC interface on a UNIX socket (default $XDG_RUNTIME_DIR/signal-cli/socket).");
70 subparser.addArgument("--tcp")
71 .nargs("?")
72 .setConst("localhost:7583")
73 .help("Expose a JSON-RPC interface on a TCP socket (default localhost:7583).");
74 subparser.addArgument("--http")
75 .nargs("?")
76 .setConst("localhost:8080")
77 .help("Expose a JSON-RPC interface as http endpoint (default localhost:8080).");
78 subparser.addArgument("--no-receive-stdout")
79 .help("Don’t print received messages to stdout.")
80 .action(Arguments.storeTrue());
81 subparser.addArgument("--receive-mode")
82 .help("Specify when to start receiving messages.")
83 .type(Arguments.enumStringType(ReceiveMode.class))
84 .setDefault(ReceiveMode.ON_START);
85 subparser.addArgument("--ignore-attachments")
86 .help("Don’t download attachments of received messages.")
87 .action(Arguments.storeTrue());
88 subparser.addArgument("--ignore-stories")
89 .help("Don’t receive story messages from the server.")
90 .action(Arguments.storeTrue());
91 subparser.addArgument("--send-read-receipts")
92 .help("Send read receipts for all incoming data messages (in addition to the default delivery receipts)")
93 .action(Arguments.storeTrue());
94 }
95
96 @Override
97 public List<OutputType> getSupportedOutputTypes() {
98 return List.of(OutputType.PLAIN_TEXT, OutputType.JSON);
99 }
100
101 @Override
102 public void handleCommand(
103 final Namespace ns, final Manager m, final OutputWriter outputWriter
104 ) throws CommandException {
105 logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
106 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
107 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
108 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
109 final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
110 final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
111
112 m.setReceiveConfig(new ReceiveConfig(ignoreAttachments, ignoreStories, sendReadReceipts));
113 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
114
115 final Channel inheritedChannel;
116 try {
117 inheritedChannel = System.inheritedChannel();
118 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
119 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
120 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
121 }
122 } catch (IOException e) {
123 throw new IOErrorException("Failed to use inherited socket", e);
124 }
125 final var socketFile = ns.<File>get("socket");
126 if (socketFile != null) {
127 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
128 final var serverChannel = IOUtils.bindSocket(address);
129 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
130 }
131 final var tcpAddress = ns.getString("tcp");
132 if (tcpAddress != null) {
133 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
134 final var serverChannel = IOUtils.bindSocket(address);
135 runSocketSingleAccount(m, serverChannel, receiveMode == ReceiveMode.MANUAL);
136 }
137 final var httpAddress = ns.getString("http");
138 if (httpAddress != null) {
139 final var address = IOUtils.parseInetSocketAddress(httpAddress);
140 final var handler = new HttpServerHandler(address, m);
141 try {
142 handler.init();
143 } catch (IOException ex) {
144 throw new IOErrorException("Failed to initialize HTTP Server", ex);
145 }
146 }
147 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
148 if (isDbusSystem) {
149 runDbusSingleAccount(m, true, receiveMode != ReceiveMode.ON_START);
150 }
151 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
152 if (isDbusSession || (
153 !isDbusSystem
154 && socketFile == null
155 && tcpAddress == null
156 && httpAddress == null
157 && !(inheritedChannel instanceof ServerSocketChannel)
158 )) {
159 runDbusSingleAccount(m, false, receiveMode != ReceiveMode.ON_START);
160 }
161
162 m.addClosedListener(() -> {
163 synchronized (this) {
164 notifyAll();
165 }
166 });
167
168 synchronized (this) {
169 try {
170 wait();
171 } catch (InterruptedException ignored) {
172 }
173 }
174 }
175
176 @Override
177 public void handleCommand(
178 final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
179 ) throws CommandException {
180 logger.info("Starting daemon in multi-account mode");
181 final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
182 final var receiveMode = ns.<ReceiveMode>get("receive-mode");
183 final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments"));
184 final var ignoreStories = Boolean.TRUE.equals(ns.getBoolean("ignore-stories"));
185 final var sendReadReceipts = Boolean.TRUE.equals(ns.getBoolean("send-read-receipts"));
186
187 final var receiveConfig = new ReceiveConfig(ignoreAttachments, ignoreStories, sendReadReceipts);
188 c.getManagers().forEach(m -> {
189 m.setReceiveConfig(receiveConfig);
190 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
191 });
192 c.addOnManagerAddedHandler(m -> {
193 m.setReceiveConfig(receiveConfig);
194 addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START);
195 });
196
197 final Channel inheritedChannel;
198 try {
199 inheritedChannel = System.inheritedChannel();
200 if (inheritedChannel instanceof ServerSocketChannel serverChannel) {
201 logger.info("Using inherited socket: " + serverChannel.getLocalAddress());
202 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
203 }
204 } catch (IOException e) {
205 throw new IOErrorException("Failed to use inherited socket", e);
206 }
207 final var socketFile = ns.<File>get("socket");
208 if (socketFile != null) {
209 final var address = UnixDomainSocketAddress.of(socketFile.toPath());
210 final var serverChannel = IOUtils.bindSocket(address);
211 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
212 }
213 final var tcpAddress = ns.getString("tcp");
214 if (tcpAddress != null) {
215 final var address = IOUtils.parseInetSocketAddress(tcpAddress);
216 final var serverChannel = IOUtils.bindSocket(address);
217 runSocketMultiAccount(c, serverChannel, receiveMode == ReceiveMode.MANUAL);
218 }
219 final var httpAddress = ns.getString("http");
220 if (httpAddress != null) {
221 final var address = IOUtils.parseInetSocketAddress(httpAddress);
222 final var handler = new HttpServerHandler(address, c);
223 try {
224 handler.init();
225 } catch (IOException ex) {
226 throw new IOErrorException("Failed to initialize HTTP Server", ex);
227 }
228 }
229 final var isDbusSystem = Boolean.TRUE.equals(ns.getBoolean("dbus-system"));
230 if (isDbusSystem) {
231 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, true);
232 }
233 final var isDbusSession = Boolean.TRUE.equals(ns.getBoolean("dbus"));
234 if (isDbusSession || (
235 !isDbusSystem
236 && socketFile == null
237 && tcpAddress == null
238 && httpAddress == null
239 && !(inheritedChannel instanceof ServerSocketChannel)
240 )) {
241 runDbusMultiAccount(c, receiveMode != ReceiveMode.ON_START, false);
242 }
243
244 synchronized (this) {
245 try {
246 wait();
247 } catch (InterruptedException ignored) {
248 }
249 }
250 }
251
252 private void addDefaultReceiveHandler(Manager m, OutputWriter outputWriter, final boolean isWeakListener) {
253 final var handler = switch (outputWriter) {
254 case PlainTextWriter writer -> new ReceiveMessageHandler(m, writer);
255 case JsonWriter writer -> new JsonReceiveMessageHandler(m, writer);
256 case null -> Manager.ReceiveMessageHandler.EMPTY;
257 };
258 m.addReceiveHandler(handler, isWeakListener);
259 }
260
261 private Thread runSocketSingleAccount(
262 final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
263 ) {
264 return runSocket(serverChannel, channel -> {
265 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
266 handler.handleConnection(m);
267 });
268 }
269
270 private Thread runSocketMultiAccount(
271 final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
272 ) {
273 return runSocket(serverChannel, channel -> {
274 final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
275 handler.handleConnection(c);
276 });
277 }
278
279 private static final AtomicInteger threadNumber = new AtomicInteger(0);
280
281 private Thread runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
282 return Thread.ofPlatform().name("daemon-listener").start(() -> {
283 try (final var executor = Executors.newCachedThreadPool()) {
284 while (true) {
285 final var connectionId = threadNumber.getAndIncrement();
286 final SocketChannel channel;
287 final String clientString;
288 try {
289 channel = serverChannel.accept();
290 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
291 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
292 } catch (IOException e) {
293 logger.error("Failed to accept new socket connection", e);
294 break;
295 }
296 executor.submit(() -> {
297 try (final var c = channel) {
298 socketHandler.accept(c);
299 } catch (IOException e) {
300 logger.warn("Failed to close channel", e);
301 } catch (Throwable e) {
302 logger.warn("Connection handler failed, closing connection", e);
303 }
304 logger.info("Connection {} closed: {}", connectionId, clientString);
305 });
306 }
307 }
308 synchronized (this) {
309 notifyAll();
310 }
311 });
312 }
313
314 private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
315 final SocketChannel c, final boolean noReceiveOnStart
316 ) {
317 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
318 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
319
320 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
321 }
322
323 private void runDbusSingleAccount(
324 final Manager m, final boolean isDbusSystem, final boolean noReceiveOnStart
325 ) throws CommandException {
326 runDbus(isDbusSystem, (conn, objectPath) -> {
327 try {
328 exportDbusObject(conn, objectPath, m, noReceiveOnStart).join();
329 } catch (InterruptedException ignored) {
330 }
331 });
332 }
333
334 private void runDbusMultiAccount(
335 final MultiAccountManager c, final boolean noReceiveOnStart, final boolean isDbusSystem
336 ) throws CommandException {
337 runDbus(isDbusSystem, (connection, objectPath) -> {
338 final var signalControl = new DbusSignalControlImpl(c, objectPath);
339 connection.exportObject(signalControl);
340
341 c.addOnManagerAddedHandler(m -> {
342 final var thread = exportMultiAccountManager(connection, m, noReceiveOnStart);
343 try {
344 thread.join();
345 } catch (InterruptedException ignored) {
346 }
347 });
348 c.addOnManagerRemovedHandler(m -> {
349 final var path = DbusConfig.getObjectPath(m.getSelfNumber());
350 try {
351 final var object = connection.getExportedObject(null, path);
352 if (object instanceof DbusSignalImpl dbusSignal) {
353 dbusSignal.close();
354 }
355 } catch (DBusException ignored) {
356 }
357 });
358
359 final var initThreads = c.getManagers()
360 .stream()
361 .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart))
362 .toList();
363
364 for (var t : initThreads) {
365 try {
366 t.join();
367 } catch (InterruptedException ignored) {
368 }
369 }
370 });
371 }
372
373 private void runDbus(
374 final boolean isDbusSystem, DbusRunner dbusRunner
375 ) throws CommandException {
376 DBusConnection.DBusBusType busType;
377 if (isDbusSystem) {
378 busType = DBusConnection.DBusBusType.SYSTEM;
379 } else {
380 busType = DBusConnection.DBusBusType.SESSION;
381 }
382 DBusConnection conn;
383 try {
384 conn = DBusConnectionBuilder.forType(busType).build();
385 dbusRunner.run(conn, DbusConfig.getObjectPath());
386 } catch (DBusException e) {
387 throw new UnexpectedErrorException("Dbus command failed: " + e.getMessage(), e);
388 } catch (UnsupportedOperationException e) {
389 throw new UserErrorException("Failed to connect to Dbus: " + e.getMessage(), e);
390 }
391
392 try {
393 conn.requestBusName(DbusConfig.getBusname());
394 } catch (DBusException e) {
395 throw new UnexpectedErrorException("Dbus command failed, maybe signal-cli dbus daemon is already running: "
396 + e.getMessage(), e);
397 }
398
399 logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
400 }
401
402 private Thread exportMultiAccountManager(
403 final DBusConnection conn, final Manager m, final boolean noReceiveOnStart
404 ) {
405 final var objectPath = DbusConfig.getObjectPath(m.getSelfNumber());
406 return exportDbusObject(conn, objectPath, m, noReceiveOnStart);
407 }
408
409 private Thread exportDbusObject(
410 final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
411 ) {
412 final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
413
414 return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
415 }
416
417 interface DbusRunner {
418
419 void run(DBusConnection connection, String objectPath) throws DBusException;
420 }
421 }