1 package org
.asamk
.signal
.http
;
3 import com
.fasterxml
.jackson
.databind
.ObjectMapper
;
4 import com
.sun
.net
.httpserver
.HttpExchange
;
5 import com
.sun
.net
.httpserver
.HttpServer
;
7 import org
.asamk
.signal
.commands
.Commands
;
8 import org
.asamk
.signal
.json
.JsonReceiveMessageHandler
;
9 import org
.asamk
.signal
.jsonrpc
.JsonRpcReader
;
10 import org
.asamk
.signal
.jsonrpc
.JsonRpcResponse
;
11 import org
.asamk
.signal
.jsonrpc
.JsonRpcSender
;
12 import org
.asamk
.signal
.jsonrpc
.SignalJsonRpcCommandHandler
;
13 import org
.asamk
.signal
.manager
.Manager
;
14 import org
.asamk
.signal
.manager
.MultiAccountManager
;
15 import org
.asamk
.signal
.manager
.api
.Pair
;
16 import org
.asamk
.signal
.util
.Util
;
17 import org
.slf4j
.Logger
;
18 import org
.slf4j
.LoggerFactory
;
20 import java
.io
.IOException
;
21 import java
.net
.InetSocketAddress
;
22 import java
.util
.List
;
24 import java
.util
.concurrent
.Executors
;
25 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
27 public class HttpServerHandler
implements AutoCloseable
{
29 private static final Logger logger
= LoggerFactory
.getLogger(HttpServerHandler
.class);
31 private final ObjectMapper objectMapper
= Util
.createJsonObjectMapper();
33 private final InetSocketAddress address
;
35 private final SignalJsonRpcCommandHandler commandHandler
;
36 private final MultiAccountManager c
;
37 private final Manager m
;
38 private HttpServer server
;
39 private final AtomicBoolean shutdown
= new AtomicBoolean(false);
41 public HttpServerHandler(final InetSocketAddress address
, final Manager m
) {
42 this.address
= address
;
43 commandHandler
= new SignalJsonRpcCommandHandler(m
, Commands
::getCommand
);
48 public HttpServerHandler(final InetSocketAddress address
, final MultiAccountManager c
) {
49 this.address
= address
;
50 commandHandler
= new SignalJsonRpcCommandHandler(c
, Commands
::getCommand
);
55 public void init() throws IOException
{
57 throw new AssertionError("HttpServerHandler already initialized");
59 logger
.debug("Starting HTTP server on {}", address
);
61 server
= HttpServer
.create(address
, 0);
62 server
.setExecutor(Executors
.newCachedThreadPool());
64 server
.createContext("/api/v1/rpc", this::handleRpcEndpoint
);
65 server
.createContext("/api/v1/events", this::handleEventsEndpoint
);
66 server
.createContext("/api/v1/check", this::handleCheckEndpoint
);
69 logger
.info("Started HTTP server on {}", address
);
79 // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
86 private void sendResponse(int status
, Object response
, HttpExchange httpExchange
) throws IOException
{
87 if (response
!= null) {
88 final var byteResponse
= objectMapper
.writeValueAsBytes(response
);
90 httpExchange
.getResponseHeaders().add("Content-Type", "application/json");
91 httpExchange
.sendResponseHeaders(status
, byteResponse
.length
);
93 httpExchange
.getResponseBody().write(byteResponse
);
95 httpExchange
.sendResponseHeaders(status
, -1);
98 httpExchange
.getResponseBody().close();
101 private void handleRpcEndpoint(HttpExchange httpExchange
) throws IOException
{
102 if (!"/api/v1/rpc".equals(httpExchange
.getRequestURI().getPath())) {
103 sendResponse(404, null, httpExchange
);
106 if (!"POST".equals(httpExchange
.getRequestMethod())) {
107 sendResponse(405, null, httpExchange
);
111 final var contentType
= httpExchange
.getRequestHeaders().getFirst("Content-Type");
112 if (contentType
== null || !contentType
.startsWith("application/json")) {
113 sendResponse(415, null, httpExchange
);
119 final Object
[] result
= {null};
120 final var jsonRpcSender
= new JsonRpcSender(s
-> {
121 if (result
[0] != null) {
122 throw new AssertionError("There should only be a single JSON-RPC response");
128 final var jsonRpcReader
= new JsonRpcReader(jsonRpcSender
, httpExchange
.getRequestBody());
129 jsonRpcReader
.readMessages((method
, params
) -> commandHandler
.handleRequest(objectMapper
, method
, params
),
130 response
-> logger
.debug("Received unexpected response for id {}", response
.getId()));
132 if (result
[0] != null) {
133 sendResponse(200, result
[0], httpExchange
);
135 sendResponse(201, null, httpExchange
);
138 } catch (Throwable aEx
) {
139 logger
.error("Failed to process request.", aEx
);
141 JsonRpcResponse
.forError(new JsonRpcResponse
.Error(JsonRpcResponse
.Error
.INTERNAL_ERROR
,
142 "An internal server error has occurred.",
148 private void handleEventsEndpoint(HttpExchange httpExchange
) throws IOException
{
149 if (!"/api/v1/events".equals(httpExchange
.getRequestURI().getPath())) {
150 sendResponse(404, null, httpExchange
);
153 if (!"GET".equals(httpExchange
.getRequestMethod())) {
154 sendResponse(405, null, httpExchange
);
159 final var queryString
= httpExchange
.getRequestURI().getRawQuery();
160 final var query
= queryString
== null ? Map
.<String
, String
>of() : Util
.getQueryMap(queryString
);
162 List
<Manager
> managers
= getManagerFromQuery(query
);
163 if (managers
== null) {
164 sendResponse(400, null, httpExchange
);
168 httpExchange
.getResponseHeaders().add("Content-Type", "text/event-stream");
169 httpExchange
.sendResponseHeaders(200, 0);
170 final var sender
= new ServerSentEventSender(httpExchange
.getResponseBody());
172 final var shouldStop
= new AtomicBoolean(false);
173 final var handlers
= subscribeReceiveHandlers(managers
, sender
, () -> {
174 shouldStop
.set(true);
175 synchronized (this) {
182 synchronized (this) {
185 if (shouldStop
.get() || shutdown
.get()) {
190 sender
.sendKeepAlive();
191 } catch (IOException e
) {
196 for (final var pair
: handlers
) {
197 unsubscribeReceiveHandler(pair
);
200 httpExchange
.getResponseBody().close();
201 } catch (IOException ignored
) {
204 } catch (Throwable aEx
) {
205 logger
.error("Failed to process request.", aEx
);
206 sendResponse(500, null, httpExchange
);
210 private void handleCheckEndpoint(HttpExchange httpExchange
) throws IOException
{
211 if (!"/api/v1/check".equals(httpExchange
.getRequestURI().getPath())) {
212 sendResponse(404, null, httpExchange
);
215 if (!"GET".equals(httpExchange
.getRequestMethod())) {
216 sendResponse(405, null, httpExchange
);
220 sendResponse(200, null, httpExchange
);
223 private List
<Manager
> getManagerFromQuery(final Map
<String
, String
> query
) {
228 final var account
= query
.get("account");
229 if (account
== null || account
.isEmpty()) {
230 return c
.getManagers();
232 final var manager
= c
.getManager(account
);
233 if (manager
== null) {
236 return List
.of(manager
);
239 throw new AssertionError("Unreachable state");
242 private List
<Pair
<Manager
, Manager
.ReceiveMessageHandler
>> subscribeReceiveHandlers(
243 final List
<Manager
> managers
,
244 final ServerSentEventSender sender
,
247 return managers
.stream().map(m1
-> {
248 final var receiveMessageHandler
= new JsonReceiveMessageHandler(m1
, s
-> {
250 sender
.sendEvent(null, "receive", List
.of(objectMapper
.writeValueAsString(s
)));
251 } catch (IOException e
) {
255 m1
.addReceiveHandler(receiveMessageHandler
);
256 return new Pair
<>(m1
, (Manager
.ReceiveMessageHandler
) receiveMessageHandler
);
260 private void unsubscribeReceiveHandler(final Pair
<Manager
, Manager
.ReceiveMessageHandler
> pair
) {
261 final var m
= pair
.first();
262 final var handler
= pair
.second();
263 m
.removeReceiveHandler(handler
);
266 private interface Callable
{