]> nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java
Allow using data URIs for updateGroup/updateProfile avatars
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / JsonRpcReader.java
index 0a7017be88568135e8d1c77931e9b692e04700f3..e49b6cd0154de68a2b996c174256fbabaa43fdca 100644 (file)
@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ContainerNode;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.ValueNode;
 
 import org.asamk.signal.util.Util;
 import com.fasterxml.jackson.databind.node.ValueNode;
 
 import org.asamk.signal.util.Util;
@@ -12,10 +13,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 public class JsonRpcReader {
 import java.util.stream.StreamSupport;
 
 public class JsonRpcReader {
@@ -24,44 +25,77 @@ public class JsonRpcReader {
 
     private final JsonRpcSender jsonRpcSender;
     private final ObjectMapper objectMapper;
 
     private final JsonRpcSender jsonRpcSender;
     private final ObjectMapper objectMapper;
+    private final InputStream input;
     private final Supplier<String> lineSupplier;
 
     private final Supplier<String> lineSupplier;
 
-    public JsonRpcReader(
-            final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier
-    ) {
+    public JsonRpcReader(final JsonRpcSender jsonRpcSender, final Supplier<String> lineSupplier) {
         this.jsonRpcSender = jsonRpcSender;
         this.jsonRpcSender = jsonRpcSender;
+        this.input = null;
         this.lineSupplier = lineSupplier;
         this.objectMapper = Util.createJsonObjectMapper();
     }
 
         this.lineSupplier = lineSupplier;
         this.objectMapper = Util.createJsonObjectMapper();
     }
 
-    public void readRequests(
-            final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler
-    ) {
+    public JsonRpcReader(final JsonRpcSender jsonRpcSender, final InputStream input) {
+        this.jsonRpcSender = jsonRpcSender;
+        this.input = input;
+        this.lineSupplier = null;
+        this.objectMapper = Util.createJsonObjectMapper();
+    }
+
+    public void readMessages(final RequestHandler requestHandler, final Consumer<JsonRpcResponse> responseHandler) {
+        if (input != null) {
+            JsonRpcMessage message = parseJsonRpcMessage(input);
+            if (message == null) {
+                return;
+            }
+
+            handleMessage(message, requestHandler, responseHandler);
+            return;
+        }
+
         while (!Thread.interrupted()) {
         while (!Thread.interrupted()) {
-            JsonRpcMessage message = readMessage();
-            if (message == null) break;
+            String input = lineSupplier.get();
+            if (input == null) {
+                logger.trace("Reached end of JSON-RPC input stream.");
+                break;
+            }
 
 
-            if (message instanceof JsonRpcRequest) {
-                final var response = handleRequest(requestHandler, (JsonRpcRequest) message);
-                if (response != null) {
-                    jsonRpcSender.sendResponse(response);
-                }
-            } else if (message instanceof JsonRpcResponse) {
-                responseHandler.accept((JsonRpcResponse) message);
-            } else {
-                final var responseList = ((JsonRpcBulkMessage) message).getMessages().stream().map(jsonNode -> {
-                    final JsonRpcRequest request;
-                    try {
-                        request = parseJsonRpcRequest(jsonNode);
-                    } catch (JsonRpcException e) {
-                        return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
-                    }
-
-                    return handleRequest(requestHandler, request);
-                }).filter(Objects::nonNull).collect(Collectors.toList());
-
-                jsonRpcSender.sendBulkResponses(responseList);
+            logger.trace("Incoming JSON-RPC message: {}", input);
+            JsonRpcMessage message = parseJsonRpcMessage(input);
+            if (message == null) {
+                continue;
             }
             }
+
+            handleMessage(message, requestHandler, responseHandler);
+        }
+    }
+
+    private void handleMessage(
+            final JsonRpcMessage message,
+            final RequestHandler requestHandler,
+            final Consumer<JsonRpcResponse> responseHandler
+    ) {
+        if (message instanceof final JsonRpcRequest jsonRpcRequest) {
+            logger.debug("Received json rpc request, method: " + jsonRpcRequest.getMethod());
+            final var response = handleRequest(requestHandler, jsonRpcRequest);
+            if (response != null) {
+                jsonRpcSender.sendResponse(response);
+            }
+        } else if (message instanceof JsonRpcResponse jsonRpcResponse) {
+            responseHandler.accept(jsonRpcResponse);
+        } else {
+            final var responseList = ((JsonRpcBatchMessage) message).getMessages().stream().map(jsonNode -> {
+                final JsonRpcRequest request;
+                try {
+                    request = parseJsonRpcRequest(jsonNode);
+                } catch (JsonRpcException e) {
+                    return JsonRpcResponse.forError(e.getError(), getId(jsonNode));
+                }
+
+                return handleRequest(requestHandler, request);
+            }).filter(Objects::nonNull).toList();
+
+            jsonRpcSender.sendBatchResponses(responseList);
         }
     }
 
         }
     }
 
@@ -86,25 +120,23 @@ public class JsonRpcReader {
         return null;
     }
 
         return null;
     }
 
-    private JsonRpcMessage readMessage() {
-        while (!Thread.interrupted()) {
-            String input = lineSupplier.get();
-
-            if (input == null) {
-                // Reached end of input stream
-                break;
-            }
-
-            JsonRpcMessage message = parseJsonRpcMessage(input);
-            if (message == null) continue;
-
-            return message;
+    private JsonRpcMessage parseJsonRpcMessage(final String input) {
+        final JsonNode jsonNode;
+        try {
+            jsonNode = objectMapper.readTree(input);
+        } catch (JsonParseException e) {
+            jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.PARSE_ERROR,
+                    e.getMessage(),
+                    null), null));
+            return null;
+        } catch (IOException e) {
+            throw new AssertionError(e);
         }
 
         }
 
-        return null;
+        return parseJsonRpcMessage(jsonNode);
     }
 
     }
 
-    private JsonRpcMessage parseJsonRpcMessage(final String input) {
+    private JsonRpcMessage parseJsonRpcMessage(final InputStream input) {
         final JsonNode jsonNode;
         try {
             jsonNode = objectMapper.readTree(input);
         final JsonNode jsonNode;
         try {
             jsonNode = objectMapper.readTree(input);
@@ -117,6 +149,10 @@ public class JsonRpcReader {
             throw new AssertionError(e);
         }
 
             throw new AssertionError(e);
         }
 
+        return parseJsonRpcMessage(jsonNode);
+    }
+
+    private JsonRpcMessage parseJsonRpcMessage(final JsonNode jsonNode) {
         if (jsonNode == null) {
             jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
                     "invalid request",
         if (jsonNode == null) {
             jsonRpcSender.sendResponse(JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INVALID_REQUEST,
                     "invalid request",
@@ -129,8 +165,7 @@ public class JsonRpcReader {
                         null), null));
                 return null;
             }
                         null), null));
                 return null;
             }
-            return new JsonRpcBulkMessage(StreamSupport.stream(jsonNode.spliterator(), false)
-                    .collect(Collectors.toList()));
+            return new JsonRpcBatchMessage(StreamSupport.stream(jsonNode.spliterator(), false).toList());
         } else if (jsonNode.isObject()) {
             if (jsonNode.has("result") || jsonNode.has("error")) {
                 return parseJsonRpcResponse(jsonNode);
         } else if (jsonNode.isObject()) {
             if (jsonNode.has("result") || jsonNode.has("error")) {
                 return parseJsonRpcResponse(jsonNode);
@@ -156,6 +191,10 @@ public class JsonRpcReader {
     }
 
     private JsonRpcRequest parseJsonRpcRequest(final JsonNode input) throws JsonRpcException {
     }
 
     private JsonRpcRequest parseJsonRpcRequest(final JsonNode input) throws JsonRpcException {
+        if (input instanceof ObjectNode i && input.has("params") && input.get("params").isNull()) {
+            // Workaround for clients that send a null params field instead of omitting it
+            i.remove("params");
+        }
         JsonRpcRequest request;
         try {
             request = objectMapper.treeToValue(input, JsonRpcRequest.class);
         JsonRpcRequest request;
         try {
             request = objectMapper.treeToValue(input, JsonRpcRequest.class);