]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/http/HttpServerHandler.java
Add http endpoint events with SSE
[signal-cli] / src / main / java / org / asamk / signal / http / HttpServerHandler.java
1 package org.asamk.signal.http;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import com.sun.net.httpserver.HttpExchange;
5 import com.sun.net.httpserver.HttpServer;
6
7 import org.asamk.signal.commands.Commands;
8 import org.asamk.signal.json.JsonReceiveMessageHandler;
9 import org.asamk.signal.jsonrpc.JsonRpcReader;
10 import org.asamk.signal.jsonrpc.JsonRpcResponse;
11 import org.asamk.signal.jsonrpc.JsonRpcSender;
12 import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler;
13 import org.asamk.signal.manager.Manager;
14 import org.asamk.signal.manager.MultiAccountManager;
15 import org.asamk.signal.manager.api.Pair;
16 import org.asamk.signal.manager.util.Utils;
17 import org.asamk.signal.util.Util;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 public class HttpServerHandler {
29
30 private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
31
32 private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
33
34 private final InetSocketAddress address;
35
36 private final SignalJsonRpcCommandHandler commandHandler;
37 private final MultiAccountManager c;
38 private final Manager m;
39
40 public HttpServerHandler(final InetSocketAddress address, final Manager m) {
41 this.address = address;
42 commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand);
43 this.c = null;
44 this.m = m;
45 }
46
47 public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) {
48 this.address = address;
49 commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand);
50 this.c = c;
51 this.m = null;
52 }
53
54 public void init() throws IOException {
55 logger.info("Starting server on " + address.toString());
56
57 final var server = HttpServer.create(address, 0);
58 server.setExecutor(Executors.newFixedThreadPool(10));
59
60 server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
61 server.createContext("/api/v1/events", this::handleEventsEndpoint);
62
63 server.start();
64 }
65
66 private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
67 if (response != null) {
68 final var byteResponse = objectMapper.writeValueAsBytes(response);
69
70 httpExchange.getResponseHeaders().add("Content-Type", "application/json");
71 httpExchange.sendResponseHeaders(status, byteResponse.length);
72
73 httpExchange.getResponseBody().write(byteResponse);
74 } else {
75 httpExchange.sendResponseHeaders(status, -1);
76 }
77
78 httpExchange.getResponseBody().close();
79 }
80
81 private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException {
82 if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) {
83 sendResponse(404, null, httpExchange);
84 return;
85 }
86 if (!"POST".equals(httpExchange.getRequestMethod())) {
87 sendResponse(405, null, httpExchange);
88 return;
89 }
90
91 if (!"application/json".equals(httpExchange.getRequestHeaders().getFirst("Content-Type"))) {
92 sendResponse(415, null, httpExchange);
93 return;
94 }
95
96 try {
97
98 final Object[] result = {null};
99 final var jsonRpcSender = new JsonRpcSender(s -> {
100 if (result[0] != null) {
101 throw new AssertionError("There should only be a single JSON-RPC response");
102 }
103
104 result[0] = s;
105 });
106
107 final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody());
108 jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params),
109 response -> logger.debug("Received unexpected response for id {}", response.getId()));
110
111 if (result[0] != null) {
112 sendResponse(200, result[0], httpExchange);
113 } else {
114 sendResponse(201, null, httpExchange);
115 }
116
117 } catch (Throwable aEx) {
118 logger.error("Failed to process request.", aEx);
119 sendResponse(200,
120 JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR,
121 "An internal server error has occurred.",
122 null), null),
123 httpExchange);
124 }
125 }
126
127 private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException {
128 if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) {
129 sendResponse(404, null, httpExchange);
130 return;
131 }
132 if (!"GET".equals(httpExchange.getRequestMethod())) {
133 sendResponse(405, null, httpExchange);
134 return;
135 }
136
137 try {
138 final var queryString = httpExchange.getRequestURI().getQuery();
139 final var query = queryString == null ? Map.<String, String>of() : Utils.getQueryMap(queryString);
140
141 List<Manager> managers = getManagerFromQuery(query);
142 if (managers == null) {
143 sendResponse(400, null, httpExchange);
144 return;
145 }
146
147 httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream");
148 httpExchange.sendResponseHeaders(200, 0);
149 final var sender = new ServerSentEventSender(httpExchange.getResponseBody());
150
151 final var shouldStop = new AtomicBoolean(false);
152 final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
153 shouldStop.set(true);
154 synchronized (this) {
155 this.notify();
156 }
157 });
158
159 try {
160 while (true) {
161 synchronized (this) {
162 wait(15_000);
163 }
164 if (shouldStop.get()) {
165 break;
166 }
167
168 try {
169 sender.sendKeepAlive();
170 } catch (IOException e) {
171 break;
172 }
173 }
174 } finally {
175 for (final var pair : handlers) {
176 unsubscribeReceiveHandler(pair);
177 }
178 try {
179 httpExchange.getResponseBody().close();
180 } catch (IOException ignored) {
181 }
182 }
183 } catch (Throwable aEx) {
184 logger.error("Failed to process request.", aEx);
185 sendResponse(500, null, httpExchange);
186 }
187 }
188
189 private List<Manager> getManagerFromQuery(final Map<String, String> query) {
190 List<Manager> managers;
191 if (m != null) {
192 managers = List.of(m);
193 } else {
194 final var account = query.get("account");
195 if (account == null || account.isEmpty()) {
196 managers = c.getManagers();
197 } else {
198 final var manager = c.getManager(account);
199 if (manager == null) {
200 return null;
201 }
202 managers = List.of(manager);
203 }
204 }
205 return managers;
206 }
207
208 private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(
209 final List<Manager> managers, final ServerSentEventSender sender, Callable unsubscribe
210 ) {
211 return managers.stream().map(m1 -> {
212 final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> {
213 try {
214 sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s)));
215 } catch (IOException e) {
216 unsubscribe.call();
217 }
218 });
219 m1.addReceiveHandler(receiveMessageHandler);
220 return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler);
221 }).toList();
222 }
223
224 private void unsubscribeReceiveHandler(final Pair<Manager, Manager.ReceiveMessageHandler> pair) {
225 final var m = pair.first();
226 final var handler = pair.second();
227 m.removeReceiveHandler(handler);
228 }
229
230 private interface Callable {
231
232 void call();
233 }
234 }