From: AsamK Date: Thu, 5 Oct 2023 20:18:13 +0000 (+0200) Subject: Execute JSON-RPC requests in parallel X-Git-Tag: v0.12.3~29 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/6f4d538832b0ee1a4bb45de5d96c9afd5d07f463 Execute JSON-RPC requests in parallel --- diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index 27da9b0b..16f5fbd0 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.util.Objects; +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -53,20 +55,25 @@ public class JsonRpcReader { return; } - while (!Thread.interrupted()) { - String input = lineSupplier.get(); - if (input == null) { - logger.trace("Reached end of JSON-RPC input stream."); - break; - } + final var executor = Executors.newFixedThreadPool(10); + try { + while (!Thread.interrupted()) { + final var input = lineSupplier.get(); + if (input == null) { + logger.trace("Reached end of JSON-RPC input stream."); + break; + } - logger.trace("Incoming JSON-RPC message: {}", input); - JsonRpcMessage message = parseJsonRpcMessage(input); - if (message == null) { - continue; - } + logger.trace("Incoming JSON-RPC message: {}", input); + final var message = parseJsonRpcMessage(input); + if (message == null) { + continue; + } - handleMessage(message, requestHandler, responseHandler); + executor.submit(() -> handleMessage(message, requestHandler, responseHandler)); + } + } finally { + Util.closeExecutorService(executor); } } @@ -84,16 +91,41 @@ public class JsonRpcReader { } else if (message instanceof JsonRpcResponse jsonRpcResponse) { responseHandler.accept(jsonRpcResponse); } else { - final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> { - final JsonRpcRequest request; - try { - request = parseJsonRpcRequest(jsonNode); - } catch (JsonRpcException e) { - return JsonRpcResponse.forError(e.getError(), getId(jsonNode)); - } - - return handleRequest(requestHandler, request); - }).filter(Objects::nonNull).toList(); + final var messages = ((JsonRpcBatchMessage) message).getMessages(); + final var responseList = new ArrayList(messages.size()); + final var executor = Executors.newFixedThreadPool(10); + try { + final var lock = new ReentrantLock(); + messages.forEach(jsonNode -> { + final JsonRpcRequest request; + try { + request = parseJsonRpcRequest(jsonNode); + } catch (JsonRpcException e) { + final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode)); + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + return; + } + + executor.submit(() -> { + final var response = handleRequest(requestHandler, request); + if (response != null) { + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + } + }); + }); + } finally { + Util.closeExecutorService(executor); + } if (responseList.size() > 0) { jsonRpcSender.sendBatchResponses(responseList); diff --git a/src/main/java/org/asamk/signal/util/Util.java b/src/main/java/org/asamk/signal/util/Util.java index 338c986a..948560af 100644 --- a/src/main/java/org/asamk/signal/util/Util.java +++ b/src/main/java/org/asamk/signal/util/Util.java @@ -5,6 +5,9 @@ import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -13,10 +16,14 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Util { + private final static Logger logger = LoggerFactory.getLogger(Util.class); + private Util() { } @@ -80,4 +87,18 @@ public class Util { return map; } + public static void closeExecutorService(ExecutorService executor) { + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { + executor.shutdownNow(); + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + logger.warn("Failed to shutdown executor service"); + } + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } }