]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/http/HttpServerHandler.java
Use getRawQuery to prevent double decoding the query
[signal-cli] / src / main / java / org / asamk / signal / http / HttpServerHandler.java
1 package org.asamk.signal.http;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.sun.net.httpserver.HttpExchange;
5 import com.sun.net.httpserver.HttpServer;
6
7 import org.asamk.signal.commands.Commands;
8 import org.asamk.signal.json.JsonReceiveMessageHandler;
9 import org.asamk.signal.jsonrpc.JsonRpcReader;
10 import org.asamk.signal.jsonrpc.JsonRpcResponse;
11 import org.asamk.signal.jsonrpc.JsonRpcSender;
12 import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler;
13 import org.asamk.signal.manager.Manager;
14 import org.asamk.signal.manager.MultiAccountManager;
15 import org.asamk.signal.manager.api.Pair;
16 import org.asamk.signal.util.Util;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 public class HttpServerHandler implements AutoCloseable {
28
29 private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
30
31 private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
32
33 private final InetSocketAddress address;
34
35 private final SignalJsonRpcCommandHandler commandHandler;
36 private final MultiAccountManager c;
37 private final Manager m;
38 private HttpServer server;
39 private final AtomicBoolean shutdown = new AtomicBoolean(false);
40
41 public HttpServerHandler(final InetSocketAddress address, final Manager m) {
42 this.address = address;
43 commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand);
44 this.c = null;
45 this.m = m;
46 }
47
48 public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) {
49 this.address = address;
50 commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand);
51 this.c = c;
52 this.m = null;
53 }
54
55 public void init() throws IOException {
56 if (server != null) {
57 throw new AssertionError("HttpServerHandler already initialized");
58 }
59 logger.debug("Starting HTTP server on {}", address);
60
61 server = HttpServer.create(address, 0);
62 server.setExecutor(Executors.newCachedThreadPool());
63
64 server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
65 server.createContext("/api/v1/events", this::handleEventsEndpoint);
66 server.createContext("/api/v1/check", this::handleCheckEndpoint);
67
68 server.start();
69 logger.info("Started HTTP server on {}", address);
70 }
71
72 @Override
73 public void close() {
74 if (server != null) {
75 shutdown.set(true);
76 synchronized (this) {
77 this.notifyAll();
78 }
79 // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
80 server.stop(2);
81 server = null;
82 shutdown.set(false);
83 }
84 }
85
86 private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
87 if (response != null) {
88 final var byteResponse = objectMapper.writeValueAsBytes(response);
89
90 httpExchange.getResponseHeaders().add("Content-Type", "application/json");
91 httpExchange.sendResponseHeaders(status, byteResponse.length);
92
93 httpExchange.getResponseBody().write(byteResponse);
94 } else {
95 httpExchange.sendResponseHeaders(status, -1);
96 }
97
98 httpExchange.getResponseBody().close();
99 }
100
101 private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException {
102 if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) {
103 sendResponse(404, null, httpExchange);
104 return;
105 }
106 if (!"POST".equals(httpExchange.getRequestMethod())) {
107 sendResponse(405, null, httpExchange);
108 return;
109 }
110
111 final var contentType = httpExchange.getRequestHeaders().getFirst("Content-Type");
112 if (contentType == null || !contentType.startsWith("application/json")) {
113 sendResponse(415, null, httpExchange);
114 return;
115 }
116
117 try {
118
119 final Object[] result = {null};
120 final var jsonRpcSender = new JsonRpcSender(s -> {
121 if (result[0] != null) {
122 throw new AssertionError("There should only be a single JSON-RPC response");
123 }
124
125 result[0] = s;
126 });
127
128 final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody());
129 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
130 response -> logger.debug("Received unexpected response for id {}", response.getId()));
131
132 if (result[0] != null) {
133 sendResponse(200, result[0], httpExchange);
134 } else {
135 sendResponse(201, null, httpExchange);
136 }
137
138 } catch (Throwable aEx) {
139 logger.error("Failed to process request.", aEx);
140 sendResponse(200,
141 JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR,
142 "An internal server error has occurred.",
143 null), null),
144 httpExchange);
145 }
146 }
147
148 private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException {
149 if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) {
150 sendResponse(404, null, httpExchange);
151 return;
152 }
153 if (!"GET".equals(httpExchange.getRequestMethod())) {
154 sendResponse(405, null, httpExchange);
155 return;
156 }
157
158 try {
159 final var queryString = httpExchange.getRequestURI().getRawQuery();
160 final var query = queryString == null ? Map.<String, String>of() : Util.getQueryMap(queryString);
161
162 List<Manager> managers = getManagerFromQuery(query);
163 if (managers == null) {
164 sendResponse(400, null, httpExchange);
165 return;
166 }
167
168 httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream");
169 httpExchange.sendResponseHeaders(200, 0);
170 final var sender = new ServerSentEventSender(httpExchange.getResponseBody());
171
172 final var shouldStop = new AtomicBoolean(false);
173 final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
174 shouldStop.set(true);
175 synchronized (this) {
176 this.notifyAll();
177 }
178 });
179
180 try {
181 while (true) {
182 synchronized (this) {
183 wait(15_000);
184 }
185 if (shouldStop.get() || shutdown.get()) {
186 break;
187 }
188
189 try {
190 sender.sendKeepAlive();
191 } catch (IOException e) {
192 break;
193 }
194 }
195 } finally {
196 for (final var pair : handlers) {
197 unsubscribeReceiveHandler(pair);
198 }
199 try {
200 httpExchange.getResponseBody().close();
201 } catch (IOException ignored) {
202 }
203 }
204 } catch (Throwable aEx) {
205 logger.error("Failed to process request.", aEx);
206 sendResponse(500, null, httpExchange);
207 }
208 }
209
210 private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException {
211 if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) {
212 sendResponse(404, null, httpExchange);
213 return;
214 }
215 if (!"GET".equals(httpExchange.getRequestMethod())) {
216 sendResponse(405, null, httpExchange);
217 return;
218 }
219
220 sendResponse(200, null, httpExchange);
221 }
222
223 private List<Manager> getManagerFromQuery(final Map<String, String> query) {
224 if (m != null) {
225 return List.of(m);
226 }
227 if (c != null) {
228 final var account = query.get("account");
229 if (account == null || account.isEmpty()) {
230 return c.getManagers();
231 } else {
232 final var manager = c.getManager(account);
233 if (manager == null) {
234 return null;
235 }
236 return List.of(manager);
237 }
238 }
239 throw new AssertionError("Unreachable state");
240 }
241
242 private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
243 final List<Manager> managers,
244 final ServerSentEventSender sender,
245 Callable unsubscribe
246 ) {
247 return managers.stream().map(m1 -> {
248 final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> {
249 try {
250 sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s)));
251 } catch (IOException e) {
252 unsubscribe.call();
253 }
254 });
255 m1.addReceiveHandler(receiveMessageHandler);
256 return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler);
257 }).toList();
258 }
259
260 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
261 final var m = pair.first();
262 final var handler = pair.second();
263 m.removeReceiveHandler(handler);
264 }
265
266 private interface Callable {
267
268 void call();
269 }
270 }