1 package org
.asamk
.signal
.jsonrpc
;
3 import com
.fasterxml
.jackson
.core
.TreeNode
;
4 import com
.fasterxml
.jackson
.core
.type
.TypeReference
;
5 import com
.fasterxml
.jackson
.databind
.JsonMappingException
;
6 import com
.fasterxml
.jackson
.databind
.JsonNode
;
7 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
8 import com
.fasterxml
.jackson
.databind
.node
.ArrayNode
;
9 import com
.fasterxml
.jackson
.databind
.node
.ContainerNode
;
10 import com
.fasterxml
.jackson
.databind
.node
.IntNode
;
11 import com
.fasterxml
.jackson
.databind
.node
.ObjectNode
;
13 import org
.asamk
.signal
.commands
.Command
;
14 import org
.asamk
.signal
.commands
.Commands
;
15 import org
.asamk
.signal
.commands
.JsonRpcMultiCommand
;
16 import org
.asamk
.signal
.commands
.JsonRpcRegistrationCommand
;
17 import org
.asamk
.signal
.commands
.JsonRpcSingleCommand
;
18 import org
.asamk
.signal
.commands
.exceptions
.CommandException
;
19 import org
.asamk
.signal
.commands
.exceptions
.IOErrorException
;
20 import org
.asamk
.signal
.commands
.exceptions
.UntrustedKeyErrorException
;
21 import org
.asamk
.signal
.commands
.exceptions
.UserErrorException
;
22 import org
.asamk
.signal
.json
.JsonReceiveMessageHandler
;
23 import org
.asamk
.signal
.manager
.Manager
;
24 import org
.asamk
.signal
.manager
.MultiAccountManager
;
25 import org
.asamk
.signal
.manager
.RegistrationManager
;
26 import org
.asamk
.signal
.manager
.api
.Pair
;
27 import org
.asamk
.signal
.output
.JsonWriter
;
28 import org
.asamk
.signal
.util
.Util
;
29 import org
.slf4j
.Logger
;
30 import org
.slf4j
.LoggerFactory
;
32 import java
.io
.IOException
;
33 import java
.nio
.channels
.ClosedChannelException
;
34 import java
.nio
.channels
.OverlappingFileLockException
;
35 import java
.util
.HashMap
;
36 import java
.util
.List
;
38 import java
.util
.concurrent
.atomic
.AtomicInteger
;
39 import java
.util
.function
.Supplier
;
41 public class SignalJsonRpcDispatcherHandler
{
43 private final static Logger logger
= LoggerFactory
.getLogger(SignalJsonRpcDispatcherHandler
.class);
45 private static final int USER_ERROR
= -1;
46 private static final int IO_ERROR
= -3;
47 private static final int UNTRUSTED_KEY_ERROR
= -4;
49 private final ObjectMapper objectMapper
;
50 private final JsonRpcSender jsonRpcSender
;
51 private final JsonRpcReader jsonRpcReader
;
52 private final boolean noReceiveOnStart
;
54 private MultiAccountManager c
;
55 private final Map
<Integer
, List
<Pair
<Manager
, Manager
.ReceiveMessageHandler
>>> receiveHandlers
= new HashMap
<>();
59 public SignalJsonRpcDispatcherHandler(
60 final JsonWriter jsonWriter
, final Supplier
<String
> lineSupplier
, final boolean noReceiveOnStart
62 this.noReceiveOnStart
= noReceiveOnStart
;
63 this.objectMapper
= Util
.createJsonObjectMapper();
64 this.jsonRpcSender
= new JsonRpcSender(jsonWriter
);
65 this.jsonRpcReader
= new JsonRpcReader(jsonRpcSender
, lineSupplier
);
68 public void handleConnection(final MultiAccountManager c
) {
71 if (!noReceiveOnStart
) {
72 this.subscribeReceive(c
.getManagers());
73 c
.addOnManagerAddedHandler(this::subscribeReceive
);
74 c
.addOnManagerRemovedHandler(this::unsubscribeReceive
);
80 public void handleConnection(final Manager m
) {
83 if (!noReceiveOnStart
) {
87 final var currentThread
= Thread
.currentThread();
88 m
.addClosedListener(currentThread
::interrupt
);
93 private static final AtomicInteger nextSubscriptionId
= new AtomicInteger(0);
95 private int subscribeReceive(final Manager manager
) {
96 return subscribeReceive(List
.of(manager
));
99 private int subscribeReceive(final List
<Manager
> managers
) {
100 final var subscriptionId
= nextSubscriptionId
.getAndIncrement();
101 final var handlers
= managers
.stream().map(m
-> {
102 final var receiveMessageHandler
= new JsonReceiveMessageHandler(m
, s
-> {
103 final ContainerNode
<?
> params
= objectMapper
.valueToTree(s
);
104 ((ObjectNode
) params
).set("subscription", IntNode
.valueOf(subscriptionId
));
105 final var jsonRpcRequest
= JsonRpcRequest
.forNotification("receive", params
, null);
107 jsonRpcSender
.sendRequest(jsonRpcRequest
);
108 } catch (AssertionError e
) {
109 if (e
.getCause() instanceof ClosedChannelException
) {
110 unsubscribeReceive(subscriptionId
);
114 m
.addReceiveHandler(receiveMessageHandler
);
115 return new Pair
<>(m
, (Manager
.ReceiveMessageHandler
) receiveMessageHandler
);
117 receiveHandlers
.put(subscriptionId
, handlers
);
119 return subscriptionId
;
122 private boolean unsubscribeReceive(final int subscriptionId
) {
123 final var handlers
= receiveHandlers
.remove(subscriptionId
);
124 if (handlers
== null) {
127 for (final var pair
: handlers
) {
128 unsubscribeReceiveHandler(pair
);
133 private void unsubscribeReceive(final Manager m
) {
134 final var subscriptionId
= receiveHandlers
.entrySet()
136 .filter(e
-> e
.getValue().size() == 1 && e
.getValue().get(0).first().equals(m
))
137 .map(Map
.Entry
::getKey
)
139 subscriptionId
.ifPresent(this::unsubscribeReceive
);
142 private void handleConnection() {
144 jsonRpcReader
.readMessages((method
, params
) -> handleRequest(objectMapper
, method
, params
),
145 response
-> logger
.debug("Received unexpected response for id {}", response
.getId()));
147 receiveHandlers
.forEach((_subscriptionId
, handlers
) -> handlers
.forEach(this::unsubscribeReceiveHandler
));
148 receiveHandlers
.clear();
152 private void unsubscribeReceiveHandler(final Pair
<Manager
, Manager
.ReceiveMessageHandler
> pair
) {
153 final var m
= pair
.first();
154 final var handler
= pair
.second();
155 m
.removeReceiveHandler(handler
);
158 private JsonNode
handleRequest(
159 final ObjectMapper objectMapper
, final String method
, ContainerNode
<?
> params
160 ) throws JsonRpcException
{
161 var command
= getCommand(method
);
163 if (command
instanceof JsonRpcSingleCommand
<?
> jsonRpcCommand
) {
164 final var manager
= getManagerFromParams(params
);
165 if (manager
!= null) {
166 return runCommand(objectMapper
, params
, new CommandRunnerImpl
<>(manager
, jsonRpcCommand
));
169 if (command
instanceof JsonRpcMultiCommand
<?
> jsonRpcCommand
) {
170 return runCommand(objectMapper
, params
, new MultiCommandRunnerImpl
<>(c
, jsonRpcCommand
));
172 if (command
instanceof JsonRpcRegistrationCommand
<?
> jsonRpcCommand
) {
173 try (var manager
= getRegistrationManagerFromParams(params
)) {
174 if (manager
!= null) {
175 return runCommand(objectMapper
,
177 new RegistrationCommandRunnerImpl
<>(manager
, c
, jsonRpcCommand
));
179 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_PARAMS
,
180 "Method requires valid account parameter",
183 } catch (IOException e
) {
184 logger
.warn("Failed to close registration manager", e
);
188 if (command
instanceof JsonRpcSingleCommand
<?
> jsonRpcCommand
) {
190 return runCommand(objectMapper
, params
, new CommandRunnerImpl
<>(m
, jsonRpcCommand
));
193 final var manager
= getManagerFromParams(params
);
194 if (manager
!= null) {
195 return runCommand(objectMapper
, params
, new CommandRunnerImpl
<>(manager
, jsonRpcCommand
));
197 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_PARAMS
,
198 "Method requires valid account parameter",
203 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.METHOD_NOT_FOUND
,
204 "Method not implemented",
208 private Manager
getManagerFromParams(final ContainerNode
<?
> params
) throws JsonRpcException
{
209 if (params
!= null && params
.hasNonNull("account")) {
210 final var manager
= c
.getManager(params
.get("account").asText());
211 ((ObjectNode
) params
).remove("account");
212 if (manager
== null) {
213 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_PARAMS
,
214 "Specified account does not exist",
222 private RegistrationManager
getRegistrationManagerFromParams(final ContainerNode
<?
> params
) {
223 if (params
!= null && params
.has("account")) {
225 final var registrationManager
= c
.getNewRegistrationManager(params
.get("account").asText());
226 ((ObjectNode
) params
).remove("account");
227 return registrationManager
;
228 } catch (OverlappingFileLockException e
) {
229 logger
.warn("Account is already in use");
231 } catch (IOException
| IllegalStateException e
) {
232 logger
.warn("Failed to load registration manager", e
);
239 private Command
getCommand(final String method
) {
240 if ("subscribeReceive".equals(method
)) {
241 return new SubscribeReceiveCommand();
243 if ("unsubscribeReceive".equals(method
)) {
244 return new UnsubscribeReceiveCommand();
246 return Commands
.getCommand(method
);
249 private record CommandRunnerImpl
<T
>(Manager m
, JsonRpcSingleCommand
<T
> command
) implements CommandRunner
<T
> {
252 public void handleCommand(final T request
, final JsonWriter jsonWriter
) throws CommandException
{
253 command
.handleCommand(request
, m
, jsonWriter
);
257 public TypeReference
<T
> getRequestType() {
258 return command
.getRequestType();
262 private record RegistrationCommandRunnerImpl
<T
>(
263 RegistrationManager m
, MultiAccountManager c
, JsonRpcRegistrationCommand
<T
> command
264 ) implements CommandRunner
<T
> {
267 public void handleCommand(final T request
, final JsonWriter jsonWriter
) throws CommandException
{
268 command
.handleCommand(request
, m
, jsonWriter
);
272 public TypeReference
<T
> getRequestType() {
273 return command
.getRequestType();
277 private record MultiCommandRunnerImpl
<T
>(
278 MultiAccountManager c
, JsonRpcMultiCommand
<T
> command
279 ) implements CommandRunner
<T
> {
282 public void handleCommand(final T request
, final JsonWriter jsonWriter
) throws CommandException
{
283 command
.handleCommand(request
, c
, jsonWriter
);
287 public TypeReference
<T
> getRequestType() {
288 return command
.getRequestType();
292 interface CommandRunner
<T
> {
294 void handleCommand(T request
, JsonWriter jsonWriter
) throws CommandException
;
296 TypeReference
<T
> getRequestType();
299 private JsonNode
runCommand(
300 final ObjectMapper objectMapper
, final ContainerNode
<?
> params
, final CommandRunner
<?
> command
301 ) throws JsonRpcException
{
302 final Object
[] result
= {null};
303 final JsonWriter commandJsonWriter
= s
-> {
304 if (result
[0] != null) {
305 throw new AssertionError("Command may only write one json result");
312 parseParamsAndRunCommand(objectMapper
, params
, commandJsonWriter
, command
);
313 } catch (JsonMappingException e
) {
314 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INVALID_REQUEST
,
317 } catch (UserErrorException e
) {
318 throw new JsonRpcException(new JsonRpcResponse
.Error(USER_ERROR
,
320 getErrorDataNode(objectMapper
, result
)));
321 } catch (IOErrorException e
) {
322 throw new JsonRpcException(new JsonRpcResponse
.Error(IO_ERROR
,
324 getErrorDataNode(objectMapper
, result
)));
325 } catch (UntrustedKeyErrorException e
) {
326 throw new JsonRpcException(new JsonRpcResponse
.Error(UNTRUSTED_KEY_ERROR
,
328 getErrorDataNode(objectMapper
, result
)));
329 } catch (Throwable e
) {
330 logger
.error("Command execution failed", e
);
331 throw new JsonRpcException(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INTERNAL_ERROR
,
333 getErrorDataNode(objectMapper
, result
)));
336 Object output
= result
[0] == null ? Map
.of() : result
[0];
337 return objectMapper
.valueToTree(output
);
340 private JsonNode
getErrorDataNode(final ObjectMapper objectMapper
, final Object
[] result
) {
341 if (result
[0] == null) {
344 return objectMapper
.valueToTree(Map
.of("response", result
[0]));
347 private <T
> void parseParamsAndRunCommand(
348 final ObjectMapper objectMapper
,
349 final TreeNode params
,
350 final JsonWriter jsonWriter
,
351 final CommandRunner
<T
> command
352 ) throws CommandException
, JsonMappingException
{
353 T requestParams
= null;
354 final var requestType
= command
.getRequestType();
355 if (params
!= null && requestType
!= null) {
357 requestParams
= objectMapper
.readValue(objectMapper
.treeAsTokens(params
), requestType
);
358 } catch (JsonMappingException e
) {
360 } catch (IOException e
) {
361 throw new AssertionError(e
);
364 command
.handleCommand(requestParams
, jsonWriter
);
367 private class SubscribeReceiveCommand
implements JsonRpcSingleCommand
<Void
>, JsonRpcMultiCommand
<Void
> {
370 public String
getName() {
371 return "subscribeReceive";
375 public void handleCommand(
376 final Void request
, final Manager m
, final JsonWriter jsonWriter
377 ) throws CommandException
{
378 final var subscriptionId
= subscribeReceive(m
);
379 jsonWriter
.write(subscriptionId
);
383 public void handleCommand(
384 final Void request
, final MultiAccountManager c
, final JsonWriter jsonWriter
385 ) throws CommandException
{
386 final var subscriptionId
= subscribeReceive(c
.getManagers());
387 jsonWriter
.write(subscriptionId
);
391 private class UnsubscribeReceiveCommand
implements JsonRpcSingleCommand
<JsonNode
>, JsonRpcMultiCommand
<JsonNode
> {
394 public String
getName() {
395 return "unsubscribeReceive";
399 public TypeReference
<JsonNode
> getRequestType() {
400 return new TypeReference
<>() {};
404 public void handleCommand(
405 final JsonNode request
, final Manager m
, final JsonWriter jsonWriter
406 ) throws CommandException
{
407 final var subscriptionId
= getSubscriptionId(request
);
408 if (subscriptionId
== null) {
409 unsubscribeReceive(m
);
411 if (!unsubscribeReceive(subscriptionId
)) {
412 throw new UserErrorException("Unknown subscription id");
418 public void handleCommand(
419 final JsonNode request
, final MultiAccountManager c
, final JsonWriter jsonWriter
420 ) throws CommandException
{
421 final var subscriptionId
= getSubscriptionId(request
);
422 if (subscriptionId
== null) {
423 throw new UserErrorException("Missing subscription parameter with subscription id");
425 if (!unsubscribeReceive(subscriptionId
)) {
426 throw new UserErrorException("Unknown subscription id");
431 private Integer
getSubscriptionId(final JsonNode request
) {
432 if (request
instanceof ArrayNode req
) {
433 return req
.get(0).asInt();
434 } else if (request
instanceof ObjectNode req
) {
435 return req
.get("subscription").asInt();