]> nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java
Execute JSON-RPC requests in parallel
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SignalJsonRpcDispatcherHandler.java
index 3548f655d7e630403ea4a178de058b3d429783aa..53b876b866a5f6b8fe815f5dbe72632bce57c845 100644 (file)
@@ -55,8 +55,8 @@ public class SignalJsonRpcDispatcherHandler {
         this.commandHandler = new SignalJsonRpcCommandHandler(c, this::getCommand);
 
         if (!noReceiveOnStart) {
-            this.subscribeReceive(c.getManagers());
-            c.addOnManagerAddedHandler(this::subscribeReceive);
+            this.subscribeReceive(c.getManagers(), true);
+            c.addOnManagerAddedHandler(m -> subscribeReceive(m, true));
             c.addOnManagerRemovedHandler(this::unsubscribeReceive);
         }
 
@@ -67,7 +67,7 @@ public class SignalJsonRpcDispatcherHandler {
         this.commandHandler = new SignalJsonRpcCommandHandler(m, this::getCommand);
 
         if (!noReceiveOnStart) {
-            subscribeReceive(m);
+            subscribeReceive(m, true);
         }
 
         final var currentThread = Thread.currentThread();
@@ -78,16 +78,23 @@ public class SignalJsonRpcDispatcherHandler {
 
     private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
 
-    private int subscribeReceive(final Manager manager) {
-        return subscribeReceive(List.of(manager));
+    private int subscribeReceive(final Manager manager, boolean internalSubscription) {
+        return subscribeReceive(List.of(manager), internalSubscription);
     }
 
-    private int subscribeReceive(final List<Manager> managers) {
+    private int subscribeReceive(final List<Manager> managers, boolean internalSubscription) {
         final var subscriptionId = nextSubscriptionId.getAndIncrement();
         final var handlers = managers.stream().map(m -> {
             final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
-                final ContainerNode<?> params = objectMapper.valueToTree(s);
-                ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
+                ContainerNode<?> params;
+                if (internalSubscription) {
+                    params = objectMapper.valueToTree(s);
+                } else {
+                    final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
+                    paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
+                    paramsNode.set("result", objectMapper.valueToTree(s));
+                    params = paramsNode;
+                }
                 final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
                 try {
                     jsonRpcSender.sendRequest(jsonRpcRequest);
@@ -162,7 +169,7 @@ public class SignalJsonRpcDispatcherHandler {
         public void handleCommand(
                 final Void request, final Manager m, final JsonWriter jsonWriter
         ) throws CommandException {
-            final var subscriptionId = subscribeReceive(m);
+            final var subscriptionId = subscribeReceive(m, false);
             jsonWriter.write(subscriptionId);
         }
 
@@ -170,7 +177,7 @@ public class SignalJsonRpcDispatcherHandler {
         public void handleCommand(
                 final Void request, final MultiAccountManager c, final JsonWriter jsonWriter
         ) throws CommandException {
-            final var subscriptionId = subscribeReceive(c.getManagers());
+            final var subscriptionId = subscribeReceive(c.getManagers(), false);
             jsonWriter.write(subscriptionId);
         }
     }