X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/c628e27d2e1372c4ed9bc4ee319b83700cd11b17..eca3c6fa30a8c6c72599a80cea39d674db1c0d65:/src/main/java/org/asamk/signal/http/HttpServerHandler.java diff --git a/src/main/java/org/asamk/signal/http/HttpServerHandler.java b/src/main/java/org/asamk/signal/http/HttpServerHandler.java index 9fb33a34..c9c8fa16 100644 --- a/src/main/java/org/asamk/signal/http/HttpServerHandler.java +++ b/src/main/java/org/asamk/signal/http/HttpServerHandler.java @@ -5,108 +5,264 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import org.asamk.signal.commands.Commands; +import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.jsonrpc.JsonRpcReader; import org.asamk.signal.jsonrpc.JsonRpcResponse; import org.asamk.signal.jsonrpc.JsonRpcSender; import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.MultiAccountManager; +import org.asamk.signal.manager.api.Pair; import org.asamk.signal.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; -public class HttpServerHandler { +public class HttpServerHandler implements AutoCloseable { - private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); + private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); private final ObjectMapper objectMapper = Util.createJsonObjectMapper(); private final InetSocketAddress address; private final SignalJsonRpcCommandHandler commandHandler; + private final MultiAccountManager c; + private final Manager m; + private HttpServer server; + private final AtomicBoolean shutdown = new AtomicBoolean(false); public HttpServerHandler(final InetSocketAddress address, final Manager m) { this.address = address; commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand); + this.c = null; + this.m = m; } public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) { this.address = address; commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand); + this.c = c; + this.m = null; } public void init() throws IOException { + if (server != null) { + throw new AssertionError("HttpServerHandler already initialized"); + } + logger.debug("Starting HTTP server on {}", address); - logger.info("Starting server on " + address.toString()); + server = HttpServer.create(address, 0); + server.setExecutor(Executors.newCachedThreadPool()); - final var server = HttpServer.create(address, 0); - server.setExecutor(Executors.newFixedThreadPool(10)); + server.createContext("/api/v1/rpc", this::handleRpcEndpoint); + server.createContext("/api/v1/events", this::handleEventsEndpoint); + server.createContext("/api/v1/check", this::handleCheckEndpoint); - server.createContext("/api/v1/rpc", httpExchange -> { + server.start(); + logger.info("Started HTTP server on {}", address); + } - if (!"POST".equals(httpExchange.getRequestMethod())) { - sendResponse(405, null, httpExchange); - return; + @Override + public void close() { + if (server != null) { + shutdown.set(true); + synchronized (this) { + this.notifyAll(); } + // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed + server.stop(2); + server = null; + shutdown.set(false); + } + } - if (!"application/json".equals(httpExchange.getRequestHeaders().getFirst("Content-Type"))) { - sendResponse(415, null, httpExchange); - return; - } + private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException { + if (response != null) { + final var byteResponse = objectMapper.writeValueAsBytes(response); - try { + httpExchange.getResponseHeaders().add("Content-Type", "application/json"); + httpExchange.sendResponseHeaders(status, byteResponse.length); - final Object[] result = {null}; - final var jsonRpcSender = new JsonRpcSender(s -> { - if (result[0] != null) { - throw new AssertionError("There should only be a single JSON-RPC response"); - } + httpExchange.getResponseBody().write(byteResponse); + } else { + httpExchange.sendResponseHeaders(status, -1); + } - result[0] = s; - }); + httpExchange.getResponseBody().close(); + } - final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody()); - jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, - method, - params), response -> logger.debug("Received unexpected response for id {}", response.getId())); + private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } + if (!"POST".equals(httpExchange.getRequestMethod())) { + sendResponse(405, null, httpExchange); + return; + } + final var contentType = httpExchange.getRequestHeaders().getFirst("Content-Type"); + if (contentType == null || !contentType.startsWith("application/json")) { + sendResponse(415, null, httpExchange); + return; + } + + try { + + final Object[] result = {null}; + final var jsonRpcSender = new JsonRpcSender(s -> { if (result[0] != null) { - sendResponse(200, result[0], httpExchange); - } else { - sendResponse(201, null, httpExchange); + throw new AssertionError("There should only be a single JSON-RPC response"); } - } catch (Throwable aEx) { - logger.error("Failed to process request.", aEx); - sendResponse(200, - JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR, - "An internal server error has occurred.", - null), null), - httpExchange); + result[0] = s; + }); + + final var jsonRpcReader = new JsonRpcReader(jsonRpcSender, httpExchange.getRequestBody()); + jsonRpcReader.readMessages((method, params) -> commandHandler.handleRequest(objectMapper, method, params), + response -> logger.debug("Received unexpected response for id {}", response.getId())); + + if (result[0] != null) { + sendResponse(200, result[0], httpExchange); + } else { + sendResponse(201, null, httpExchange); } - }); - server.start(); + } catch (Throwable aEx) { + logger.error("Failed to process request.", aEx); + sendResponse(200, + JsonRpcResponse.forError(new JsonRpcResponse.Error(JsonRpcResponse.Error.INTERNAL_ERROR, + "An internal server error has occurred.", + null), null), + httpExchange); + } + } + + private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } + if (!"GET".equals(httpExchange.getRequestMethod())) { + sendResponse(405, null, httpExchange); + return; + } + + try { + final var queryString = httpExchange.getRequestURI().getQuery(); + final var query = queryString == null ? Map.of() : Util.getQueryMap(queryString); + + List managers = getManagerFromQuery(query); + if (managers == null) { + sendResponse(400, null, httpExchange); + return; + } + + httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream"); + httpExchange.sendResponseHeaders(200, 0); + final var sender = new ServerSentEventSender(httpExchange.getResponseBody()); + + final var shouldStop = new AtomicBoolean(false); + final var handlers = subscribeReceiveHandlers(managers, sender, () -> { + shouldStop.set(true); + synchronized (this) { + this.notifyAll(); + } + }); + + try { + while (true) { + synchronized (this) { + wait(15_000); + } + if (shouldStop.get() || shutdown.get()) { + break; + } + try { + sender.sendKeepAlive(); + } catch (IOException e) { + break; + } + } + } finally { + for (final var pair : handlers) { + unsubscribeReceiveHandler(pair); + } + try { + httpExchange.getResponseBody().close(); + } catch (IOException ignored) { + } + } + } catch (Throwable aEx) { + logger.error("Failed to process request.", aEx); + sendResponse(500, null, httpExchange); + } } - private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException { - if (response != null) { - final var byteResponse = objectMapper.writeValueAsBytes(response); + private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } + if (!"GET".equals(httpExchange.getRequestMethod())) { + sendResponse(405, null, httpExchange); + return; + } - httpExchange.getResponseHeaders().add("Content-Type", "application/json"); - httpExchange.sendResponseHeaders(status, byteResponse.length); + sendResponse(200, null, httpExchange); + } - httpExchange.getResponseBody().write(byteResponse); - } else { - httpExchange.sendResponseHeaders(status, 0); + private List getManagerFromQuery(final Map query) { + if (m != null) { + return List.of(m); } + if (c != null) { + final var account = query.get("account"); + if (account == null || account.isEmpty()) { + return c.getManagers(); + } else { + final var manager = c.getManager(account); + if (manager == null) { + return null; + } + return List.of(manager); + } + } + throw new AssertionError("Unreachable state"); + } - httpExchange.getResponseBody().close(); + private List> subscribeReceiveHandlers( + final List managers, final ServerSentEventSender sender, Callable unsubscribe + ) { + return managers.stream().map(m1 -> { + final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> { + try { + sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s))); + } catch (IOException e) { + unsubscribe.call(); + } + }); + m1.addReceiveHandler(receiveMessageHandler); + return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler); + }).toList(); } + private void unsubscribeReceiveHandler(final Pair pair) { + final var m = pair.first(); + final var handler = pair.second(); + m.removeReceiveHandler(handler); + } + + private interface Callable { + + void call(); + } }