]> nmode's Git Repositories - signal-cli/commitdiff
Execute JSON-RPC requests in parallel
authorAsamK <asamk@gmx.de>
Thu, 5 Oct 2023 20:18:13 +0000 (22:18 +0200)
committerAsamK <asamk@gmx.de>
Thu, 5 Oct 2023 20:18:13 +0000 (22:18 +0200)
src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java
src/main/java/org/asamk/signal/util/Util.java

index 27da9b0be5606fb1917e601da6c951350556cf74..16f5fbd0f85a9c085d42bf5025acb0a716dda4ea 100644 (file)
@@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Objects;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.StreamSupport;
@@ -53,20 +55,25 @@ public class JsonRpcReader {
             return;
         }
 
-        while (!Thread.interrupted()) {
-            String input = lineSupplier.get();
-            if (input == null) {
-                logger.trace("Reached end of JSON-RPC input stream.");
-                break;
-            }
+        final var executor = Executors.newFixedThreadPool(10);
+        try {
+            while (!Thread.interrupted()) {
+                final var input = lineSupplier.get();
+                if (input == null) {
+                    logger.trace("Reached end of JSON-RPC input stream.");
+                    break;
+                }
 
-            logger.trace("Incoming JSON-RPC message: {}", input);
-            JsonRpcMessage message = parseJsonRpcMessage(input);
-            if (message == null) {
-                continue;
-            }
+                logger.trace("Incoming JSON-RPC message: {}", input);
+                final var message = parseJsonRpcMessage(input);
+                if (message == null) {
+                    continue;
+                }
 
-            handleMessage(message, requestHandler, responseHandler);
+                executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
+            }
+        } finally {
+            Util.closeExecutorService(executor);
         }
     }
 
@@ -84,16 +91,41 @@ public class JsonRpcReader {
         } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
             responseHandler.accept(jsonRpcResponse);
         } else {
-            final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
-                final JsonRpcRequest request;
-                try {
-                    request = parseJsonRpcRequest(jsonNode);
-                } catch (JsonRpcException e) {
-                    return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
-                }
-
-                return handleRequest(requestHandler, request);
-            }).filter(Objects::nonNull).toList();
+            final var messages = ((JsonRpcBatchMessage) message).getMessages();
+            final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+            final var executor = Executors.newFixedThreadPool(10);
+            try {
+                final var lock = new ReentrantLock();
+                messages.forEach(jsonNode -> {
+                    final JsonRpcRequest request;
+                    try {
+                        request = parseJsonRpcRequest(jsonNode);
+                    } catch (JsonRpcException e) {
+                        final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+                        lock.lock();
+                        try {
+                            responseList.add(response);
+                        } finally {
+                            lock.unlock();
+                        }
+                        return;
+                    }
+
+                    executor.submit(() -> {
+                        final var response = handleRequest(requestHandler, request);
+                        if (response != null) {
+                            lock.lock();
+                            try {
+                                responseList.add(response);
+                            } finally {
+                                lock.unlock();
+                            }
+                        }
+                    });
+                });
+            } finally {
+                Util.closeExecutorService(executor);
+            }
 
             if (responseList.size() > 0) {
                 jsonRpcSender.sendBatchResponses(responseList);
index 338c986ab9b071567fd110b64d5228e454bc2155..948560af768ebd02b33dfaeab9659a7deefc5d9a 100644 (file)
@@ -5,6 +5,9 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -13,10 +16,14 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class Util {
 
+    private final static Logger logger = LoggerFactory.getLogger(Util.class);
+
     private Util() {
     }
 
@@ -80,4 +87,18 @@ public class Util {
         return map;
     }
 
+    public static void closeExecutorService(ExecutorService executor) {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+                executor.shutdownNow();
+                if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+                    logger.warn("Failed to shutdown executor service");
+                }
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
 }