1 package org
.asamk
.signal
.jsonrpc
;
3 import com
.fasterxml
.jackson
.core
.type
.TypeReference
;
4 import com
.fasterxml
.jackson
.databind
.JsonNode
;
5 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
6 import com
.fasterxml
.jackson
.databind
.node
.ArrayNode
;
7 import com
.fasterxml
.jackson
.databind
.node
.ContainerNode
;
8 import com
.fasterxml
.jackson
.databind
.node
.IntNode
;
9 import com
.fasterxml
.jackson
.databind
.node
.ObjectNode
;
11 import org
.asamk
.signal
.commands
.Command
;
12 import org
.asamk
.signal
.commands
.Commands
;
13 import org
.asamk
.signal
.commands
.JsonRpcMultiCommand
;
14 import org
.asamk
.signal
.commands
.JsonRpcSingleCommand
;
15 import org
.asamk
.signal
.commands
.exceptions
.CommandException
;
16 import org
.asamk
.signal
.commands
.exceptions
.UserErrorException
;
17 import org
.asamk
.signal
.json
.JsonReceiveMessageHandler
;
18 import org
.asamk
.signal
.manager
.Manager
;
19 import org
.asamk
.signal
.manager
.MultiAccountManager
;
20 import org
.asamk
.signal
.manager
.api
.Pair
;
21 import org
.asamk
.signal
.output
.JsonWriter
;
22 import org
.asamk
.signal
.util
.Util
;
23 import org
.slf4j
.Logger
;
24 import org
.slf4j
.LoggerFactory
;
26 import java
.nio
.channels
.ClosedChannelException
;
27 import java
.util
.HashMap
;
28 import java
.util
.List
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
31 import java
.util
.function
.Supplier
;
33 public class SignalJsonRpcDispatcherHandler
{
35 private final static Logger logger
= LoggerFactory
.getLogger(SignalJsonRpcDispatcherHandler
.class);
37 private final ObjectMapper objectMapper
;
38 private final JsonRpcSender jsonRpcSender
;
39 private final JsonRpcReader jsonRpcReader
;
40 private final boolean noReceiveOnStart
;
42 private final Map
<Integer
, List
<Pair
<Manager
, Manager
.ReceiveMessageHandler
>>> receiveHandlers
= new HashMap
<>();
43 private SignalJsonRpcCommandHandler commandHandler
;
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter
, final Supplier
<String
> lineSupplier
, final boolean noReceiveOnStart
48 this.noReceiveOnStart
= noReceiveOnStart
;
49 this.objectMapper
= Util
.createJsonObjectMapper();
50 this.jsonRpcSender
= new JsonRpcSender(jsonWriter
);
51 this.jsonRpcReader
= new JsonRpcReader(jsonRpcSender
, lineSupplier
);
54 public void handleConnection(final MultiAccountManager c
) {
55 this.commandHandler
= new SignalJsonRpcCommandHandler(c
, this::getCommand
);
57 if (!noReceiveOnStart
) {
58 this.subscribeReceive(c
.getManagers());
59 c
.addOnManagerAddedHandler(this::subscribeReceive
);
60 c
.addOnManagerRemovedHandler(this::unsubscribeReceive
);
66 public void handleConnection(final Manager m
) {
67 this.commandHandler
= new SignalJsonRpcCommandHandler(m
, this::getCommand
);
69 if (!noReceiveOnStart
) {
73 final var currentThread
= Thread
.currentThread();
74 m
.addClosedListener(currentThread
::interrupt
);
79 private static final AtomicInteger nextSubscriptionId
= new AtomicInteger(0);
81 private int subscribeReceive(final Manager manager
) {
82 return subscribeReceive(List
.of(manager
));
85 private int subscribeReceive(final List
<Manager
> managers
) {
86 final var subscriptionId
= nextSubscriptionId
.getAndIncrement();
87 final var handlers
= managers
.stream().map(m
-> {
88 final var receiveMessageHandler
= new JsonReceiveMessageHandler(m
, s
-> {
89 final ContainerNode
<?
> params
= objectMapper
.valueToTree(s
);
90 ((ObjectNode
) params
).set("subscription", IntNode
.valueOf(subscriptionId
));
91 final var jsonRpcRequest
= JsonRpcRequest
.forNotification("receive", params
, null);
93 jsonRpcSender
.sendRequest(jsonRpcRequest
);
94 } catch (AssertionError e
) {
95 if (e
.getCause() instanceof ClosedChannelException
) {
96 unsubscribeReceive(subscriptionId
);
100 m
.addReceiveHandler(receiveMessageHandler
);
101 return new Pair
<>(m
, (Manager
.ReceiveMessageHandler
) receiveMessageHandler
);
103 receiveHandlers
.put(subscriptionId
, handlers
);
105 return subscriptionId
;
108 private boolean unsubscribeReceive(final int subscriptionId
) {
109 final var handlers
= receiveHandlers
.remove(subscriptionId
);
110 if (handlers
== null) {
113 for (final var pair
: handlers
) {
114 unsubscribeReceiveHandler(pair
);
119 private void unsubscribeReceive(final Manager m
) {
120 final var subscriptionId
= receiveHandlers
.entrySet()
122 .filter(e
-> e
.getValue().size() == 1 && e
.getValue().get(0).first().equals(m
))
123 .map(Map
.Entry
::getKey
)
125 subscriptionId
.ifPresent(this::unsubscribeReceive
);
128 private void handleConnection() {
130 jsonRpcReader
.readMessages((method
, params
) -> commandHandler
.handleRequest(objectMapper
, method
, params
),
131 response
-> logger
.debug("Received unexpected response for id {}", response
.getId()));
133 receiveHandlers
.forEach((_subscriptionId
, handlers
) -> handlers
.forEach(this::unsubscribeReceiveHandler
));
134 receiveHandlers
.clear();
138 private void unsubscribeReceiveHandler(final Pair
<Manager
, Manager
.ReceiveMessageHandler
> pair
) {
139 final var m
= pair
.first();
140 final var handler
= pair
.second();
141 m
.removeReceiveHandler(handler
);
144 private Command
getCommand(final String method
) {
145 if ("subscribeReceive".equals(method
)) {
146 return new SubscribeReceiveCommand();
148 if ("unsubscribeReceive".equals(method
)) {
149 return new UnsubscribeReceiveCommand();
151 return Commands
.getCommand(method
);
154 private class SubscribeReceiveCommand
implements JsonRpcSingleCommand
<Void
>, JsonRpcMultiCommand
<Void
> {
157 public String
getName() {
158 return "subscribeReceive";
162 public void handleCommand(
163 final Void request
, final Manager m
, final JsonWriter jsonWriter
164 ) throws CommandException
{
165 final var subscriptionId
= subscribeReceive(m
);
166 jsonWriter
.write(subscriptionId
);
170 public void handleCommand(
171 final Void request
, final MultiAccountManager c
, final JsonWriter jsonWriter
172 ) throws CommandException
{
173 final var subscriptionId
= subscribeReceive(c
.getManagers());
174 jsonWriter
.write(subscriptionId
);
178 private class UnsubscribeReceiveCommand
implements JsonRpcSingleCommand
<JsonNode
>, JsonRpcMultiCommand
<JsonNode
> {
181 public String
getName() {
182 return "unsubscribeReceive";
186 public TypeReference
<JsonNode
> getRequestType() {
187 return new TypeReference
<>() {};
191 public void handleCommand(
192 final JsonNode request
, final Manager m
, final JsonWriter jsonWriter
193 ) throws CommandException
{
194 final var subscriptionId
= getSubscriptionId(request
);
195 if (subscriptionId
== null) {
196 unsubscribeReceive(m
);
198 if (!unsubscribeReceive(subscriptionId
)) {
199 throw new UserErrorException("Unknown subscription id");
205 public void handleCommand(
206 final JsonNode request
, final MultiAccountManager c
, final JsonWriter jsonWriter
207 ) throws CommandException
{
208 final var subscriptionId
= getSubscriptionId(request
);
209 if (subscriptionId
== null) {
210 throw new UserErrorException("Missing subscription parameter with subscription id");
212 if (!unsubscribeReceive(subscriptionId
)) {
213 throw new UserErrorException("Unknown subscription id");
218 private Integer
getSubscriptionId(final JsonNode request
) {
219 if (request
instanceof ArrayNode req
) {
220 return req
.get(0).asInt();
221 } else if (request
instanceof ObjectNode req
) {
222 return req
.get("subscription").asInt();