- } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
- responseHandler.accept(jsonRpcResponse);
- } else {
- final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
- final JsonRpcRequest request;
- try {
- request = parseJsonRpcRequest(jsonNode);
- } catch (JsonRpcException e) {
- return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
- }
-
- return handleRequest(requestHandler, request);
- }).filter(Objects::nonNull).collect(Collectors.toList());
-
- jsonRpcSender.sendBatchResponses(responseList);
+ }
+ case JsonRpcResponse jsonRpcResponse -> responseHandler.accept(jsonRpcResponse);
+ case JsonRpcBatchMessage jsonRpcBatchMessage -> {
+ final var messages = jsonRpcBatchMessage.getMessages();
+ final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+ try (final var executor = Executors.newCachedThreadPool()) {
+ final var lock = new ReentrantLock();
+ messages.forEach(jsonNode -> {
+ final JsonRpcRequest request;
+ try {
+ request = parseJsonRpcRequest(jsonNode);
+ } catch (JsonRpcException e) {
+ final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ return;
+ }
+
+ executor.submit(() -> {
+ final var response = handleRequest(requestHandler, request);
+ if (response != null) {
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ });
+ }
+
+ if (!responseList.isEmpty()) {
+ jsonRpcSender.sendBatchResponses(responseList);
+ }