X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/a0c345185ba91b9aad0dcc7e1f94d0c3269a8cae..6f4d538832b0ee1a4bb45de5d96c9afd5d07f463:/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index 27da9b0b..16f5fbd0 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.util.Objects; +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.StreamSupport; @@ -53,20 +55,25 @@ public class JsonRpcReader { return; } - while (!Thread.interrupted()) { - String input = lineSupplier.get(); - if (input == null) { - logger.trace("Reached end of JSON-RPC input stream."); - break; - } + final var executor = Executors.newFixedThreadPool(10); + try { + while (!Thread.interrupted()) { + final var input = lineSupplier.get(); + if (input == null) { + logger.trace("Reached end of JSON-RPC input stream."); + break; + } - logger.trace("Incoming JSON-RPC message: {}", input); - JsonRpcMessage message = parseJsonRpcMessage(input); - if (message == null) { - continue; - } + logger.trace("Incoming JSON-RPC message: {}", input); + final var message = parseJsonRpcMessage(input); + if (message == null) { + continue; + } - handleMessage(message, requestHandler, responseHandler); + executor.submit(() -> handleMessage(message, requestHandler, responseHandler)); + } + } finally { + Util.closeExecutorService(executor); } } @@ -84,16 +91,41 @@ public class JsonRpcReader { } else if (message instanceof JsonRpcResponse jsonRpcResponse) { responseHandler.accept(jsonRpcResponse); } else { - final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> { - final JsonRpcRequest request; - try { - request = parseJsonRpcRequest(jsonNode); - } catch (JsonRpcException e) { - return JsonRpcResponse.forError(e.getError(), getId(jsonNode)); - } - - return handleRequest(requestHandler, request); - }).filter(Objects::nonNull).toList(); + final var messages = ((JsonRpcBatchMessage) message).getMessages(); + final var responseList = new ArrayList(messages.size()); + final var executor = Executors.newFixedThreadPool(10); + try { + final var lock = new ReentrantLock(); + messages.forEach(jsonNode -> { + final JsonRpcRequest request; + try { + request = parseJsonRpcRequest(jsonNode); + } catch (JsonRpcException e) { + final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode)); + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + return; + } + + executor.submit(() -> { + final var response = handleRequest(requestHandler, request); + if (response != null) { + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + } + }); + }); + } finally { + Util.closeExecutorService(executor); + } if (responseList.size() > 0) { jsonRpcSender.sendBatchResponses(responseList);