From be0993c5d8171aff9190e152b095677be4088112 Mon Sep 17 00:00:00 2001 From: AsamK Date: Sun, 30 Jan 2022 16:35:11 +0100 Subject: [PATCH] Improve JSON-RPC subscribeReceive method with subscription id --- graalvm-config-dir/reflect-config.json | 5 +- .../signal/manager/MultiAccountManager.java | 2 + .../manager/MultiAccountManagerImpl.java | 7 + man/signal-cli-jsonrpc.5.adoc | 16 ++ man/signal-cli.1.adoc | 7 +- .../asamk/signal/commands/DaemonCommand.java | 6 +- .../dbus/DbusMultiAccountManagerImpl.java | 8 + .../asamk/signal/jsonrpc/JsonRpcReader.java | 3 +- .../SignalJsonRpcDispatcherHandler.java | 145 ++++++++++++++---- 9 files changed, 157 insertions(+), 42 deletions(-) diff --git a/graalvm-config-dir/reflect-config.json b/graalvm-config-dir/reflect-config.json index 46c106eb..50d3c5d1 100644 --- a/graalvm-config-dir/reflect-config.json +++ b/graalvm-config-dir/reflect-config.json @@ -269,7 +269,10 @@ { "name":"java.lang.Throwable", "queryAllPublicMethods":true, - "methods":[{"name":"addSuppressed","parameterTypes":["java.lang.Throwable"] }] + "methods":[ + {"name":"addSuppressed","parameterTypes":["java.lang.Throwable"] }, + {"name":"getSuppressed","parameterTypes":[] } + ] }, { "name":"java.lang.reflect.Method", diff --git a/lib/src/main/java/org/asamk/signal/manager/MultiAccountManager.java b/lib/src/main/java/org/asamk/signal/manager/MultiAccountManager.java index 32b03d3a..15b60594 100644 --- a/lib/src/main/java/org/asamk/signal/manager/MultiAccountManager.java +++ b/lib/src/main/java/org/asamk/signal/manager/MultiAccountManager.java @@ -10,6 +10,8 @@ public interface MultiAccountManager extends AutoCloseable { List getAccountNumbers(); + List getManagers(); + void addOnManagerAddedHandler(Consumer handler); void addOnManagerRemovedHandler(Consumer handler); diff --git a/lib/src/main/java/org/asamk/signal/manager/MultiAccountManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/MultiAccountManagerImpl.java index dc72aeeb..83f0bb26 100644 --- a/lib/src/main/java/org/asamk/signal/manager/MultiAccountManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/MultiAccountManagerImpl.java @@ -49,6 +49,13 @@ public class MultiAccountManagerImpl implements MultiAccountManager { } } + @Override + public List getManagers() { + synchronized (managers) { + return new ArrayList<>(managers); + } + } + void addManager(final Manager m) { synchronized (managers) { if (managers.contains(m)) { diff --git a/man/signal-cli-jsonrpc.5.adoc b/man/signal-cli-jsonrpc.5.adoc index 0a2ed950..f0b62ca9 100644 --- a/man/signal-cli-jsonrpc.5.adoc +++ b/man/signal-cli-jsonrpc.5.adoc @@ -76,6 +76,22 @@ The `method` field is the command name and the parameters can be sent as the `pa `--attachment ATTACH` becomes `"attachment":"ATTACH"` +=== Additional JSON-RPC commands + +For receiving message additional commands are provided in JSON-RPC mode with `--receive-mode=manual`. + +==== subscribeReceive + +Tells the daemon to start receiving messages, returns the subscription id as a single integer value in the result. + +==== unsubscribeReceive + +Stop a previous subscription for receiving messages. + +Params: + +- `subscription`: the subscription id returned by `subscribeReceive` + == Examples REQUEST: `{"jsonrpc":"2.0","method":"listGroups","id":"5"}` RESPONSE: `{"jsonrpc":"2.0","result":[...],"id":"5"}` diff --git a/man/signal-cli.1.adoc b/man/signal-cli.1.adoc index e7296207..02ae4ca4 100644 --- a/man/signal-cli.1.adoc +++ b/man/signal-cli.1.adoc @@ -8,7 +8,7 @@ vim:set ts=4 sw=4 tw=82 noet: == Name -signal-cli - A commandline and dbus interface for the Signal messenger +signal-cli - A commandline interface for the Signal messenger == Synopsis @@ -20,7 +20,7 @@ signal-cli is a commandline interface for libsignal-service-java. It supports registering, verifying, sending and receiving messages. For registering you need a phone number where you can receive SMS or incoming calls. signal-cli was primarily developed to be used on servers to notify admins of important events. -For this use-case, it has a dbus interface, that can be used to send messages from any programming language that has dbus bindings. +For this use-case, it has a dbus and a JSON-RPC interface, that can be used to send messages from other programs. For some functionality the Signal protocol requires that all messages have been received from the server. The `receive` command should be regularly executed. @@ -54,8 +54,9 @@ This flag must not be given for the `link` command. It is optional for the `daemon` command. For all other commands it is only optional if there is exactly one local user in the config directory. -*--service-environment* ENVIRONMENT +*--service-environment* ENVIRONMENT:: Choose the server environment to use: + - `live` (default) - `staging` diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index 34a80226..043b19c2 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -155,7 +155,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { final var receiveMode = ns.get("receive-mode"); final var ignoreAttachments = Boolean.TRUE.equals(ns.getBoolean("ignore-attachments")); - c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(m -> { + c.getManagers().forEach(m -> { m.setIgnoreAttachments(ignoreAttachments); addDefaultReceiveHandler(m, noReceiveStdOut ? null : outputWriter, receiveMode != ReceiveMode.ON_START); }); @@ -317,10 +317,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { connection.unExportObject(path); }); - final var initThreads = c.getAccountNumbers() + final var initThreads = c.getManagers() .stream() - .map(c::getManager) - .filter(Objects::nonNull) .map(m -> exportMultiAccountManager(connection, m, noReceiveOnStart)) .filter(Objects::nonNull) .toList(); diff --git a/src/main/java/org/asamk/signal/dbus/DbusMultiAccountManagerImpl.java b/src/main/java/org/asamk/signal/dbus/DbusMultiAccountManagerImpl.java index 9659fea5..2c5f720f 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusMultiAccountManagerImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusMultiAccountManagerImpl.java @@ -46,6 +46,14 @@ public class DbusMultiAccountManagerImpl implements MultiAccountManager { .toList(); } + @Override + public List getManagers() { + return signalControl.listAccounts() + .stream() + .map(a -> (Manager) new DbusManagerImpl(getRemoteObject(a, Signal.class), connection)) + .toList(); + } + @Override public void addOnManagerAddedHandler(final Consumer handler) { synchronized (onManagerAddedHandlers) { diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index 5b641266..f3784a2c 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -90,10 +90,11 @@ public class JsonRpcReader { String input = lineSupplier.get(); if (input == null) { - // Reached end of input stream + logger.trace("Reached end of JSON-RPC input stream."); break; } + logger.trace("Incoming JSON-RPC message: {}", input); JsonRpcMessage message = parseJsonRpcMessage(input); if (message == null) continue; diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 8d13ad28..086681f7 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -5,7 +5,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ContainerNode; +import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.asamk.signal.commands.Command; @@ -21,6 +23,7 @@ import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.MultiAccountManager; import org.asamk.signal.manager.RegistrationManager; +import org.asamk.signal.manager.api.Pair; import org.asamk.signal.output.JsonWriter; import org.asamk.signal.util.Util; import org.slf4j.Logger; @@ -29,8 +32,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.channels.OverlappingFileLockException; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; public class SignalJsonRpcDispatcherHandler { @@ -47,7 +51,7 @@ public class SignalJsonRpcDispatcherHandler { private final boolean noReceiveOnStart; private MultiAccountManager c; - private final Map receiveHandlers = new HashMap<>(); + private final Map>> receiveHandlers = new HashMap<>(); private Manager m; @@ -64,7 +68,7 @@ public class SignalJsonRpcDispatcherHandler { this.c = c; if (!noReceiveOnStart) { - c.getAccountNumbers().stream().map(c::getManager).filter(Objects::nonNull).forEach(this::subscribeReceive); + this.subscribeReceive(c.getManagers()); c.addOnManagerAddedHandler(this::subscribeReceive); c.addOnManagerRemovedHandler(this::unsubscribeReceive); } @@ -85,33 +89,46 @@ public class SignalJsonRpcDispatcherHandler { handleConnection(); } - private void subscribeReceive(final Manager m) { - if (receiveHandlers.containsKey(m)) { - return; - } + private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0); - final var receiveMessageHandler = new JsonReceiveMessageHandler(m, - s -> jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", - objectMapper.valueToTree(s), - null))); - m.addReceiveHandler(receiveMessageHandler); - receiveHandlers.put(m, receiveMessageHandler); + private int subscribeReceive(final Manager manager) { + return subscribeReceive(List.of(manager)); + } - while (!m.hasCaughtUpWithOldMessages()) { - try { - synchronized (m) { - m.wait(); - } - } catch (InterruptedException ignored) { - } - } + private int subscribeReceive(final List managers) { + final var subscriptionId = nextSubscriptionId.getAndIncrement(); + final var handlers = managers.stream().map(m -> { + final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> { + final ContainerNode params = objectMapper.valueToTree(s); + ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId)); + jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", params, null)); + }); + m.addReceiveHandler(receiveMessageHandler); + return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler); + }).toList(); + receiveHandlers.put(subscriptionId, handlers); + + return subscriptionId; } - void unsubscribeReceive(final Manager m) { - final var receiveMessageHandler = receiveHandlers.remove(m); - if (receiveMessageHandler != null) { - m.removeReceiveHandler(receiveMessageHandler); + private boolean unsubscribeReceive(final int subscriptionId) { + final var handlers = receiveHandlers.remove(subscriptionId); + if (handlers == null) { + return false; } + for (final var pair : handlers) { + unsubscribeReceiveHandler(pair); + } + return true; + } + + private void unsubscribeReceive(final Manager m) { + final var subscriptionId = receiveHandlers.entrySet() + .stream() + .filter(e -> e.getValue().size() == 1 && e.getValue().get(0).first().equals(m)) + .map(Map.Entry::getKey) + .findFirst(); + subscriptionId.ifPresent(this::unsubscribeReceive); } private void handleConnection() { @@ -119,16 +136,28 @@ public class SignalJsonRpcDispatcherHandler { jsonRpcReader.readMessages((method, params) -> handleRequest(objectMapper, method, params), response -> logger.debug("Received unexpected response for id {}", response.getId())); } finally { - receiveHandlers.forEach(Manager::removeReceiveHandler); + receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler)); receiveHandlers.clear(); } } + private void unsubscribeReceiveHandler(final Pair pair) { + final var m = pair.first(); + final var handler = pair.second(); + m.removeReceiveHandler(handler); + } + private JsonNode handleRequest( final ObjectMapper objectMapper, final String method, ContainerNode params ) throws JsonRpcException { var command = getCommand(method); if (c != null) { + if (command instanceof JsonRpcSingleCommand jsonRpcCommand) { + final var manager = getManagerFromParams(params); + if (manager != null) { + return runCommand(objectMapper, params, new CommandRunnerImpl<>(manager, jsonRpcCommand)); + } + } if (command instanceof JsonRpcMultiCommand jsonRpcCommand) { return runCommand(objectMapper, params, new MultiCommandRunnerImpl<>(c, jsonRpcCommand)); } @@ -168,10 +197,15 @@ public class SignalJsonRpcDispatcherHandler { null)); } - private Manager getManagerFromParams(final ContainerNode params) { - if (params != null && params.has("account")) { + private Manager getManagerFromParams(final ContainerNode params) throws JsonRpcException { + if (params != null && params.hasNonNull("account")) { final var manager = c.getManager(params.get("account").asText()); ((ObjectNode) params).remove("account"); + if (manager == null) { + throw new JsonRpcException(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_PARAMS, + "Specified account does not exist", + null)); + } return manager; } return null; @@ -322,7 +356,7 @@ public class SignalJsonRpcDispatcherHandler { command.handleCommand(requestParams, jsonWriter); } - private class SubscribeReceiveCommand implements JsonRpcSingleCommand { + private class SubscribeReceiveCommand implements JsonRpcSingleCommand, JsonRpcMultiCommand { @Override public String getName() { @@ -333,22 +367,67 @@ public class SignalJsonRpcDispatcherHandler { public void handleCommand( final Void request, final Manager m, final JsonWriter jsonWriter ) throws CommandException { - subscribeReceive(m); + final var subscriptionId = subscribeReceive(m); + jsonWriter.write(subscriptionId); + } + + @Override + public void handleCommand( + final Void request, final MultiAccountManager c, final JsonWriter jsonWriter + ) throws CommandException { + final var subscriptionId = subscribeReceive(c.getManagers()); + jsonWriter.write(subscriptionId); } } - private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand { + private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand, JsonRpcMultiCommand { @Override public String getName() { return "unsubscribeReceive"; } + @Override + public TypeReference getRequestType() { + return new TypeReference<>() {}; + } + @Override public void handleCommand( - final Void request, final Manager m, final JsonWriter jsonWriter + final JsonNode request, final Manager m, final JsonWriter jsonWriter ) throws CommandException { - unsubscribeReceive(m); + final var subscriptionId = getSubscriptionId(request); + if (subscriptionId == null) { + unsubscribeReceive(m); + } else { + if (!unsubscribeReceive(subscriptionId)) { + throw new UserErrorException("Unknown subscription id"); + } + } + } + + @Override + public void handleCommand( + final JsonNode request, final MultiAccountManager c, final JsonWriter jsonWriter + ) throws CommandException { + final var subscriptionId = getSubscriptionId(request); + if (subscriptionId == null) { + throw new UserErrorException("Missing subscription parameter with subscription id"); + } else { + if (!unsubscribeReceive(subscriptionId)) { + throw new UserErrorException("Unknown subscription id"); + } + } + } + + private Integer getSubscriptionId(final JsonNode request) { + if (request instanceof ArrayNode req) { + return req.get(0).asInt(); + } else if (request instanceof ObjectNode req) { + return req.get("subscription").asInt(); + } else { + return null; + } } } } -- 2.50.1