nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/http/HttpServerHandler.java
a62139291e36e1394e02698682c1536bd6a14fca
[signal-cli] / src / main / java / org / asamk / signal / http / HttpServerHandler.java
1 package org.asamk.signal.http;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.sun.net.httpserver.HttpExchange;
5 import com.sun.net.httpserver.HttpServer;
6
7 import org.asamk.signal.commands.Commands;
8 import org.asamk.signal.json.JsonReceiveMessageHandler;
9 import org.asamk.signal.jsonrpc.JsonRpcReader;
10 import org.asamk.signal.jsonrpc.JsonRpcResponse;
11 import org.asamk.signal.jsonrpc.JsonRpcSender;
12 import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler;
13 import org.asamk.signal.manager.Manager;
14 import org.asamk.signal.manager.MultiAccountManager;
15 import org.asamk.signal.manager.api.Pair;
16 import org.asamk.signal.manager.util.Utils;
17 import org.asamk.signal.util.Util;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 public class HttpServerHandler {
29
30 private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
31
32 private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
33
34 private final InetSocketAddress address;
35
36 private final SignalJsonRpcCommandHandler commandHandler;
37 private final MultiAccountManager c;
38 private final Manager m;
39
40 public HttpServerHandler(final InetSocketAddress address, final Manager m) {
41 this.address = address;
42 commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand);
43 this.c = null;
44 this.m = m;
45 }
46
47 public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) {
48 this.address = address;
49 commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand);
50 this.c = c;
51 this.m = null;
52 }
53
54 public void init() throws IOException {
55 logger.info("Starting server on " + address.toString());
56
57 final var server = HttpServer.create(address, 0);
58 server.setExecutor(Executors.newFixedThreadPool(10));
59
60 server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
61 server.createContext("/api/v1/events", this::handleEventsEndpoint);
62 server.createContext("/api/v1/check", this::handleCheckEndpoint);
63
64 server.start();
65 }
66
67 private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
68 if (response != null) {
69 final var byteResponse = objectMapper.writeValueAsBytes(response);
70
71 httpExchange.getResponseHeaders().add("Content-Type", "application/json");
72 httpExchange.sendResponseHeaders(status, byteResponse.length);
73
74 httpExchange.getResponseBody().write(byteResponse);
75 } else {
76 httpExchange.sendResponseHeaders(status, -1);
77 }
78
79 httpExchange.getResponseBody().close();
80 }
81
82 private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException {
83 if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) {
84 sendResponse(404, null, httpExchange);
85 return;
86 }
87 if (!"POST".equals(httpExchange.getRequestMethod())) {
88 sendResponse(405, null, httpExchange);
89 return;
90 }
91
92 if (!"application/json".equals(httpExchange.getRequestHeaders().getFirst("Content-Type"))) {
93 sendResponse(415, null, httpExchange);
94 return;
95 }
96
97 try {
98
99 final Object[] result = {null};
100 final var jsonRpcSender = new JsonRpcSender(s -> {
101 if (result[0] != null) {
102 throw new AssertionError("There should only be a single JSON-RPC response");
103 }
104
105 result[0] = s;
106 });
107
108 final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody());
109 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
110 response -> logger.debug("Received unexpected response for id {}", response.getId()));
111
112 if (result[0] != null) {
113 sendResponse(200, result[0], httpExchange);
114 } else {
115 sendResponse(201, null, httpExchange);
116 }
117
118 } catch (Throwable aEx) {
119 logger.error("Failed to process request.", aEx);
120 sendResponse(200,
121 JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR,
122 "An internal server error has occurred.",
123 null), null),
124 httpExchange);
125 }
126 }
127
128 private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException {
129 if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) {
130 sendResponse(404, null, httpExchange);
131 return;
132 }
133 if (!"GET".equals(httpExchange.getRequestMethod())) {
134 sendResponse(405, null, httpExchange);
135 return;
136 }
137
138 try {
139 final var queryString = httpExchange.getRequestURI().getQuery();
140 final var query = queryString == null ? Map.<String, String>of() : Utils.getQueryMap(queryString);
141
142 List<Manager> managers = getManagerFromQuery(query);
143 if (managers == null) {
144 sendResponse(400, null, httpExchange);
145 return;
146 }
147
148 httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream");
149 httpExchange.sendResponseHeaders(200, 0);
150 final var sender = new ServerSentEventSender(httpExchange.getResponseBody());
151
152 final var shouldStop = new AtomicBoolean(false);
153 final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
154 shouldStop.set(true);
155 synchronized (this) {
156 this.notify();
157 }
158 });
159
160 try {
161 while (true) {
162 synchronized (this) {
163 wait(15_000);
164 }
165 if (shouldStop.get()) {
166 break;
167 }
168
169 try {
170 sender.sendKeepAlive();
171 } catch (IOException e) {
172 break;
173 }
174 }
175 } finally {
176 for (final var pair : handlers) {
177 unsubscribeReceiveHandler(pair);
178 }
179 try {
180 httpExchange.getResponseBody().close();
181 } catch (IOException ignored) {
182 }
183 }
184 } catch (Throwable aEx) {
185 logger.error("Failed to process request.", aEx);
186 sendResponse(500, null, httpExchange);
187 }
188 }
189
190 private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException {
191 if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) {
192 sendResponse(404, null, httpExchange);
193 return;
194 }
195 if (!"GET".equals(httpExchange.getRequestMethod())) {
196 sendResponse(405, null, httpExchange);
197 return;
198 }
199
200 sendResponse(200, null, httpExchange);
201 }
202
203 private List<Manager> getManagerFromQuery(final Map<String, String> query) {
204 List<Manager> managers;
205 if (m != null) {
206 managers = List.of(m);
207 } else {
208 final var account = query.get("account");
209 if (account == null || account.isEmpty()) {
210 managers = c.getManagers();
211 } else {
212 final var manager = c.getManager(account);
213 if (manager == null) {
214 return null;
215 }
216 managers = List.of(manager);
217 }
218 }
219 return managers;
220 }
221
222 private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
223 final List<Manager> managers, final ServerSentEventSender sender, Callable unsubscribe
224 ) {
225 return managers.stream().map(m1 -> {
226 final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> {
227 try {
228 sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s)));
229 } catch (IOException e) {
230 unsubscribe.call();
231 }
232 });
233 m1.addReceiveHandler(receiveMessageHandler);
234 return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler);
235 }).toList();
236 }
237
238 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
239 final var m = pair.first();
240 final var handler = pair.second();
241 m.removeReceiveHandler(handler);
242 }
243
244 private interface Callable {
245
246 void call();
247 }
248 }