import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import org.asamk.signal.util.Util;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Objects;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
public class JsonRpcReader {
- private final static Logger logger = LoggerFactory.getLogger(JsonRpcReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(JsonRpcReader.class);
private final JsonRpcSender jsonRpcSender;
private final ObjectMapper objectMapper;
+ private final InputStream input;
private final Supplier<String> lineSupplier;
- public JsonRpcReader(
- final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier
- ) {
+ /**
+ * Creates a reader that takes its input line by line from the given supplier.
+ * The stream field is left {@code null}, which makes {@code readMessages} use the line-based loop.
+ *
+ * @param jsonRpcSender sender used for responses and error replies
+ * @param lineSupplier source of input lines; a {@code null} line is treated as end of input by {@code readMessages}
+ */
+ public JsonRpcReader(final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier) {
this.jsonRpcSender = jsonRpcSender;
+ this.input = null;
this.lineSupplier = lineSupplier;
this.objectMapper = Util.createJsonObjectMapper();
}
+ /**
+ * Creates a reader that parses its input directly from the given stream.
+ * The line supplier is left {@code null}; {@code readMessages} then parses and handles a single message.
+ *
+ * @param jsonRpcSender sender used for responses and error replies
+ * @param input stream the JSON-RPC message is read from
+ */
+ public JsonRpcReader(final JsonRpcSender jsonRpcSender, final InputStream input) {
+ this.jsonRpcSender = jsonRpcSender;
+ this.input = input;
+ this.lineSupplier = null;
+ this.objectMapper = Util.createJsonObjectMapper();
+ }
+
+ /**
+ * Reads JSON-RPC messages and dispatches them to the given handlers.
+ * <p>
+ * If this reader was created with an {@link InputStream}, one message is parsed from that stream and
+ * handled on the calling thread. Otherwise lines are pulled from the line supplier until it returns
+ * {@code null} or the calling thread is interrupted, and every successfully parsed message is handed
+ * to a pooled worker thread. The executor is closed before this method returns.
+ *
+ * @param requestHandler handler invoked for incoming requests
+ * @param responseHandler consumer invoked for incoming responses
+ */
public void readMessages(final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler) {
- while (!Thread.interrupted()) {
- JsonRpcMessage message = readMessage();
- if (message == null) break;
+ if (input != null) {
+ JsonRpcMessage message = parseJsonRpcMessage(input);
+ if (message == null) {
+ return;
+ }
+
+ handleMessage(message, requestHandler, responseHandler);
+ return;
+ }
+
+ try (final var executor = Executors.newCachedThreadPool()) {
+ while (!Thread.interrupted()) {
+ // Named "line" so it does not shadow the InputStream field "input" checked above.
+ final var line = lineSupplier.get();
+ if (line == null) {
+ logger.trace("Reached end of JSON-RPC input stream.");
+ break;
+ }
+
+ logger.trace("Incoming JSON-RPC message: {}", line);
+ final var message = parseJsonRpcMessage(line);
+ if (message == null) {
+ continue;
+ }
+
+ executor.submit(() -> {
+ try {
+ handleMessage(message, requestHandler, responseHandler);
+ } catch (RuntimeException e) {
+ // The Future returned by submit() is discarded, so log here or the failure is lost.
+ logger.error("Failed to handle JSON-RPC message", e);
+ }
+ });
+ }
+ }
+ }
- if (message instanceof final JsonRpcRequest jsonRpcRequest) {
- logger.debug("Received json rpc request, method: " + jsonRpcRequest.method);
+ /**
+ * Dispatches a single parsed message according to its type.
+ * <ul>
+ * <li>A request is handled and its response, if any, is sent through the sender.</li>
+ * <li>A response is passed to {@code responseHandler}.</li>
+ * <li>A batch is split into its elements: elements that fail to parse as a request yield an error
+ * response, the others are handled on pooled worker threads. The collected responses are sent
+ * as one batch, in completion order rather than request order, and only if at least one exists.</li>
+ * </ul>
+ *
+ * @param message the parsed message to dispatch
+ * @param requestHandler handler invoked for requests
+ * @param responseHandler consumer invoked for responses
+ */
+ private void handleMessage(
+ final JsonRpcMessage message,
+ final RequestHandler requestHandler,
+ final Consumer<JsonRpcResponse> responseHandler
+ ) {
+ switch (message) {
+ case JsonRpcRequest jsonRpcRequest -> {
+ logger.debug("Received json rpc request, method: {}", jsonRpcRequest.getMethod());
final var response = handleRequest(requestHandler, jsonRpcRequest);
if (response != null) {
jsonRpcSender.sendResponse(response);
}
- } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
- responseHandler.accept(jsonRpcResponse);
- } else {
- final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
- final JsonRpcRequest request;
- try {
- request = parseJsonRpcRequest(jsonNode);
- } catch (JsonRpcException e) {
- return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
- }
-
- return handleRequest(requestHandler, request);
- }).filter(Objects::nonNull).toList();
-
- jsonRpcSender.sendBatchResponses(responseList);
+ }
+ case JsonRpcResponse jsonRpcResponse -> responseHandler.accept(jsonRpcResponse);
+ case JsonRpcBatchMessage jsonRpcBatchMessage -> {
+ final var messages = jsonRpcBatchMessage.getMessages();
+ final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
+ try (final var executor = Executors.newCachedThreadPool()) {
+ // Guards responseList, which is appended to from this thread and from worker threads.
+ final var lock = new ReentrantLock();
+ messages.forEach(jsonNode -> {
+ final JsonRpcRequest request;
+ try {
+ request = parseJsonRpcRequest(jsonNode);
+ } catch (JsonRpcException e) {
+ final var response = JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ return;
+ }
+
+ executor.submit(() -> {
+ final var response = handleRequest(requestHandler, request);
+ if (response != null) {
+ lock.lock();
+ try {
+ responseList.add(response);
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ });
+ }
+
+ if (!responseList.isEmpty()) {
+ jsonRpcSender.sendBatchResponses(responseList);
+ }
}
}
}
return null;
}
- private JsonRpcMessage readMessage() {
- while (!Thread.interrupted()) {
- String input = lineSupplier.get();
-
- if (input == null) {
- // Reached end of input stream
- break;
- }
-
- JsonRpcMessage message = parseJsonRpcMessage(input);
- if (message == null) continue;
+ /**
+ * Parses one line of text into a JSON-RPC message.
+ * <p>
+ * A line that is empty after trimming, or that is not well-formed JSON, is answered with a
+ * {@code PARSE_ERROR} response sent through the sender, and {@code null} is returned.
+ * Any other {@link IOException} from the mapper is rethrown as an {@link AssertionError}.
+ *
+ * @param input the raw input line
+ * @return the result of parsing the JSON tree, or {@code null} if an error response was sent here
+ */
+ private JsonRpcMessage parseJsonRpcMessage(final String input) {
+ if (input.trim().isEmpty()) {
+ jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.PARSE_ERROR,
+ "Empty input line",
+ null), null));
+ return null;
+ }
- return message;
+ final JsonNode jsonNode;
+ try {
+ jsonNode = objectMapper.readTree(input);
+ } catch (JsonParseException e) {
+ jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.PARSE_ERROR,
+ e.getMessage(),
+ null), null));
+ return null;
+ } catch (IOException e) {
+ throw new AssertionError(e);
}
- return null;
+ return parseJsonRpcMessage(jsonNode);
}
- private JsonRpcMessage parseJsonRpcMessage(final String input) {
+ private JsonRpcMessage parseJsonRpcMessage(final InputStream input) {
final JsonNode jsonNode;
try {
jsonNode = objectMapper.readTree(input);
throw new AssertionError(e);
}
+ return parseJsonRpcMessage(jsonNode);
+ }
+
+ private JsonRpcMessage parseJsonRpcMessage(final JsonNode jsonNode) {
if (jsonNode == null) {
jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
"invalid request",
null), null));
return null;
} else if (jsonNode.isArray()) {
- if (jsonNode.size() == 0) {
+ if (jsonNode.isEmpty()) {
jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
"invalid request",
null), null));
+ /**
+ * Extracts the {@code "id"} field of a JSON node.
+ *
+ * @param jsonNode node to read the id from
+ * @return the {@code "id"} field if it is a {@link ValueNode}, otherwise {@code null}
+ */
private ValueNode getId(JsonNode jsonNode) {
final var id = jsonNode.get("id");
- return id instanceof ValueNode ? (ValueNode) id : null;
+ return id instanceof ValueNode value ? value : null;
}
private JsonRpcRequest parseJsonRpcRequest(final JsonNode input) throws JsonRpcException {
+ if (input instanceof ObjectNode i && input.has("params") && input.get("params").isNull()) {
+ // Workaround for clients that send a null params field instead of omitting it
+ i.remove("params");
+ }
JsonRpcRequest request;
try {
request = objectMapper.treeToValue(input, JsonRpcRequest.class);