1 package org
.asamk
.signal
.jsonrpc
;
3 import com
.fasterxml
.jackson
.core
.type
.TypeReference
;
4 import com
.fasterxml
.jackson
.databind
.JsonNode
;
5 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
6 import com
.fasterxml
.jackson
.databind
.node
.ArrayNode
;
7 import com
.fasterxml
.jackson
.databind
.node
.ContainerNode
;
8 import com
.fasterxml
.jackson
.databind
.node
.IntNode
;
9 import com
.fasterxml
.jackson
.databind
.node
.ObjectNode
;
11 import org
.asamk
.signal
.commands
.Command
;
12 import org
.asamk
.signal
.commands
.Commands
;
13 import org
.asamk
.signal
.commands
.JsonRpcMultiCommand
;
14 import org
.asamk
.signal
.commands
.JsonRpcSingleCommand
;
15 import org
.asamk
.signal
.commands
.exceptions
.CommandException
;
16 import org
.asamk
.signal
.commands
.exceptions
.UserErrorException
;
17 import org
.asamk
.signal
.json
.JsonReceiveMessageHandler
;
18 import org
.asamk
.signal
.manager
.Manager
;
19 import org
.asamk
.signal
.manager
.MultiAccountManager
;
20 import org
.asamk
.signal
.manager
.api
.Pair
;
21 import org
.asamk
.signal
.output
.JsonWriter
;
22 import org
.asamk
.signal
.util
.Util
;
23 import org
.slf4j
.Logger
;
24 import org
.slf4j
.LoggerFactory
;
26 import java
.nio
.channels
.ClosedChannelException
;
27 import java
.util
.HashMap
;
28 import java
.util
.List
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
31 import java
.util
.function
.Supplier
;
33 public class SignalJsonRpcDispatcherHandler
{
35 private static final Logger logger
= LoggerFactory
.getLogger(SignalJsonRpcDispatcherHandler
.class);
37 private final ObjectMapper objectMapper
;
38 private final JsonRpcSender jsonRpcSender
;
39 private final JsonRpcReader jsonRpcReader
;
40 private final boolean noReceiveOnStart
;
42 private final Map
<Integer
, List
<Pair
<Manager
, Manager
.ReceiveMessageHandler
>>> receiveHandlers
= new HashMap
<>();
43 private SignalJsonRpcCommandHandler commandHandler
;
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter
,
47 final Supplier
<String
> lineSupplier
,
48 final boolean noReceiveOnStart
50 this.noReceiveOnStart
= noReceiveOnStart
;
51 this.objectMapper
= Util
.createJsonObjectMapper();
52 this.jsonRpcSender
= new JsonRpcSender(jsonWriter
);
53 this.jsonRpcReader
= new JsonRpcReader(jsonRpcSender
, lineSupplier
);
56 public void handleConnection(final MultiAccountManager c
) {
57 this.commandHandler
= new SignalJsonRpcCommandHandler(c
, this::getCommand
);
59 if (!noReceiveOnStart
) {
60 this.subscribeReceive(c
.getManagers(), true);
61 c
.addOnManagerAddedHandler(m
-> subscribeReceive(m
, true));
62 c
.addOnManagerRemovedHandler(this::unsubscribeReceive
);
68 public void handleConnection(final Manager m
) {
69 this.commandHandler
= new SignalJsonRpcCommandHandler(m
, this::getCommand
);
71 if (!noReceiveOnStart
) {
72 subscribeReceive(m
, true);
75 final var currentThread
= Thread
.currentThread();
76 m
.addClosedListener(currentThread
::interrupt
);
81 private static final AtomicInteger nextSubscriptionId
= new AtomicInteger(0);
83 private int subscribeReceive(final Manager manager
, boolean internalSubscription
) {
84 return subscribeReceive(List
.of(manager
), internalSubscription
);
87 private int subscribeReceive(final List
<Manager
> managers
, boolean internalSubscription
) {
88 final var subscriptionId
= nextSubscriptionId
.getAndIncrement();
89 final var handlers
= managers
.stream().map(m
-> {
90 final var receiveMessageHandler
= new JsonReceiveMessageHandler(m
, s
-> {
91 ContainerNode
<?
> params
;
92 if (internalSubscription
) {
93 params
= objectMapper
.valueToTree(s
);
95 final var paramsNode
= new ObjectNode(objectMapper
.getNodeFactory());
96 paramsNode
.set("subscription", IntNode
.valueOf(subscriptionId
));
97 paramsNode
.set("result", objectMapper
.valueToTree(s
));
100 final var jsonRpcRequest
= JsonRpcRequest
.forNotification("receive", params
, null);
102 jsonRpcSender
.sendRequest(jsonRpcRequest
);
103 } catch (AssertionError e
) {
104 if (e
.getCause() instanceof ClosedChannelException
) {
105 unsubscribeReceive(subscriptionId
);
109 m
.addReceiveHandler(receiveMessageHandler
);
110 return new Pair
<>(m
, (Manager
.ReceiveMessageHandler
) receiveMessageHandler
);
112 receiveHandlers
.put(subscriptionId
, handlers
);
114 return subscriptionId
;
117 private boolean unsubscribeReceive(final int subscriptionId
) {
118 final var handlers
= receiveHandlers
.remove(subscriptionId
);
119 if (handlers
== null) {
122 for (final var pair
: handlers
) {
123 unsubscribeReceiveHandler(pair
);
128 private void unsubscribeReceive(final Manager m
) {
129 final var subscriptionId
= receiveHandlers
.entrySet()
131 .filter(e
-> e
.getValue().size() == 1 && e
.getValue().getFirst().first().equals(m
))
132 .map(Map
.Entry
::getKey
)
134 subscriptionId
.ifPresent(this::unsubscribeReceive
);
137 private void handleConnection() {
139 jsonRpcReader
.readMessages((method
, params
) -> commandHandler
.handleRequest(objectMapper
, method
, params
),
140 response
-> logger
.debug("Received unexpected response for id {}", response
.getId()));
142 receiveHandlers
.forEach((_subscriptionId
, handlers
) -> handlers
.forEach(this::unsubscribeReceiveHandler
));
143 receiveHandlers
.clear();
147 private void unsubscribeReceiveHandler(final Pair
<Manager
, Manager
.ReceiveMessageHandler
> pair
) {
148 final var m
= pair
.first();
149 final var handler
= pair
.second();
150 m
.removeReceiveHandler(handler
);
153 private Command
getCommand(final String method
) {
154 if ("subscribeReceive".equals(method
)) {
155 return new SubscribeReceiveCommand();
157 if ("unsubscribeReceive".equals(method
)) {
158 return new UnsubscribeReceiveCommand();
160 return Commands
.getCommand(method
);
163 private class SubscribeReceiveCommand
implements JsonRpcSingleCommand
<Void
>, JsonRpcMultiCommand
<Void
> {
166 public String
getName() {
167 return "subscribeReceive";
171 public void handleCommand(
174 final JsonWriter jsonWriter
175 ) throws CommandException
{
176 final var subscriptionId
= subscribeReceive(m
, false);
177 jsonWriter
.write(subscriptionId
);
181 public void handleCommand(
183 final MultiAccountManager c
,
184 final JsonWriter jsonWriter
185 ) throws CommandException
{
186 final var subscriptionId
= subscribeReceive(c
.getManagers(), false);
187 jsonWriter
.write(subscriptionId
);
191 private class UnsubscribeReceiveCommand
implements JsonRpcSingleCommand
<JsonNode
>, JsonRpcMultiCommand
<JsonNode
> {
194 public String
getName() {
195 return "unsubscribeReceive";
199 public TypeReference
<JsonNode
> getRequestType() {
200 return new TypeReference
<>() {};
204 public void handleCommand(
205 final JsonNode request
,
207 final JsonWriter jsonWriter
208 ) throws CommandException
{
209 final var subscriptionId
= getSubscriptionId(request
);
210 if (subscriptionId
== null) {
211 unsubscribeReceive(m
);
213 if (!unsubscribeReceive(subscriptionId
)) {
214 throw new UserErrorException("Unknown subscription id");
220 public void handleCommand(
221 final JsonNode request
,
222 final MultiAccountManager c
,
223 final JsonWriter jsonWriter
224 ) throws CommandException
{
225 final var subscriptionId
= getSubscriptionId(request
);
226 if (subscriptionId
== null) {
227 throw new UserErrorException("Missing subscription parameter with subscription id");
229 if (!unsubscribeReceive(subscriptionId
)) {
230 throw new UserErrorException("Unknown subscription id");
235 private Integer
getSubscriptionId(final JsonNode request
) {
236 return switch (request
) {
237 case ArrayNode req
-> req
.get(0).asInt();
238 case ObjectNode req
-> req
.get("subscription").asInt();
239 case null, default -> null;