]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/http/HttpServerHandler.java
Fix TextStyle doc example typo (#1251)
[signal-cli] / src / main / java / org / asamk / signal / http / HttpServerHandler.java
1 package org.asamk.signal.http;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.sun.net.httpserver.HttpExchange;
5 import com.sun.net.httpserver.HttpServer;
6
7 import org.asamk.signal.commands.Commands;
8 import org.asamk.signal.json.JsonReceiveMessageHandler;
9 import org.asamk.signal.jsonrpc.JsonRpcReader;
10 import org.asamk.signal.jsonrpc.JsonRpcResponse;
11 import org.asamk.signal.jsonrpc.JsonRpcSender;
12 import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler;
13 import org.asamk.signal.manager.Manager;
14 import org.asamk.signal.manager.MultiAccountManager;
15 import org.asamk.signal.manager.api.Pair;
16 import org.asamk.signal.manager.util.Utils;
17 import org.asamk.signal.util.Util;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 public class HttpServerHandler {
29
30 private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
31
32 private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
33
34 private final InetSocketAddress address;
35
36 private final SignalJsonRpcCommandHandler commandHandler;
37 private final MultiAccountManager c;
38 private final Manager m;
39
40 public HttpServerHandler(final InetSocketAddress address, final Manager m) {
41 this.address = address;
42 commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand);
43 this.c = null;
44 this.m = m;
45 }
46
47 public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) {
48 this.address = address;
49 commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand);
50 this.c = c;
51 this.m = null;
52 }
53
54 public void init() throws IOException {
55 logger.info("Starting server on " + address.toString());
56
57 final var server = HttpServer.create(address, 0);
58 server.setExecutor(Executors.newFixedThreadPool(10));
59
60 server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
61 server.createContext("/api/v1/events", this::handleEventsEndpoint);
62 server.createContext("/api/v1/check", this::handleCheckEndpoint);
63
64 server.start();
65 }
66
67 private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
68 if (response != null) {
69 final var byteResponse = objectMapper.writeValueAsBytes(response);
70
71 httpExchange.getResponseHeaders().add("Content-Type", "application/json");
72 httpExchange.sendResponseHeaders(status, byteResponse.length);
73
74 httpExchange.getResponseBody().write(byteResponse);
75 } else {
76 httpExchange.sendResponseHeaders(status, -1);
77 }
78
79 httpExchange.getResponseBody().close();
80 }
81
82 private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException {
83 if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) {
84 sendResponse(404, null, httpExchange);
85 return;
86 }
87 if (!"POST".equals(httpExchange.getRequestMethod())) {
88 sendResponse(405, null, httpExchange);
89 return;
90 }
91
92 final var contentType = httpExchange.getRequestHeaders().getFirst("Content-Type");
93 if (contentType == null || !contentType.startsWith("application/json")) {
94 sendResponse(415, null, httpExchange);
95 return;
96 }
97
98 try {
99
100 final Object[] result = {null};
101 final var jsonRpcSender = new JsonRpcSender(s -> {
102 if (result[0] != null) {
103 throw new AssertionError("There should only be a single JSON-RPC response");
104 }
105
106 result[0] = s;
107 });
108
109 final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody());
110 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
111 response -> logger.debug("Received unexpected response for id {}", response.getId()));
112
113 if (result[0] != null) {
114 sendResponse(200, result[0], httpExchange);
115 } else {
116 sendResponse(201, null, httpExchange);
117 }
118
119 } catch (Throwable aEx) {
120 logger.error("Failed to process request.", aEx);
121 sendResponse(200,
122 JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR,
123 "An internal server error has occurred.",
124 null), null),
125 httpExchange);
126 }
127 }
128
129 private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException {
130 if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) {
131 sendResponse(404, null, httpExchange);
132 return;
133 }
134 if (!"GET".equals(httpExchange.getRequestMethod())) {
135 sendResponse(405, null, httpExchange);
136 return;
137 }
138
139 try {
140 final var queryString = httpExchange.getRequestURI().getQuery();
141 final var query = queryString == null ? Map.<String, String>of() : Utils.getQueryMap(queryString);
142
143 List<Manager> managers = getManagerFromQuery(query);
144 if (managers == null) {
145 sendResponse(400, null, httpExchange);
146 return;
147 }
148
149 httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream");
150 httpExchange.sendResponseHeaders(200, 0);
151 final var sender = new ServerSentEventSender(httpExchange.getResponseBody());
152
153 final var shouldStop = new AtomicBoolean(false);
154 final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
155 shouldStop.set(true);
156 synchronized (this) {
157 this.notify();
158 }
159 });
160
161 try {
162 while (true) {
163 synchronized (this) {
164 wait(15_000);
165 }
166 if (shouldStop.get()) {
167 break;
168 }
169
170 try {
171 sender.sendKeepAlive();
172 } catch (IOException e) {
173 break;
174 }
175 }
176 } finally {
177 for (final var pair : handlers) {
178 unsubscribeReceiveHandler(pair);
179 }
180 try {
181 httpExchange.getResponseBody().close();
182 } catch (IOException ignored) {
183 }
184 }
185 } catch (Throwable aEx) {
186 logger.error("Failed to process request.", aEx);
187 sendResponse(500, null, httpExchange);
188 }
189 }
190
191 private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException {
192 if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) {
193 sendResponse(404, null, httpExchange);
194 return;
195 }
196 if (!"GET".equals(httpExchange.getRequestMethod())) {
197 sendResponse(405, null, httpExchange);
198 return;
199 }
200
201 sendResponse(200, null, httpExchange);
202 }
203
204 private List<Manager> getManagerFromQuery(final Map<String, String> query) {
205 List<Manager> managers;
206 if (m != null) {
207 managers = List.of(m);
208 } else {
209 final var account = query.get("account");
210 if (account == null || account.isEmpty()) {
211 managers = c.getManagers();
212 } else {
213 final var manager = c.getManager(account);
214 if (manager == null) {
215 return null;
216 }
217 managers = List.of(manager);
218 }
219 }
220 return managers;
221 }
222
223 private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
224 final List<Manager> managers, final ServerSentEventSender sender, Callable unsubscribe
225 ) {
226 return managers.stream().map(m1 -> {
227 final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> {
228 try {
229 sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s)));
230 } catch (IOException e) {
231 unsubscribe.call();
232 }
233 });
234 m1.addReceiveHandler(receiveMessageHandler);
235 return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler);
236 }).toList();
237 }
238
239 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
240 final var m = pair.first();
241 final var handler = pair.second();
242 m.removeReceiveHandler(handler);
243 }
244
245 private interface Callable {
246
247 void call();
248 }
249 }