- } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
- responseHandler.accept(jsonRpcResponse);
- } else {
- final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
+
+ logger.trace("Incoming JSON-RPC message: {}", input);
+ final var message = parseJsonRpcMessage(input);
+ if (message == null) {
+ continue;
+ }
+
+ executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
+ }
+ } finally {
+ Util.closeExecutorService(executor);
+ }
+ }
+
+ private void handleMessage(
+ final JsonRpcMessage message,
+ final RequestHandler requestHandler,
+ final Consumer<JsonRpcResponse> responseHandler
+ ) {
+ if (message instanceof final JsonRpcRequest jsonRpcRequest) {
+ logger.debug("Received json rpc request, method: " + jsonRpcRequest.getMethod());
+ final var response = handleRequest(requestHandler, jsonRpcRequest);
+ if (response != null) {
+ jsonRpcSender.sendResponse(response);
+ }
+ } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
+ responseHandler.accept(jsonRpcResponse);
+ } else {
+ final var messages = ((JsonRpcBatchMessage) message).getMessages();
+ final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+ final var executor = Executors.newFixedThreadPool(10);
+ try {
+ final var lock = new ReentrantLock();
+ messages.forEach(jsonNode -> {