]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java
Reformat files
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SignalJsonRpcDispatcherHandler.java
1 package org.asamk.signal.jsonrpc;
2
3 import com.fasterxml.jackson.core.type.TypeReference;
4 import com.fasterxml.jackson.databind.JsonNode;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import com.fasterxml.jackson.databind.node.ArrayNode;
7 import com.fasterxml.jackson.databind.node.ContainerNode;
8 import com.fasterxml.jackson.databind.node.IntNode;
9 import com.fasterxml.jackson.databind.node.ObjectNode;
10
11 import org.asamk.signal.commands.Command;
12 import org.asamk.signal.commands.Commands;
13 import org.asamk.signal.commands.JsonRpcMultiCommand;
14 import org.asamk.signal.commands.JsonRpcSingleCommand;
15 import org.asamk.signal.commands.exceptions.CommandException;
16 import org.asamk.signal.commands.exceptions.UserErrorException;
17 import org.asamk.signal.json.JsonReceiveMessageHandler;
18 import org.asamk.signal.manager.Manager;
19 import org.asamk.signal.manager.MultiAccountManager;
20 import org.asamk.signal.manager.api.Pair;
21 import org.asamk.signal.output.JsonWriter;
22 import org.asamk.signal.util.Util;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.nio.channels.ClosedChannelException;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.function.Supplier;
32
33 public class SignalJsonRpcDispatcherHandler {
34
35 private static final Logger logger = LoggerFactory.getLogger(SignalJsonRpcDispatcherHandler.class);
36
37 private final ObjectMapper objectMapper;
38 private final JsonRpcSender jsonRpcSender;
39 private final JsonRpcReader jsonRpcReader;
40 private final boolean noReceiveOnStart;
41
42 private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
43 private SignalJsonRpcCommandHandler commandHandler;
44
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter,
47 final Supplier<String> lineSupplier,
48 final boolean noReceiveOnStart
49 ) {
50 this.noReceiveOnStart = noReceiveOnStart;
51 this.objectMapper = Util.createJsonObjectMapper();
52 this.jsonRpcSender = new JsonRpcSender(jsonWriter);
53 this.jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier);
54 }
55
56 public void handleConnection(final MultiAccountManager c) {
57 this.commandHandler = new SignalJsonRpcCommandHandler(c, this::getCommand);
58
59 if (!noReceiveOnStart) {
60 this.subscribeReceive(c.getManagers(), true);
61 c.addOnManagerAddedHandler(m -> subscribeReceive(m, true));
62 c.addOnManagerRemovedHandler(this::unsubscribeReceive);
63 }
64
65 handleConnection();
66 }
67
68 public void handleConnection(final Manager m) {
69 this.commandHandler = new SignalJsonRpcCommandHandler(m, this::getCommand);
70
71 if (!noReceiveOnStart) {
72 subscribeReceive(m, true);
73 }
74
75 final var currentThread = Thread.currentThread();
76 m.addClosedListener(currentThread::interrupt);
77
78 handleConnection();
79 }
80
81 private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
82
83 private int subscribeReceive(final Manager manager, boolean internalSubscription) {
84 return subscribeReceive(List.of(manager), internalSubscription);
85 }
86
87 private int subscribeReceive(final List<Manager> managers, boolean internalSubscription) {
88 final var subscriptionId = nextSubscriptionId.getAndIncrement();
89 final var handlers = managers.stream().map(m -> {
90 final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
91 ContainerNode<?> params;
92 if (internalSubscription) {
93 params = objectMapper.valueToTree(s);
94 } else {
95 final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
96 paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
97 paramsNode.set("result", objectMapper.valueToTree(s));
98 params = paramsNode;
99 }
100 final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
101 try {
102 jsonRpcSender.sendRequest(jsonRpcRequest);
103 } catch (AssertionError e) {
104 if (e.getCause() instanceof ClosedChannelException) {
105 unsubscribeReceive(subscriptionId);
106 }
107 }
108 });
109 m.addReceiveHandler(receiveMessageHandler);
110 return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
111 }).toList();
112 receiveHandlers.put(subscriptionId, handlers);
113
114 return subscriptionId;
115 }
116
117 private boolean unsubscribeReceive(final int subscriptionId) {
118 final var handlers = receiveHandlers.remove(subscriptionId);
119 if (handlers == null) {
120 return false;
121 }
122 for (final var pair : handlers) {
123 unsubscribeReceiveHandler(pair);
124 }
125 return true;
126 }
127
128 private void unsubscribeReceive(final Manager m) {
129 final var subscriptionId = receiveHandlers.entrySet()
130 .stream()
131 .filter(e -> e.getValue().size() == 1 && e.getValue().getFirst().first().equals(m))
132 .map(Map.Entry::getKey)
133 .findFirst();
134 subscriptionId.ifPresent(this::unsubscribeReceive);
135 }
136
137 private void handleConnection() {
138 try {
139 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
140 response -> logger.debug("Received unexpected response for id {}", response.getId()));
141 } finally {
142 receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
143 receiveHandlers.clear();
144 }
145 }
146
147 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
148 final var m = pair.first();
149 final var handler = pair.second();
150 m.removeReceiveHandler(handler);
151 }
152
153 private Command getCommand(final String method) {
154 if ("subscribeReceive".equals(method)) {
155 return new SubscribeReceiveCommand();
156 }
157 if ("unsubscribeReceive".equals(method)) {
158 return new UnsubscribeReceiveCommand();
159 }
160 return Commands.getCommand(method);
161 }
162
163 private class SubscribeReceiveCommand implements JsonRpcSingleCommand<Void>, JsonRpcMultiCommand<Void> {
164
165 @Override
166 public String getName() {
167 return "subscribeReceive";
168 }
169
170 @Override
171 public void handleCommand(
172 final Void request,
173 final Manager m,
174 final JsonWriter jsonWriter
175 ) throws CommandException {
176 final var subscriptionId = subscribeReceive(m, false);
177 jsonWriter.write(subscriptionId);
178 }
179
180 @Override
181 public void handleCommand(
182 final Void request,
183 final MultiAccountManager c,
184 final JsonWriter jsonWriter
185 ) throws CommandException {
186 final var subscriptionId = subscribeReceive(c.getManagers(), false);
187 jsonWriter.write(subscriptionId);
188 }
189 }
190
191 private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand<JsonNode>, JsonRpcMultiCommand<JsonNode> {
192
193 @Override
194 public String getName() {
195 return "unsubscribeReceive";
196 }
197
198 @Override
199 public TypeReference<JsonNode> getRequestType() {
200 return new TypeReference<>() {};
201 }
202
203 @Override
204 public void handleCommand(
205 final JsonNode request,
206 final Manager m,
207 final JsonWriter jsonWriter
208 ) throws CommandException {
209 final var subscriptionId = getSubscriptionId(request);
210 if (subscriptionId == null) {
211 unsubscribeReceive(m);
212 } else {
213 if (!unsubscribeReceive(subscriptionId)) {
214 throw new UserErrorException("Unknown subscription id");
215 }
216 }
217 }
218
219 @Override
220 public void handleCommand(
221 final JsonNode request,
222 final MultiAccountManager c,
223 final JsonWriter jsonWriter
224 ) throws CommandException {
225 final var subscriptionId = getSubscriptionId(request);
226 if (subscriptionId == null) {
227 throw new UserErrorException("Missing subscription parameter with subscription id");
228 } else {
229 if (!unsubscribeReceive(subscriptionId)) {
230 throw new UserErrorException("Unknown subscription id");
231 }
232 }
233 }
234
235 private Integer getSubscriptionId(final JsonNode request) {
236 return switch (request) {
237 case ArrayNode req -> req.get(0).asInt();
238 case ObjectNode req -> req.get("subscription").asInt();
239 case null, default -> null;
240 };
241 }
242 }
243 }