package org.asamk.signal.jsonrpc;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.asamk.signal.commands.Command;
import org.asamk.signal.commands.Commands;
import org.asamk.signal.commands.JsonRpcMultiCommand;
import org.asamk.signal.commands.JsonRpcSingleCommand;
import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.UserErrorException;
import org.asamk.signal.json.JsonReceiveMessageHandler;
import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager;
import org.asamk.signal.manager.api.Pair;
import org.asamk.signal.output.JsonWriter;
import org.asamk.signal.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

33 public class SignalJsonRpcDispatcherHandler
{
35 private static final Logger logger
= LoggerFactory
.getLogger(SignalJsonRpcDispatcherHandler
.class);
37 private final ObjectMapper objectMapper
;
38 private final JsonRpcSender jsonRpcSender
;
39 private final JsonRpcReader jsonRpcReader
;
40 private final boolean noReceiveOnStart
;
42 private final Map
<Integer
, List
<Pair
<Manager
, Manager
.ReceiveMessageHandler
>>> receiveHandlers
= new HashMap
<>();
43 private SignalJsonRpcCommandHandler commandHandler
;
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter
, final Supplier
<String
> lineSupplier
, final boolean noReceiveOnStart
48 this.noReceiveOnStart
= noReceiveOnStart
;
49 this.objectMapper
= Util
.createJsonObjectMapper();
50 this.jsonRpcSender
= new JsonRpcSender(jsonWriter
);
51 this.jsonRpcReader
= new JsonRpcReader(jsonRpcSender
, lineSupplier
);
54 public void handleConnection(final MultiAccountManager c
) {
55 this.commandHandler
= new SignalJsonRpcCommandHandler(c
, this::getCommand
);
57 if (!noReceiveOnStart
) {
58 this.subscribeReceive(c
.getManagers(), true);
59 c
.addOnManagerAddedHandler(m
-> subscribeReceive(m
, true));
60 c
.addOnManagerRemovedHandler(this::unsubscribeReceive
);
66 public void handleConnection(final Manager m
) {
67 this.commandHandler
= new SignalJsonRpcCommandHandler(m
, this::getCommand
);
69 if (!noReceiveOnStart
) {
70 subscribeReceive(m
, true);
73 final var currentThread
= Thread
.currentThread();
74 m
.addClosedListener(currentThread
::interrupt
);
79 private static final AtomicInteger nextSubscriptionId
= new AtomicInteger(0);
81 private int subscribeReceive(final Manager manager
, boolean internalSubscription
) {
82 return subscribeReceive(List
.of(manager
), internalSubscription
);
85 private int subscribeReceive(final List
<Manager
> managers
, boolean internalSubscription
) {
86 final var subscriptionId
= nextSubscriptionId
.getAndIncrement();
87 final var handlers
= managers
.stream().map(m
-> {
88 final var receiveMessageHandler
= new JsonReceiveMessageHandler(m
, s
-> {
89 ContainerNode
<?
> params
;
90 if (internalSubscription
) {
91 params
= objectMapper
.valueToTree(s
);
93 final var paramsNode
= new ObjectNode(objectMapper
.getNodeFactory());
94 paramsNode
.set("subscription", IntNode
.valueOf(subscriptionId
));
95 paramsNode
.set("result", objectMapper
.valueToTree(s
));
98 final var jsonRpcRequest
= JsonRpcRequest
.forNotification("receive", params
, null);
100 jsonRpcSender
.sendRequest(jsonRpcRequest
);
101 } catch (AssertionError e
) {
102 if (e
.getCause() instanceof ClosedChannelException
) {
103 unsubscribeReceive(subscriptionId
);
107 m
.addReceiveHandler(receiveMessageHandler
);
108 return new Pair
<>(m
, (Manager
.ReceiveMessageHandler
) receiveMessageHandler
);
110 receiveHandlers
.put(subscriptionId
, handlers
);
112 return subscriptionId
;
115 private boolean unsubscribeReceive(final int subscriptionId
) {
116 final var handlers
= receiveHandlers
.remove(subscriptionId
);
117 if (handlers
== null) {
120 for (final var pair
: handlers
) {
121 unsubscribeReceiveHandler(pair
);
126 private void unsubscribeReceive(final Manager m
) {
127 final var subscriptionId
= receiveHandlers
.entrySet()
129 .filter(e
-> e
.getValue().size() == 1 && e
.getValue().get(0).first().equals(m
))
130 .map(Map
.Entry
::getKey
)
132 subscriptionId
.ifPresent(this::unsubscribeReceive
);
135 private void handleConnection() {
137 jsonRpcReader
.readMessages((method
, params
) -> commandHandler
.handleRequest(objectMapper
, method
, params
),
138 response
-> logger
.debug("Received unexpected response for id {}", response
.getId()));
140 receiveHandlers
.forEach((_subscriptionId
, handlers
) -> handlers
.forEach(this::unsubscribeReceiveHandler
));
141 receiveHandlers
.clear();
145 private void unsubscribeReceiveHandler(final Pair
<Manager
, Manager
.ReceiveMessageHandler
> pair
) {
146 final var m
= pair
.first();
147 final var handler
= pair
.second();
148 m
.removeReceiveHandler(handler
);
151 private Command
getCommand(final String method
) {
152 if ("subscribeReceive".equals(method
)) {
153 return new SubscribeReceiveCommand();
155 if ("unsubscribeReceive".equals(method
)) {
156 return new UnsubscribeReceiveCommand();
158 return Commands
.getCommand(method
);
161 private class SubscribeReceiveCommand
implements JsonRpcSingleCommand
<Void
>, JsonRpcMultiCommand
<Void
> {
164 public String
getName() {
165 return "subscribeReceive";
169 public void handleCommand(
170 final Void request
, final Manager m
, final JsonWriter jsonWriter
171 ) throws CommandException
{
172 final var subscriptionId
= subscribeReceive(m
, false);
173 jsonWriter
.write(subscriptionId
);
177 public void handleCommand(
178 final Void request
, final MultiAccountManager c
, final JsonWriter jsonWriter
179 ) throws CommandException
{
180 final var subscriptionId
= subscribeReceive(c
.getManagers(), false);
181 jsonWriter
.write(subscriptionId
);
185 private class UnsubscribeReceiveCommand
implements JsonRpcSingleCommand
<JsonNode
>, JsonRpcMultiCommand
<JsonNode
> {
188 public String
getName() {
189 return "unsubscribeReceive";
193 public TypeReference
<JsonNode
> getRequestType() {
194 return new TypeReference
<>() {};
198 public void handleCommand(
199 final JsonNode request
, final Manager m
, final JsonWriter jsonWriter
200 ) throws CommandException
{
201 final var subscriptionId
= getSubscriptionId(request
);
202 if (subscriptionId
== null) {
203 unsubscribeReceive(m
);
205 if (!unsubscribeReceive(subscriptionId
)) {
206 throw new UserErrorException("Unknown subscription id");
212 public void handleCommand(
213 final JsonNode request
, final MultiAccountManager c
, final JsonWriter jsonWriter
214 ) throws CommandException
{
215 final var subscriptionId
= getSubscriptionId(request
);
216 if (subscriptionId
== null) {
217 throw new UserErrorException("Missing subscription parameter with subscription id");
219 if (!unsubscribeReceive(subscriptionId
)) {
220 throw new UserErrorException("Unknown subscription id");
225 private Integer
getSubscriptionId(final JsonNode request
) {
226 return switch (request
) {
227 case ArrayNode req
-> req
.get(0).asInt();
228 case ObjectNode req
-> req
.get("subscription").asInt();
229 case null, default -> null;