1 package org
.asamk
.signal
.jsonrpc
;
3 import com
.fasterxml
.jackson
.core
.JsonParseException
;
4 import com
.fasterxml
.jackson
.databind
.JsonMappingException
;
5 import com
.fasterxml
.jackson
.databind
.JsonNode
;
6 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
7 import com
.fasterxml
.jackson
.databind
.node
.ContainerNode
;
8 import com
.fasterxml
.jackson
.databind
.node
.ObjectNode
;
9 import com
.fasterxml
.jackson
.databind
.node
.ValueNode
;
11 import org
.asamk
.signal
.util
.Util
;
12 import org
.slf4j
.Logger
;
13 import org
.slf4j
.LoggerFactory
;
15 import java
.io
.IOException
;
16 import java
.io
.InputStream
;
17 import java
.util
.ArrayList
;
18 import java
.util
.concurrent
.Executors
;
19 import java
.util
.concurrent
.locks
.ReentrantLock
;
20 import java
.util
.function
.Consumer
;
21 import java
.util
.function
.Supplier
;
22 import java
.util
.stream
.StreamSupport
;
/**
 * Reads incoming JSON-RPC messages, parses them into requests, responses or batches and
 * dispatches them to the handlers passed to {@code readMessages}.
 *
 * <p>An instance is built either around a line {@link Supplier} or around an
 * {@link InputStream}; responses and error reports go out through the {@code JsonRpcSender}.
 */
24 public class JsonRpcReader
{
// Class-wide SLF4J logger.
26 private final static Logger logger
= LoggerFactory
.getLogger(JsonRpcReader
.class);
// Used to emit responses, batch responses and parse/validation error responses.
28 private final JsonRpcSender jsonRpcSender
;
// Jackson mapper used for readTree/treeToValue; both constructors obtain it from Util.createJsonObjectMapper().
29 private final ObjectMapper objectMapper
;
// Stream source that readMessages passes to parseJsonRpcMessage(InputStream).
30 private final InputStream input
;
// Line source polled by readMessages; the InputStream constructor sets it to null.
31 private final Supplier
<String
> lineSupplier
;
/**
 * Creates a reader that takes its input from a line supplier.
 *
 * @param jsonRpcSender sender used for outgoing responses and error reports
 * @param lineSupplier  source of input strings; {@code readMessages} parses each value it returns
 */
33 public JsonRpcReader(final JsonRpcSender jsonRpcSender
, final Supplier
<String
> lineSupplier
) {
34 this.jsonRpcSender
= jsonRpcSender
;
// NOTE(review): no assignment to the final field `input` is visible in this constructor;
// presumably it is set to null here - confirm.
36 this.lineSupplier
= lineSupplier
;
37 this.objectMapper
= Util
.createJsonObjectMapper();
/**
 * Creates a reader that takes its input from an input stream; no line supplier is used.
 *
 * @param jsonRpcSender sender used for outgoing responses and error reports
 * @param input         stream to read JSON from
 */
40 public JsonRpcReader(final JsonRpcSender jsonRpcSender
, final InputStream input
) {
41 this.jsonRpcSender
= jsonRpcSender
;
// NOTE(review): the assignment of the `input` parameter to the final field is not visible
// in this constructor; presumably `this.input = input` - confirm.
// The supplier path is unused for stream-based readers.
43 this.lineSupplier
= null;
44 this.objectMapper
= Util
.createJsonObjectMapper();
/**
 * Reads JSON-RPC messages and dispatches each parsed message to {@code handleMessage}.
 *
 * <p>Two paths are visible: one parses a message from the {@code input} stream and handles it
 * on the calling thread; the other pulls strings from {@code lineSupplier} while the thread is
 * not interrupted and submits every parsed message to a fixed pool of 10 threads.
 * NOTE(review): the condition selecting between the two paths is not visible here; presumably
 * it depends on whether the {@code input} stream field is set - confirm.
 *
 * @param requestHandler  invoked for incoming requests
 * @param responseHandler receives incoming responses
 */
47 public void readMessages(final RequestHandler requestHandler
, final Consumer
<JsonRpcResponse
> responseHandler
) {
// Stream path: parse straight from the InputStream field.
49 JsonRpcMessage message
= parseJsonRpcMessage(input
);
// Presumably null means the parser found nothing dispatchable (it reports errors itself) - confirm.
50 if (message
== null) {
// Handled synchronously on the calling thread.
54 handleMessage(message
, requestHandler
, responseHandler
);
// Line path: messages are handled on a fixed pool of 10 worker threads.
58 final var executor
= Executors
.newFixedThreadPool(10);
// Thread.interrupted() also clears the interrupt flag of the current thread.
60 while (!Thread
.interrupted()) {
// This local String named `input` shadows the InputStream field of the same name.
61 final var input
= lineSupplier
.get();
// Presumably logged when the supplier signals end of input, after which the loop ends - confirm.
63 logger
.trace("Reached end of JSON-RPC input stream.");
67 logger
.trace("Incoming JSON-RPC message: {}", input
);
68 final var message
= parseJsonRpcMessage(input
);
69 if (message
== null) {
// Handling runs on the pool while this loop goes on to fetch the next line.
73 executor
.submit(() -> handleMessage(message
, requestHandler
, responseHandler
));
// Pool shutdown is delegated to the project helper; its exact semantics live in Util.
76 Util
.closeExecutorService(executor
);
/**
 * Dispatches one parsed message according to its concrete type.
 *
 * <ul>
 *   <li>{@code JsonRpcRequest}: run through {@code handleRequest}; a non-null response is sent.</li>
 *   <li>{@code JsonRpcResponse}: passed to {@code responseHandler}.</li>
 *   <li>{@code JsonRpcBatchMessage}: every element is parsed as a request and handled on a fixed
 *       pool of 10 threads; the collected responses are sent as one batch if there are any.</li>
 * </ul>
 *
 * @param message         message to dispatch
 * @param requestHandler  handler invoked for requests
 * @param responseHandler consumer invoked for responses
 */
80 private void handleMessage(
81 final JsonRpcMessage message
,
82 final RequestHandler requestHandler
,
83 final Consumer
<JsonRpcResponse
> responseHandler
86 case JsonRpcRequest jsonRpcRequest
-> {
// NOTE(review): this log call concatenates strings while the other log calls in this class
// use {} placeholders; consider parameterizing it for consistency.
87 logger
.debug("Received json rpc request, method: " + jsonRpcRequest
.getMethod());
88 final var response
= handleRequest(requestHandler
, jsonRpcRequest
);
// Only non-null responses are sent; presumably null means the request carried no id - confirm.
89 if (response
!= null) {
90 jsonRpcSender
.sendResponse(response
);
93 case JsonRpcResponse jsonRpcResponse
-> responseHandler
.accept(jsonRpcResponse
);
94 case JsonRpcBatchMessage jsonRpcBatchMessage
-> {
95 final var messages
= jsonRpcBatchMessage
.getMessages();
// Pre-sized for the case where every batch element yields a response.
96 final var responseList
= new ArrayList
<JsonRpcResponse
>(messages
.size());
97 final var executor
= Executors
.newFixedThreadPool(10);
// NOTE(review): responseList is a plain ArrayList that pool tasks append to; presumably this
// lock guards those add() calls, but the lock()/unlock() calls are not visible here - confirm.
99 final var lock
= new ReentrantLock();
100 messages
.forEach(jsonNode
-> {
101 final JsonRpcRequest request
;
103 request
= parseJsonRpcRequest(jsonNode
);
104 } catch (JsonRpcException e
) {
// Invalid element: record an error response carrying the element's id, if it has a usable one.
105 final var response
= JsonRpcResponse
.forError(e
.getError(), getId(jsonNode
));
108 responseList
.add(response
);
// Valid element: run the request handler on the pool.
115 executor
.submit(() -> {
116 final var response
= handleRequest(requestHandler
, request
);
117 if (response
!= null) {
120 responseList
.add(response
);
// Pool shutdown is delegated to the project helper; presumably it waits for submitted tasks - confirm.
128 Util
.closeExecutorService(executor
);
// Nothing is sent when no element produced a response.
131 if (!responseList
.isEmpty()) {
132 jsonRpcSender
.sendBatchResponses(responseList
);
/**
 * Invokes the request handler with the request's method and params and turns the outcome into
 * a response.
 *
 * <p>A success or error response is only built when the request has a non-null id; otherwise
 * the outcome is logged at debug level and dropped.
 *
 * @param requestHandler handler that executes the method
 * @param request        request to execute
 * @return a success response, or an error response when the handler throws
 *         {@code JsonRpcException}; presumably {@code null} when the request has no id
 *         (the fall-through return is not visible here - confirm)
 */
138 private JsonRpcResponse
handleRequest(final RequestHandler requestHandler
, final JsonRpcRequest request
) {
140 final var result
= requestHandler
.apply(request
.getMethod(), request
.getParams());
// The response echoes the request id.
141 if (request
.getId() != null) {
142 return JsonRpcResponse
.forSuccess(result
, request
.getId());
144 logger
.debug("Command '{}' succeeded but client didn't specify an id, dropping response",
145 request
.getMethod());
147 } catch (JsonRpcException e
) {
// Handler failure: the error goes back to the client only if the request had an id.
148 if (request
.getId() != null) {
149 return JsonRpcResponse
.forError(e
.getError(), request
.getId());
151 logger
.debug("Command '{}' failed but client didn't specify an id, dropping error: {}",
/**
 * Parses a JSON string into a tree and delegates to the {@code JsonNode} overload.
 *
 * <p>A {@code JsonParseException} is reported to the peer as a {@code PARSE_ERROR} response;
 * any other {@code IOException} is rethrown wrapped in an {@code AssertionError}.
 *
 * @param input JSON text to parse
 * @return the parsed message; presumably {@code null} after a parse error (that return is not
 *         visible here - confirm)
 */
159 private JsonRpcMessage
parseJsonRpcMessage(final String input
) {
160 final JsonNode jsonNode
;
162 jsonNode
= objectMapper
.readTree(input
);
163 } catch (JsonParseException e
) {
// Malformed JSON: answer with a PARSE_ERROR response.
164 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.PARSE_ERROR
,
168 } catch (IOException e
) {
// Presumably regarded as impossible for an in-memory String source, hence AssertionError.
169 throw new AssertionError(e
);
172 return parseJsonRpcMessage(jsonNode
);
/**
 * Parses JSON read from a stream into a tree and delegates to the {@code JsonNode} overload.
 *
 * <p>A {@code JsonParseException} is reported to the peer as a {@code PARSE_ERROR} response;
 * any other {@code IOException} is rethrown wrapped in an {@code AssertionError}.
 *
 * @param input stream to read the JSON document from
 * @return the parsed message; presumably {@code null} after a parse error (that return is not
 *         visible here - confirm)
 */
175 private JsonRpcMessage
parseJsonRpcMessage(final InputStream input
) {
176 final JsonNode jsonNode
;
178 jsonNode
= objectMapper
.readTree(input
);
179 } catch (JsonParseException e
) {
// Malformed JSON: answer with a PARSE_ERROR response.
180 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.PARSE_ERROR
,
184 } catch (IOException e
) {
// NOTE(review): unlike the String overload, reading a stream can genuinely fail with an
// IOException; consider whether AssertionError is the right way to surface that.
185 throw new AssertionError(e
);
188 return parseJsonRpcMessage(jsonNode
);
/**
 * Classifies a parsed JSON tree as a batch, a response or a request.
 *
 * <ul>
 *   <li>{@code null} node or empty array: an {@code INVALID_REQUEST} error response is sent.</li>
 *   <li>Non-empty array: wrapped as a {@code JsonRpcBatchMessage} holding the array's elements.</li>
 *   <li>Object with a {@code "result"} or {@code "error"} member: parsed as a response.</li>
 *   <li>Any other object: parsed as a request; a {@code JsonRpcException} from that parsing is
 *       sent back as an error response.</li>
 *   <li>Any other node type: an {@code INVALID_REQUEST} response naming the node type is sent.</li>
 * </ul>
 *
 * @param jsonNode parsed tree, possibly {@code null}
 * @return the message; presumably {@code null} on the error paths (those returns are not
 *         visible here - confirm)
 */
191 private JsonRpcMessage
parseJsonRpcMessage(final JsonNode jsonNode
) {
192 if (jsonNode
== null) {
193 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
197 } else if (jsonNode
.isArray()) {
// An empty array is rejected rather than treated as an empty batch.
198 if (jsonNode
.isEmpty()) {
199 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
// Batch elements stay unparsed JsonNodes; they are converted to requests when the batch is handled.
204 return new JsonRpcBatchMessage(StreamSupport
.stream(jsonNode
.spliterator(), false).toList());
205 } else if (jsonNode
.isObject()) {
// The presence of "result" or "error" marks the object as a response rather than a request.
206 if (jsonNode
.has("result") || jsonNode
.has("error")) {
207 return parseJsonRpcResponse(jsonNode
);
210 return parseJsonRpcRequest(jsonNode
);
211 } catch (JsonRpcException e
) {
// Invalid request: report the error, echoing the request's id when it has a usable one.
212 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(e
.getError(), getId(jsonNode
)));
// Neither array nor object: reject and name the node type found.
217 jsonRpcSender
.sendResponse(JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
218 "unexpected type: " + jsonNode
.getNodeType().name(),
224 private ValueNode
getId(JsonNode jsonNode
) {
225 final var id
= jsonNode
.get("id");
226 return id
instanceof ValueNode value ? value
: null;
/**
 * Converts a JSON node into a {@code JsonRpcRequest} and validates it.
 *
 * @param input node expected to hold a JSON-RPC request
 * @return the request (the final return statement is not visible here)
 * @throws JsonRpcException with an {@code INVALID_REQUEST} error when the node cannot be mapped,
 *         when {@code jsonrpc} is not {@code "2.0"}, or when {@code method} is null
 */
229 private JsonRpcRequest
parseJsonRpcRequest(final JsonNode input
) throws JsonRpcException
{
// NOTE(review): the pattern variable `i` is not used in the visible lines; presumably the
// workaround removes the null "params" member through it - confirm.
230 if (input
instanceof ObjectNode i
&& input
.has("params") && input
.get("params").isNull()) {
231 // Workaround for clients that send a null params field instead of omitting it
234 JsonRpcRequest request
;
236 request
= objectMapper
.treeToValue(input
, JsonRpcRequest
.class);
237 } catch (JsonMappingException e
) {
// Structurally invalid request: surfaced to the caller as INVALID_REQUEST.
238 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
241 } catch (IOException e
) {
// Presumably regarded as impossible when converting an in-memory tree, hence AssertionError.
242 throw new AssertionError(e
);
// Only protocol version "2.0" is accepted; a missing version fails this check as well.
245 if (!"2.0".equals(request
.getJsonrpc())) {
246 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
247 "only jsonrpc version 2.0 is supported",
251 if (request
.getMethod() == null) {
252 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
253 "method field must be set",
/**
 * Converts a JSON node into a {@code JsonRpcResponse} and validates it.
 *
 * <p>Checks performed: the node maps onto {@code JsonRpcResponse}, {@code jsonrpc} equals
 * {@code "2.0"}, exactly one of result and error is set, and the id is present and not JSON
 * null. Each failed check is logged at debug level; no error response is sent from here.
 *
 * @param input node expected to hold a JSON-RPC response
 * @return the response; presumably {@code null} when a check fails (the return statements are
 *         not visible here - confirm)
 */
260 private JsonRpcResponse
parseJsonRpcResponse(final JsonNode input
) {
261 JsonRpcResponse response
;
263 response
= objectMapper
.treeToValue(input
, JsonRpcResponse
.class);
264 } catch (JsonParseException
| JsonMappingException e
) {
265 logger
.debug("Received invalid jsonrpc response {}", e
.getMessage());
267 } catch (IOException e
) {
// Presumably regarded as impossible when converting an in-memory tree, hence AssertionError.
268 throw new AssertionError(e
);
// A missing version fails this check as well.
271 if (!"2.0".equals(response
.getJsonrpc())) {
272 logger
.debug("Received invalid jsonrpc response with invalid version {}", response
.getJsonrpc());
// Result and error are mutually exclusive...
276 if (response
.getResult() != null && response
.getError() != null) {
277 logger
.debug("Received invalid jsonrpc response with both result and error");
// ...and one of them has to be present.
281 if (response
.getResult() == null && response
.getError() == null) {
282 logger
.debug("Received invalid jsonrpc response without result and error");
// Both a missing id and an explicit JSON null id are rejected.
286 if (response
.getId() == null || response
.getId().isNull()) {
287 logger
.debug("Received invalid jsonrpc response without id");
/**
 * Callback that executes a JSON-RPC method on behalf of the reader.
 */
294 public interface RequestHandler
{
/**
 * Executes one request.
 *
 * @param method value of the request's {@code getMethod()}
 * @param params value of the request's {@code getParams()}
 * @return result node that the reader wraps into a success response when the request has an id
 * @throws JsonRpcException to signal failure; the reader turns its error into an error
 *         response when the request has an id
 */
296 JsonNode
apply(String method
, ContainerNode
<?
> params
) throws JsonRpcException
;