nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java
Extract JSON-RPC command handler
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SignalJsonRpcDispatcherHandler.java
1 package org.asamk.signal.jsonrpc;
2
3 import com.fasterxml.jackson.core.type.TypeReference;
4 import com.fasterxml.jackson.databind.JsonNode;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import com.fasterxml.jackson.databind.node.ArrayNode;
7 import com.fasterxml.jackson.databind.node.ContainerNode;
8 import com.fasterxml.jackson.databind.node.IntNode;
9 import com.fasterxml.jackson.databind.node.ObjectNode;
10
11 import org.asamk.signal.commands.Command;
12 import org.asamk.signal.commands.Commands;
13 import org.asamk.signal.commands.JsonRpcMultiCommand;
14 import org.asamk.signal.commands.JsonRpcSingleCommand;
15 import org.asamk.signal.commands.exceptions.CommandException;
16 import org.asamk.signal.commands.exceptions.UserErrorException;
17 import org.asamk.signal.json.JsonReceiveMessageHandler;
18 import org.asamk.signal.manager.Manager;
19 import org.asamk.signal.manager.MultiAccountManager;
20 import org.asamk.signal.manager.api.Pair;
21 import org.asamk.signal.output.JsonWriter;
22 import org.asamk.signal.util.Util;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.nio.channels.ClosedChannelException;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.function.Supplier;
32
33 public class SignalJsonRpcDispatcherHandler {
34
35 private final static Logger logger = LoggerFactory.getLogger(SignalJsonRpcDispatcherHandler.class);
36
37 private final ObjectMapper objectMapper;
38 private final JsonRpcSender jsonRpcSender;
39 private final JsonRpcReader jsonRpcReader;
40 private final boolean noReceiveOnStart;
41
42 private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
43 private SignalJsonRpcCommandHandler commandHandler;
44
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter, final Supplier<String> lineSupplier, final boolean noReceiveOnStart
47 ) {
48 this.noReceiveOnStart = noReceiveOnStart;
49 this.objectMapper = Util.createJsonObjectMapper();
50 this.jsonRpcSender = new JsonRpcSender(jsonWriter);
51 this.jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier);
52 }
53
54 public void handleConnection(final MultiAccountManager c) {
55 this.commandHandler = new SignalJsonRpcCommandHandler(c, this::getCommand);
56
57 if (!noReceiveOnStart) {
58 this.subscribeReceive(c.getManagers());
59 c.addOnManagerAddedHandler(this::subscribeReceive);
60 c.addOnManagerRemovedHandler(this::unsubscribeReceive);
61 }
62
63 handleConnection();
64 }
65
66 public void handleConnection(final Manager m) {
67 this.commandHandler = new SignalJsonRpcCommandHandler(m, this::getCommand);
68
69 if (!noReceiveOnStart) {
70 subscribeReceive(m);
71 }
72
73 final var currentThread = Thread.currentThread();
74 m.addClosedListener(currentThread::interrupt);
75
76 handleConnection();
77 }
78
79 private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
80
81 private int subscribeReceive(final Manager manager) {
82 return subscribeReceive(List.of(manager));
83 }
84
85 private int subscribeReceive(final List<Manager> managers) {
86 final var subscriptionId = nextSubscriptionId.getAndIncrement();
87 final var handlers = managers.stream().map(m -> {
88 final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
89 final ContainerNode<?> params = objectMapper.valueToTree(s);
90 ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
91 final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
92 try {
93 jsonRpcSender.sendRequest(jsonRpcRequest);
94 } catch (AssertionError e) {
95 if (e.getCause() instanceof ClosedChannelException) {
96 unsubscribeReceive(subscriptionId);
97 }
98 }
99 });
100 m.addReceiveHandler(receiveMessageHandler);
101 return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
102 }).toList();
103 receiveHandlers.put(subscriptionId, handlers);
104
105 return subscriptionId;
106 }
107
108 private boolean unsubscribeReceive(final int subscriptionId) {
109 final var handlers = receiveHandlers.remove(subscriptionId);
110 if (handlers == null) {
111 return false;
112 }
113 for (final var pair : handlers) {
114 unsubscribeReceiveHandler(pair);
115 }
116 return true;
117 }
118
119 private void unsubscribeReceive(final Manager m) {
120 final var subscriptionId = receiveHandlers.entrySet()
121 .stream()
122 .filter(e -> e.getValue().size() == 1 && e.getValue().get(0).first().equals(m))
123 .map(Map.Entry::getKey)
124 .findFirst();
125 subscriptionId.ifPresent(this::unsubscribeReceive);
126 }
127
128 private void handleConnection() {
129 try {
130 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
131 response -> logger.debug("Received unexpected response for id {}", response.getId()));
132 } finally {
133 receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
134 receiveHandlers.clear();
135 }
136 }
137
138 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
139 final var m = pair.first();
140 final var handler = pair.second();
141 m.removeReceiveHandler(handler);
142 }
143
144 private Command getCommand(final String method) {
145 if ("subscribeReceive".equals(method)) {
146 return new SubscribeReceiveCommand();
147 }
148 if ("unsubscribeReceive".equals(method)) {
149 return new UnsubscribeReceiveCommand();
150 }
151 return Commands.getCommand(method);
152 }
153
154 private class SubscribeReceiveCommand implements JsonRpcSingleCommand<Void>, JsonRpcMultiCommand<Void> {
155
156 @Override
157 public String getName() {
158 return "subscribeReceive";
159 }
160
161 @Override
162 public void handleCommand(
163 final Void request, final Manager m, final JsonWriter jsonWriter
164 ) throws CommandException {
165 final var subscriptionId = subscribeReceive(m);
166 jsonWriter.write(subscriptionId);
167 }
168
169 @Override
170 public void handleCommand(
171 final Void request, final MultiAccountManager c, final JsonWriter jsonWriter
172 ) throws CommandException {
173 final var subscriptionId = subscribeReceive(c.getManagers());
174 jsonWriter.write(subscriptionId);
175 }
176 }
177
178 private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand<JsonNode>, JsonRpcMultiCommand<JsonNode> {
179
180 @Override
181 public String getName() {
182 return "unsubscribeReceive";
183 }
184
185 @Override
186 public TypeReference<JsonNode> getRequestType() {
187 return new TypeReference<>() {};
188 }
189
190 @Override
191 public void handleCommand(
192 final JsonNode request, final Manager m, final JsonWriter jsonWriter
193 ) throws CommandException {
194 final var subscriptionId = getSubscriptionId(request);
195 if (subscriptionId == null) {
196 unsubscribeReceive(m);
197 } else {
198 if (!unsubscribeReceive(subscriptionId)) {
199 throw new UserErrorException("Unknown subscription id");
200 }
201 }
202 }
203
204 @Override
205 public void handleCommand(
206 final JsonNode request, final MultiAccountManager c, final JsonWriter jsonWriter
207 ) throws CommandException {
208 final var subscriptionId = getSubscriptionId(request);
209 if (subscriptionId == null) {
210 throw new UserErrorException("Missing subscription parameter with subscription id");
211 } else {
212 if (!unsubscribeReceive(subscriptionId)) {
213 throw new UserErrorException("Unknown subscription id");
214 }
215 }
216 }
217
218 private Integer getSubscriptionId(final JsonNode request) {
219 if (request instanceof ArrayNode req) {
220 return req.get(0).asInt();
221 } else if (request instanceof ObjectNode req) {
222 return req.get("subscription").asInt();
223 } else {
224 return null;
225 }
226 }
227 }
228 }