X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/1b029b765fcc4ee99a52c8b4be1fe7426beb5f27..3602ef9be940e287bfe6ed32972ce13fea9deaa4:/src/main/java/org/asamk/signal/http/HttpServerHandler.java diff --git a/src/main/java/org/asamk/signal/http/HttpServerHandler.java b/src/main/java/org/asamk/signal/http/HttpServerHandler.java index f6301e0a..c9c8fa16 100644 --- a/src/main/java/org/asamk/signal/http/HttpServerHandler.java +++ b/src/main/java/org/asamk/signal/http/HttpServerHandler.java @@ -5,49 +5,82 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import org.asamk.signal.commands.Commands; +import org.asamk.signal.json.JsonReceiveMessageHandler; import org.asamk.signal.jsonrpc.JsonRpcReader; import org.asamk.signal.jsonrpc.JsonRpcResponse; import org.asamk.signal.jsonrpc.JsonRpcSender; import org.asamk.signal.jsonrpc.SignalJsonRpcCommandHandler; import org.asamk.signal.manager.Manager; import org.asamk.signal.manager.MultiAccountManager; +import org.asamk.signal.manager.api.Pair; import org.asamk.signal.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; -public class HttpServerHandler { +public class HttpServerHandler implements AutoCloseable { - private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); + private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); private final ObjectMapper objectMapper = Util.createJsonObjectMapper(); private final InetSocketAddress address; private final SignalJsonRpcCommandHandler commandHandler; + private final MultiAccountManager c; + private final Manager m; + private HttpServer server; + private final AtomicBoolean shutdown = new AtomicBoolean(false); public HttpServerHandler(final InetSocketAddress address, final Manager m) { this.address = address; commandHandler = new SignalJsonRpcCommandHandler(m, Commands::getCommand); + this.c = null; + this.m = m; } public HttpServerHandler(final InetSocketAddress address, final MultiAccountManager c) { this.address = address; commandHandler = new SignalJsonRpcCommandHandler(c, Commands::getCommand); + this.c = c; + this.m = null; } public void init() throws IOException { - logger.info("Starting server on " + address.toString()); + if (server != null) { + throw new AssertionError("HttpServerHandler already initialized"); + } + logger.debug("Starting HTTP server on {}", address); - final var server = HttpServer.create(address, 0); - server.setExecutor(Executors.newFixedThreadPool(10)); + server = HttpServer.create(address, 0); + server.setExecutor(Executors.newCachedThreadPool()); server.createContext("/api/v1/rpc", this::handleRpcEndpoint); + server.createContext("/api/v1/events", this::handleEventsEndpoint); + server.createContext("/api/v1/check", this::handleCheckEndpoint); server.start(); + logger.info("Started HTTP server on {}", address); + } + + @Override + public void close() { + if (server != null) { + shutdown.set(true); + synchronized (this) { + this.notifyAll(); + } + // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed + server.stop(2); + server = null; + shutdown.set(false); + } } private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException { @@ -59,19 +92,24 @@ public class HttpServerHandler { httpExchange.getResponseBody().write(byteResponse); } else { - httpExchange.sendResponseHeaders(status, 0); + httpExchange.sendResponseHeaders(status, -1); } httpExchange.getResponseBody().close(); } private void handleRpcEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/rpc".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } if (!"POST".equals(httpExchange.getRequestMethod())) { sendResponse(405, null, httpExchange); return; } - if (!"application/json".equals(httpExchange.getRequestHeaders().getFirst("Content-Type"))) { + final var contentType = httpExchange.getRequestHeaders().getFirst("Content-Type"); + if (contentType == null || !contentType.startsWith("application/json")) { sendResponse(415, null, httpExchange); return; } @@ -106,4 +144,125 @@ public class HttpServerHandler { httpExchange); } } + + private void handleEventsEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/events".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } + if (!"GET".equals(httpExchange.getRequestMethod())) { + sendResponse(405, null, httpExchange); + return; + } + + try { + final var queryString = httpExchange.getRequestURI().getQuery(); + final var query = queryString == null ? Map.of() : Util.getQueryMap(queryString); + + List managers = getManagerFromQuery(query); + if (managers == null) { + sendResponse(400, null, httpExchange); + return; + } + + httpExchange.getResponseHeaders().add("Content-Type", "text/event-stream"); + httpExchange.sendResponseHeaders(200, 0); + final var sender = new ServerSentEventSender(httpExchange.getResponseBody()); + + final var shouldStop = new AtomicBoolean(false); + final var handlers = subscribeReceiveHandlers(managers, sender, () -> { + shouldStop.set(true); + synchronized (this) { + this.notifyAll(); + } + }); + + try { + while (true) { + synchronized (this) { + wait(15_000); + } + if (shouldStop.get() || shutdown.get()) { + break; + } + + try { + sender.sendKeepAlive(); + } catch (IOException e) { + break; + } + } + } finally { + for (final var pair : handlers) { + unsubscribeReceiveHandler(pair); + } + try { + httpExchange.getResponseBody().close(); + } catch (IOException ignored) { + } + } + } catch (Throwable aEx) { + logger.error("Failed to process request.", aEx); + sendResponse(500, null, httpExchange); + } + } + + private void handleCheckEndpoint(HttpExchange httpExchange) throws IOException { + if (!"/api/v1/check".equals(httpExchange.getRequestURI().getPath())) { + sendResponse(404, null, httpExchange); + return; + } + if (!"GET".equals(httpExchange.getRequestMethod())) { + sendResponse(405, null, httpExchange); + return; + } + + sendResponse(200, null, httpExchange); + } + + private List getManagerFromQuery(final Map query) { + if (m != null) { + return List.of(m); + } + if (c != null) { + final var account = query.get("account"); + if (account == null || account.isEmpty()) { + return c.getManagers(); + } else { + final var manager = c.getManager(account); + if (manager == null) { + return null; + } + return List.of(manager); + } + } + throw new AssertionError("Unreachable state"); + } + + private List> subscribeReceiveHandlers( + final List managers, final ServerSentEventSender sender, Callable unsubscribe + ) { + return managers.stream().map(m1 -> { + final var receiveMessageHandler = new JsonReceiveMessageHandler(m1, s -> { + try { + sender.sendEvent(null, "receive", List.of(objectMapper.writeValueAsString(s))); + } catch (IOException e) { + unsubscribe.call(); + } + }); + m1.addReceiveHandler(receiveMessageHandler); + return new Pair<>(m1, (Manager.ReceiveMessageHandler) receiveMessageHandler); + }).toList(); + } + + private void unsubscribeReceiveHandler(final Pair pair) { + final var m = pair.first(); + final var handler = pair.second(); + m.removeReceiveHandler(handler); + } + + private interface Callable { + + void call(); + } }