nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java
Execute JSON-RPC requests in parallel
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / JsonRpcReader.java
index 27da9b0be5606fb1917e601da6c951350556cf74..16f5fbd0f85a9c085d42bf5025acb0a716dda4ea 100644 (file)
@@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Objects;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
@@ -53,20 +55,25 @@ public class JsonRpcReader {
             return;
         }
 
             return;
         }
 
-        while (!Thread.interrupted()) {
-            String input = lineSupplier.get();
-            if (input == null) {
-                logger.trace("Reached end of JSON-RPC input stream.");
-                break;
-            }
+        final var executor = Executors.newFixedThreadPool(10);
+        try {
+            while (!Thread.interrupted()) {
+                final var input = lineSupplier.get();
+                if (input == null) {
+                    logger.trace("Reached end of JSON-RPC input stream.");
+                    break;
+                }
 
 
-            logger.trace("Incoming JSON-RPC message: {}", input);
-            JsonRpcMessage message = parseJsonRpcMessage(input);
-            if (message == null) {
-                continue;
-            }
+                logger.trace("Incoming JSON-RPC message: {}", input);
+                final var message = parseJsonRpcMessage(input);
+                if (message == null) {
+                    continue;
+                }
 
 
-            handleMessage(message, requestHandler, responseHandler);
+                executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
+            }
+        } finally {
+            Util.closeExecutorService(executor);
         }
     }
 
         }
     }
 
@@ -84,16 +91,41 @@ public class JsonRpcReader {
         } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
             responseHandler.accept(jsonRpcResponse);
         } else {
         } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
             responseHandler.accept(jsonRpcResponse);
         } else {
-            final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
-                final JsonRpcRequest request;
-                try {
-                    request = parseJsonRpcRequest(jsonNode);
-                } catch (JsonRpcException e) {
-                    return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
-                }
-
-                return handleRequest(requestHandler, request);
-            }).filter(Objects::nonNull).toList();
+            final var messages = ((JsonRpcBatchMessage) message).getMessages();
+            final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+            final var executor = Executors.newFixedThreadPool(10);
+            try {
+                final var lock = new ReentrantLock();
+                messages.forEach(jsonNode -> {
+                    final JsonRpcRequest request;
+                    try {
+                        request = parseJsonRpcRequest(jsonNode);
+                    } catch (JsonRpcException e) {
+                        final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+                        lock.lock();
+                        try {
+                            responseList.add(response);
+                        } finally {
+                            lock.unlock();
+                        }
+                        return;
+                    }
+
+                    executor.submit(() -> {
+                        final var response = handleRequest(requestHandler, request);
+                        if (response != null) {
+                            lock.lock();
+                            try {
+                                responseList.add(response);
+                            } finally {
+                                lock.unlock();
+                            }
+                        }
+                    });
+                });
+            } finally {
+                Util.closeExecutorService(executor);
+            }
 
             if (responseList.size() > 0) {
                 jsonRpcSender.sendBatchResponses(responseList);
 
             if (responseList.size() > 0) {
                 jsonRpcSender.sendBatchResponses(responseList);