]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java
Fix inspection issues
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SignalJsonRpcDispatcherHandler.java
1 package org.asamk.signal.jsonrpc;
2
3 import com.fasterxml.jackson.core.type.TypeReference;
4 import com.fasterxml.jackson.databind.JsonNode;
5 import com.fasterxml.jackson.databind.ObjectMapper;
6 import com.fasterxml.jackson.databind.node.ArrayNode;
7 import com.fasterxml.jackson.databind.node.ContainerNode;
8 import com.fasterxml.jackson.databind.node.IntNode;
9 import com.fasterxml.jackson.databind.node.ObjectNode;
10
11 import org.asamk.signal.commands.Command;
12 import org.asamk.signal.commands.Commands;
13 import org.asamk.signal.commands.JsonRpcMultiCommand;
14 import org.asamk.signal.commands.JsonRpcSingleCommand;
15 import org.asamk.signal.commands.exceptions.CommandException;
16 import org.asamk.signal.commands.exceptions.UserErrorException;
17 import org.asamk.signal.json.JsonReceiveMessageHandler;
18 import org.asamk.signal.manager.Manager;
19 import org.asamk.signal.manager.MultiAccountManager;
20 import org.asamk.signal.manager.api.Pair;
21 import org.asamk.signal.output.JsonWriter;
22 import org.asamk.signal.util.Util;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.nio.channels.ClosedChannelException;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.function.Supplier;
32
33 public class SignalJsonRpcDispatcherHandler {
34
35 private static final Logger logger = LoggerFactory.getLogger(SignalJsonRpcDispatcherHandler.class);
36
37 private final ObjectMapper objectMapper;
38 private final JsonRpcSender jsonRpcSender;
39 private final JsonRpcReader jsonRpcReader;
40 private final boolean noReceiveOnStart;
41
42 private final Map<Integer, List<Pair<Manager, Manager.ReceiveMessageHandler>>> receiveHandlers = new HashMap<>();
43 private SignalJsonRpcCommandHandler commandHandler;
44
45 public SignalJsonRpcDispatcherHandler(
46 final JsonWriter jsonWriter, final Supplier<String> lineSupplier, final boolean noReceiveOnStart
47 ) {
48 this.noReceiveOnStart = noReceiveOnStart;
49 this.objectMapper = Util.createJsonObjectMapper();
50 this.jsonRpcSender = new JsonRpcSender(jsonWriter);
51 this.jsonRpcReader = new JsonRpcReader(jsonRpcSender, lineSupplier);
52 }
53
54 public void handleConnection(final MultiAccountManager c) {
55 this.commandHandler = new SignalJsonRpcCommandHandler(c, this::getCommand);
56
57 if (!noReceiveOnStart) {
58 this.subscribeReceive(c.getManagers(), true);
59 c.addOnManagerAddedHandler(m -> subscribeReceive(m, true));
60 c.addOnManagerRemovedHandler(this::unsubscribeReceive);
61 }
62
63 handleConnection();
64 }
65
66 public void handleConnection(final Manager m) {
67 this.commandHandler = new SignalJsonRpcCommandHandler(m, this::getCommand);
68
69 if (!noReceiveOnStart) {
70 subscribeReceive(m, true);
71 }
72
73 final var currentThread = Thread.currentThread();
74 m.addClosedListener(currentThread::interrupt);
75
76 handleConnection();
77 }
78
79 private static final AtomicInteger nextSubscriptionId = new AtomicInteger(0);
80
81 private int subscribeReceive(final Manager manager, boolean internalSubscription) {
82 return subscribeReceive(List.of(manager), internalSubscription);
83 }
84
85 private int subscribeReceive(final List<Manager> managers, boolean internalSubscription) {
86 final var subscriptionId = nextSubscriptionId.getAndIncrement();
87 final var handlers = managers.stream().map(m -> {
88 final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
89 ContainerNode<?> params;
90 if (internalSubscription) {
91 params = objectMapper.valueToTree(s);
92 } else {
93 final var paramsNode = new ObjectNode(objectMapper.getNodeFactory());
94 paramsNode.set("subscription", IntNode.valueOf(subscriptionId));
95 paramsNode.set("result", objectMapper.valueToTree(s));
96 params = paramsNode;
97 }
98 final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
99 try {
100 jsonRpcSender.sendRequest(jsonRpcRequest);
101 } catch (AssertionError e) {
102 if (e.getCause() instanceof ClosedChannelException) {
103 unsubscribeReceive(subscriptionId);
104 }
105 }
106 });
107 m.addReceiveHandler(receiveMessageHandler);
108 return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
109 }).toList();
110 receiveHandlers.put(subscriptionId, handlers);
111
112 return subscriptionId;
113 }
114
115 private boolean unsubscribeReceive(final int subscriptionId) {
116 final var handlers = receiveHandlers.remove(subscriptionId);
117 if (handlers == null) {
118 return false;
119 }
120 for (final var pair : handlers) {
121 unsubscribeReceiveHandler(pair);
122 }
123 return true;
124 }
125
126 private void unsubscribeReceive(final Manager m) {
127 final var subscriptionId = receiveHandlers.entrySet()
128 .stream()
129 .filter(e -> e.getValue().size() == 1 && e.getValue().getFirst().first().equals(m))
130 .map(Map.Entry::getKey)
131 .findFirst();
132 subscriptionId.ifPresent(this::unsubscribeReceive);
133 }
134
135 private void handleConnection() {
136 try {
137 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
138 response -> logger.debug("Received unexpected response for id {}", response.getId()));
139 } finally {
140 receiveHandlers.forEach((_subscriptionId, handlers) -> handlers.forEach(this::unsubscribeReceiveHandler));
141 receiveHandlers.clear();
142 }
143 }
144
145 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
146 final var m = pair.first();
147 final var handler = pair.second();
148 m.removeReceiveHandler(handler);
149 }
150
151 private Command getCommand(final String method) {
152 if ("subscribeReceive".equals(method)) {
153 return new SubscribeReceiveCommand();
154 }
155 if ("unsubscribeReceive".equals(method)) {
156 return new UnsubscribeReceiveCommand();
157 }
158 return Commands.getCommand(method);
159 }
160
161 private class SubscribeReceiveCommand implements JsonRpcSingleCommand<Void>, JsonRpcMultiCommand<Void> {
162
163 @Override
164 public String getName() {
165 return "subscribeReceive";
166 }
167
168 @Override
169 public void handleCommand(
170 final Void request, final Manager m, final JsonWriter jsonWriter
171 ) throws CommandException {
172 final var subscriptionId = subscribeReceive(m, false);
173 jsonWriter.write(subscriptionId);
174 }
175
176 @Override
177 public void handleCommand(
178 final Void request, final MultiAccountManager c, final JsonWriter jsonWriter
179 ) throws CommandException {
180 final var subscriptionId = subscribeReceive(c.getManagers(), false);
181 jsonWriter.write(subscriptionId);
182 }
183 }
184
185 private class UnsubscribeReceiveCommand implements JsonRpcSingleCommand<JsonNode>, JsonRpcMultiCommand<JsonNode> {
186
187 @Override
188 public String getName() {
189 return "unsubscribeReceive";
190 }
191
192 @Override
193 public TypeReference<JsonNode> getRequestType() {
194 return new TypeReference<>() {};
195 }
196
197 @Override
198 public void handleCommand(
199 final JsonNode request, final Manager m, final JsonWriter jsonWriter
200 ) throws CommandException {
201 final var subscriptionId = getSubscriptionId(request);
202 if (subscriptionId == null) {
203 unsubscribeReceive(m);
204 } else {
205 if (!unsubscribeReceive(subscriptionId)) {
206 throw new UserErrorException("Unknown subscription id");
207 }
208 }
209 }
210
211 @Override
212 public void handleCommand(
213 final JsonNode request, final MultiAccountManager c, final JsonWriter jsonWriter
214 ) throws CommandException {
215 final var subscriptionId = getSubscriptionId(request);
216 if (subscriptionId == null) {
217 throw new UserErrorException("Missing subscription parameter with subscription id");
218 } else {
219 if (!unsubscribeReceive(subscriptionId)) {
220 throw new UserErrorException("Unknown subscription id");
221 }
222 }
223 }
224
225 private Integer getSubscriptionId(final JsonNode request) {
226 return switch (request) {
227 case ArrayNode req -> req.get(0).asInt();
228 case ObjectNode req -> req.get("subscription").asInt();
229 case null, default -> null;
230 };
231 }
232 }
233 }