X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/fd851ba6cb39369f2cb1b5958a90cd023c05426a..e1f4dae5c20b2cb98975e6ba16bc73f2a45423e6:/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index 27da9b0b..327342a2 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -14,14 +14,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; -import java.util.Objects; +import java.util.ArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.StreamSupport; public class JsonRpcReader { - private final static Logger logger = LoggerFactory.getLogger(JsonRpcReader.class); + private static final Logger logger = LoggerFactory.getLogger(JsonRpcReader.class); private final JsonRpcSender jsonRpcSender; private final ObjectMapper objectMapper; @@ -53,20 +55,22 @@ public class JsonRpcReader { return; } - while (!Thread.interrupted()) { - String input = lineSupplier.get(); - if (input == null) { - logger.trace("Reached end of JSON-RPC input stream."); - break; - } + try (final var executor = Executors.newCachedThreadPool()) { + while (!Thread.interrupted()) { + final var input = lineSupplier.get(); + if (input == null) { + logger.trace("Reached end of JSON-RPC input stream."); + break; + } - logger.trace("Incoming JSON-RPC message: {}", input); - JsonRpcMessage message = parseJsonRpcMessage(input); - if (message == null) { - continue; - } + logger.trace("Incoming JSON-RPC message: {}", input); + final var message = parseJsonRpcMessage(input); + if (message == null) { + continue; + } - handleMessage(message, requestHandler, responseHandler); + executor.submit(() -> handleMessage(message, requestHandler, responseHandler)); + } } } @@ -75,28 +79,52 @@ public class JsonRpcReader { final RequestHandler requestHandler, final Consumer responseHandler ) { - if (message instanceof final JsonRpcRequest jsonRpcRequest) { - logger.debug("Received json rpc request, method: " + jsonRpcRequest.getMethod()); - final var response = handleRequest(requestHandler, jsonRpcRequest); - if (response != null) { - jsonRpcSender.sendResponse(response); + switch (message) { + case JsonRpcRequest jsonRpcRequest -> { + logger.debug("Received json rpc request, method: " + jsonRpcRequest.getMethod()); + final var response = handleRequest(requestHandler, jsonRpcRequest); + if (response != null) { + jsonRpcSender.sendResponse(response); + } } - } else if (message instanceof JsonRpcResponse jsonRpcResponse) { - responseHandler.accept(jsonRpcResponse); - } else { - final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> { - final JsonRpcRequest request; - try { - request = parseJsonRpcRequest(jsonNode); - } catch (JsonRpcException e) { - return JsonRpcResponse.forError(e.getError(), getId(jsonNode)); + case JsonRpcResponse jsonRpcResponse -> responseHandler.accept(jsonRpcResponse); + case JsonRpcBatchMessage jsonRpcBatchMessage -> { + final var messages = jsonRpcBatchMessage.getMessages(); + final var responseList = new ArrayList(messages.size()); + try (final var executor = Executors.newCachedThreadPool()) { + final var lock = new ReentrantLock(); + messages.forEach(jsonNode -> { + final JsonRpcRequest request; + try { + request = parseJsonRpcRequest(jsonNode); + } catch (JsonRpcException e) { + final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode)); + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + return; + } + + executor.submit(() -> { + final var response = handleRequest(requestHandler, request); + if (response != null) { + lock.lock(); + try { + responseList.add(response); + } finally { + lock.unlock(); + } + } + }); + }); } - return handleRequest(requestHandler, request); - }).filter(Objects::nonNull).toList(); - - if (responseList.size() > 0) { - jsonRpcSender.sendBatchResponses(responseList); + if (!responseList.isEmpty()) { + jsonRpcSender.sendBatchResponses(responseList); + } } } } @@ -123,6 +151,13 @@ public class JsonRpcReader { } private JsonRpcMessage parseJsonRpcMessage(final String input) { + if (input.trim().isEmpty()) { + jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.PARSE_ERROR, + "Empty input line", + null), null)); + return null; + } + final JsonNode jsonNode; try { jsonNode = objectMapper.readTree(input); @@ -161,7 +196,7 @@ public class JsonRpcReader { null), null)); return null; } else if (jsonNode.isArray()) { - if (jsonNode.size() == 0) { + if (jsonNode.isEmpty()) { jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST, "invalid request", null), null)); @@ -189,7 +224,7 @@ public class JsonRpcReader { private ValueNode getId(JsonNode jsonNode) { final var id = jsonNode.get("id"); - return id instanceof ValueNode ? (ValueNode) id : null; + return id instanceof ValueNode value ? value : null; } private JsonRpcRequest parseJsonRpcRequest(final JsonNode input) throws JsonRpcException {