import java.io.IOException;
import java.io.InputStream;
-import java.util.Objects;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
return;
}
- while (!Thread.interrupted()) {
- String input = lineSupplier.get();
- if (input == null) {
- logger.trace("Reached end of JSON-RPC input stream.");
- break;
- }
+ final var executor = Executors.newFixedThreadPool(10);
+ try {
+ while (!Thread.interrupted()) {
+ final var input = lineSupplier.get();
+ if (input == null) {
+ logger.trace("Reached end of JSON-RPC input stream.");
+ break;
+ }
- logger.trace("Incoming JSON-RPC message: {}", input);
- JsonRpcMessage message = parseJsonRpcMessage(input);
- if (message == null) {
- continue;
- }
+ logger.trace("Incoming JSON-RPC message: {}", input);
+ final var message = parseJsonRpcMessage(input);
+ if (message == null) {
+ continue;
+ }
- handleMessage(message, requestHandler, responseHandler);
+ executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
+ }
+ } finally {
+ Util.closeExecutorService(executor);
}
}
} else if (message instanceof JsonRpcResponse jsonRpcResponse) {
responseHandler.accept(jsonRpcResponse);
} else {
- final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
- final JsonRpcRequest request;
- try {
- request = parseJsonRpcRequest(jsonNode);
- } catch (JsonRpcException e) {
- return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
- }
-
- return handleRequest(requestHandler, request);
- }).filter(Objects::nonNull).toList();
+ final var messages = ((JsonRpcBatchMessage) message).getMessages();
+ final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+ final var executor = Executors.newFixedThreadPool(10);
+ try {
+ final var lock = new ReentrantLock();
+ messages.forEach(jsonNode -> {
+ final JsonRpcRequest request;
+ try {
+ request = parseJsonRpcRequest(jsonNode);
+ } catch (JsonRpcException e) {
+ final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ return;
+ }
+
+ executor.submit(() -> {
+ final var response = handleRequest(requestHandler, request);
+ if (response != null) {
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ });
+ } finally {
+ Util.closeExecutorService(executor);
+ }
if (responseList.size() > 0) {
jsonRpcSender.sendBatchResponses(responseList);
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class Util {
+ private final static Logger logger = LoggerFactory.getLogger(Util.class);
+
private Util() {
}
return map;
}
+ public static void closeExecutorService(ExecutorService executor) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+ executor.shutdownNow();
+ if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
+ logger.warn("Failed to shutdown executor service");
+ }
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
}