import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-public class HttpServerHandler {
+public class HttpServerHandler implements AutoCloseable {
- private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
private final ObjectMapper objectMapper = Util.createJsonObjectMapper();
private final SignalJsonRpcCommandHandler commandHandler;
private final MultiAccountManager c;
private final Manager m;
+ private HttpServer server;
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
public HttpServerHandler(final InetSocketAddress address, final Manager m) {
this.address = address;
}
public void init() throws IOException {
- logger.info("Starting server on " + address.toString());
+ if (server != null) {
+ throw new AssertionError("HttpServerHandler already initialized");
+ }
+ logger.debug("Starting HTTP server on {}", address);
- final var server = HttpServer.create(address, 0);
- server.setExecutor(Executors.newFixedThreadPool(10));
+ server = HttpServer.create(address, 0);
+ server.setExecutor(Executors.newCachedThreadPool());
server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
server.createContext("/api/v1/events", this::handleEventsEndpoint);
server.createContext("/api/v1/check", this::handleCheckEndpoint);
server.start();
+ logger.info("Started HTTP server on {}", address);
+ }
+
+ /**
+  * Stops the HTTP server if one is running; does nothing otherwise.
+  * <p>
+  * Sets the shutdown flag and wakes every thread waiting on this object's
+  * monitor so that waiting request loops can observe the flag, then stops
+  * the server and shuts down the executor that was serving its requests.
+  */
+ @Override
+ public void close() {
+ if (server != null) {
+ shutdown.set(true);
+ synchronized (this) {
+ this.notifyAll();
+ }
+ // HttpServer.stop() leaves the executor running, so grab it before the server is discarded
+ final var executor = server.getExecutor();
+ // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
+ server.stop(2);
+ if (executor instanceof java.util.concurrent.ExecutorService) {
+ // Orderly shutdown: already running handlers may finish, no new tasks are accepted
+ ((java.util.concurrent.ExecutorService) executor).shutdown();
+ }
+ server = null;
+ // Reset the flag; presumably so a later init() starts from a clean state
+ shutdown.set(false);
+ }
}
private void sendResponse(int status, Object response, HttpExchange httpExchange) throws IOException {
final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
shouldStop.set(true);
synchronized (this) {
- this.notify();
+ this.notifyAll();
}
});
synchronized (this) {
wait(15_000);
}
- if (shouldStop.get()) {
+ if (shouldStop.get() || shutdown.get()) {
break;
}
}
/**
 * Resolves the managers that a request applies to.
 * <p>
 * When the single manager {@code m} is set it is always returned and the
 * query is ignored. Otherwise the multi-account manager {@code c} is
 * consulted: a missing or empty {@code account} query value selects all of
 * its managers, any other value selects the manager for that account.
 *
 * @param query parsed query parameters; only the {@code account} key is read
 * @return the matching managers, or {@code null} when {@code c} has no
 *         manager for the requested account
 * @throws AssertionError in the patched version, if neither {@code m} nor {@code c} is set
 */
private List<Manager> getManagerFromQuery(final Map<String, String> query) {
- List<Manager> managers;
if (m != null) {
- managers = List.of(m);
- } else {
+ return List.of(m);
+ }
+ if (c != null) {
final var account = query.get("account");
if (account == null || account.isEmpty()) {
- managers = c.getManagers();
+ return c.getManagers();
} else {
final var manager = c.getManager(account);
if (manager == null) {
// No manager for the requested account: signalled to the caller as null, not as an empty list
return null;
}
- managers = List.of(manager);
+ return List.of(manager);
}
}
- return managers;
+ throw new AssertionError("Unreachable state");
}
private List<Pair<Manager, Manager.ReceiveMessageHandler>> subscribeReceiveHandlers(