nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/http/HttpServerHandler.java
Reorder static final modifier
[signal-cli] / src / main / java / org / asamk / signal / http / HttpServerHandler.java
1 package org.asamk.signal.http;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.sun.net.httpserver.HttpExchange;
5 import com.sun.net.httpserver.HttpServer;
6
7 import org.asamk.signal.commands.Commands;
8 import org.asamk.signal.json.JsonReceiveMessageHandler;
9 import org.asamk.signal.jsonrpc.JsonRpcReader;
10 import org.asamk.signal.jsonrpc.JsonRpcResponse;
11 import org.asamk.signal.jsonrpc.JsonRpcSender;
12 import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler;
13 import org.asamk.signal.manager.Manager;
14 import org.asamk.signal.manager.MultiAccountManager;
15 import org.asamk.signal.manager.api.Pair;
16 import org.asamk.signal.util.Util;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 public class HttpServerHandler implements AutoCloseable {
28
29 private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
30
31 private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
32
33 private final InetSocketAddress address;
34
35 private final SignalJsonRpcCommandHandler commandHandler;
36 private final MultiAccountManager c;
37 private final Manager m;
38 private HttpServer server;
39 private final AtomicBoolean shutdown = new AtomicBoolean(false);
40
41 public HttpServerHandler(final InetSocketAddress address, final Manager m) {
42 this.address = address;
43 commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand);
44 this.c = null;
45 this.m = m;
46 }
47
48 public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) {
49 this.address = address;
50 commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand);
51 this.c = c;
52 this.m = null;
53 }
54
55 public void init() throws IOException {
56 if (server != null) {
57 throw new AssertionError("HttpServerHandler already initialized");
58 }
59 logger.info("Starting server on " + address.toString());
60
61 server = HttpServer.create(address, 0);
62 server.setExecutor(Executors.newCachedThreadPool());
63
64 server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
65 server.createContext("/api/v1/events", this::handleEventsEndpoint);
66 server.createContext("/api/v1/check", this::handleCheckEndpoint);
67
68 server.start();
69 }
70
71 private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
72 if (response != null) {
73 final var byteResponse = objectMapper.writeValueAsBytes(response);
74
75 httpExchange.getResponseHeaders().add("Content-Type", "application/json");
76 httpExchange.sendResponseHeaders(status, byteResponse.length);
77
78 httpExchange.getResponseBody().write(byteResponse);
79 } else {
80 httpExchange.sendResponseHeaders(status, -1);
81 }
82
83 httpExchange.getResponseBody().close();
84 }
85
86 private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException {
87 if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) {
88 sendResponse(404, null, httpExchange);
89 return;
90 }
91 if (!"POST".equals(httpExchange.getRequestMethod())) {
92 sendResponse(405, null, httpExchange);
93 return;
94 }
95
96 final var contentType = httpExchange.getRequestHeaders().getFirst("Content-Type");
97 if (contentType == null || !contentType.startsWith("application/json")) {
98 sendResponse(415, null, httpExchange);
99 return;
100 }
101
102 try {
103
104 final Object[] result = {null};
105 final var jsonRpcSender = new JsonRpcSender(s -> {
106 if (result[0] != null) {
107 throw new AssertionError("There should only be a single JSON-RPC response");
108 }
109
110 result[0] = s;
111 });
112
113 final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody());
114 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
115 response -> logger.debug("Received unexpected response for id {}", response.getId()));
116
117 if (result[0] != null) {
118 sendResponse(200, result[0], httpExchange);
119 } else {
120 sendResponse(201, null, httpExchange);
121 }
122
123 } catch (Throwable aEx) {
124 logger.error("Failed to process request.", aEx);
125 sendResponse(200,
126 JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR,
127 "An internal server error has occurred.",
128 null), null),
129 httpExchange);
130 }
131 }
132
133 private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException {
134 if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) {
135 sendResponse(404, null, httpExchange);
136 return;
137 }
138 if (!"GET".equals(httpExchange.getRequestMethod())) {
139 sendResponse(405, null, httpExchange);
140 return;
141 }
142
143 try {
144 final var queryString = httpExchange.getRequestURI().getQuery();
145 final var query = queryString == null ? Map.<String, String>of() : Util.getQueryMap(queryString);
146
147 List<Manager> managers = getManagerFromQuery(query);
148 if (managers == null) {
149 sendResponse(400, null, httpExchange);
150 return;
151 }
152
153 httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream");
154 httpExchange.sendResponseHeaders(200, 0);
155 final var sender = new ServerSentEventSender(httpExchange.getResponseBody());
156
157 final var shouldStop = new AtomicBoolean(false);
158 final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
159 shouldStop.set(true);
160 synchronized (this) {
161 this.notifyAll();
162 }
163 });
164
165 try {
166 while (true) {
167 synchronized (this) {
168 wait(15_000);
169 }
170 if (shouldStop.get() || shutdown.get()) {
171 break;
172 }
173
174 try {
175 sender.sendKeepAlive();
176 } catch (IOException e) {
177 break;
178 }
179 }
180 } finally {
181 for (final var pair : handlers) {
182 unsubscribeReceiveHandler(pair);
183 }
184 try {
185 httpExchange.getResponseBody().close();
186 } catch (IOException ignored) {
187 }
188 }
189 } catch (Throwable aEx) {
190 logger.error("Failed to process request.", aEx);
191 sendResponse(500, null, httpExchange);
192 }
193 }
194
195 private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException {
196 if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) {
197 sendResponse(404, null, httpExchange);
198 return;
199 }
200 if (!"GET".equals(httpExchange.getRequestMethod())) {
201 sendResponse(405, null, httpExchange);
202 return;
203 }
204
205 sendResponse(200, null, httpExchange);
206 }
207
208 private List<Manager> getManagerFromQuery(final Map<String, String> query) {
209 if (m != null) {
210 return List.of(m);
211 }
212 if (c != null) {
213 final var account = query.get("account");
214 if (account == null || account.isEmpty()) {
215 return c.getManagers();
216 } else {
217 final var manager = c.getManager(account);
218 if (manager == null) {
219 return null;
220 }
221 return List.of(manager);
222 }
223 }
224 return List.of();
225 }
226
227 private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
228 final List<Manager> managers, final ServerSentEventSender sender, Callable unsubscribe
229 ) {
230 return managers.stream().map(m1 -> {
231 final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> {
232 try {
233 sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s)));
234 } catch (IOException e) {
235 unsubscribe.call();
236 }
237 });
238 m1.addReceiveHandler(receiveMessageHandler);
239 return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler);
240 }).toList();
241 }
242
243 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
244 final var m = pair.first();
245 final var handler = pair.second();
246 m.removeReceiveHandler(handler);
247 }
248
249 @Override
250 public void close() {
251 if (server != null) {
252 shutdown.set(true);
253 synchronized (this) {
254 this.notifyAll();
255 }
256 // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
257 server.stop(2);
258 server = null;
259 shutdown.set(false);
260 }
261 }
262
263 private interface Callable {
264
265 void call();
266 }
267 }