From 1058e33f1208e1ca284d3fd4760380d68093741f Mon Sep 17 00:00:00 2001 From: AsamK Date: Thu, 9 Nov 2023 18:50:07 +0100 Subject: [PATCH] Use improved shutdown for daemon command --- .../asamk/signal/commands/DaemonCommand.java | 52 ++++++++++++++----- .../org/asamk/signal/dbus/DbusSignalImpl.java | 3 +- .../asamk/signal/http/HttpServerHandler.java | 27 ++++++++-- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index aee1330d..209b071c 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -7,6 +7,7 @@ import net.sourceforge.argparse4j.inf.Subparser; import org.asamk.signal.DbusConfig; import org.asamk.signal.OutputType; import org.asamk.signal.ReceiveMessageHandler; +import org.asamk.signal.Shutdown; import org.asamk.signal.commands.exceptions.CommandException; import org.asamk.signal.commands.exceptions.IOErrorException; import org.asamk.signal.commands.exceptions.UnexpectedErrorException; @@ -35,9 +36,11 @@ import java.net.InetSocketAddress; import java.net.UnixDomainSocketAddress; import java.nio.channels.Channel; import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -104,6 +107,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { public void handleCommand( final Namespace ns, final Manager m, final OutputWriter outputWriter ) throws CommandException { + Shutdown.installHandler(); logger.info("Starting daemon in single-account mode for " + m.getSelfNumber()); final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout")); final var receiveMode = ns.get("receive-mode"); @@ -115,17 +119,11 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { try (final var daemonHandler = new SingleAccountDaemonHandler(m, receiveMode)) { setup(ns, daemonHandler); - m.addClosedListener(() -> { - synchronized (this) { - notifyAll(); - } - }); + m.addClosedListener(Shutdown::triggerShutdown); - synchronized (this) { - try { - wait(); - } catch (InterruptedException ignored) { - } + try { + Shutdown.waitForShutdown(); + } catch (InterruptedException ignored) { } } } @@ -134,6 +132,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { public void handleCommand( final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter ) throws CommandException { + Shutdown.installHandler(); logger.info("Starting daemon in multi-account mode"); final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout")); final var receiveMode = ns.get("receive-mode"); @@ -152,7 +151,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { synchronized (this) { try { - wait(); + Shutdown.waitForShutdown(); } catch (InterruptedException ignored) { } } @@ -220,6 +219,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { private static abstract class DaemonHandler implements AutoCloseable { protected final ReceiveMode receiveMode; + protected final List closeables = new ArrayList<>(); + private static final AtomicInteger threadNumber = new AtomicInteger(0); public DaemonHandler(final ReceiveMode receiveMode) { @@ -233,7 +234,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { public abstract void runHttp(InetSocketAddress address) throws CommandException; protected void runSocket(final ServerSocketChannel serverChannel, Consumer socketHandler) { - Thread.ofPlatform().name("daemon-listener").start(() -> { + final List channels = new ArrayList<>(); + final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> { try (final var executor = Executors.newCachedThreadPool()) { while (true) { final var connectionId = threadNumber.getAndIncrement(); @@ -243,10 +245,14 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { channel = serverChannel.accept(); clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); logger.info("Accepted new client connection {}: {}", connectionId, clientString); + } catch (ClosedChannelException ignored) { + logger.trace("Listening socket has been closed"); + break; } catch (IOException e) { logger.error("Failed to accept new socket connection", e); break; } + channels.add(channel); executor.submit(() -> { try (final var c = channel) { socketHandler.accept(c); @@ -256,10 +262,18 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { logger.warn("Connection handler failed, closing connection", e); } logger.info("Connection {} closed: {}", connectionId, clientString); + channels.remove(channel); }); } } }); + closeables.add(() -> { + serverChannel.close(); + for (final var c : new ArrayList<>(channels)) { + c.close(); + } + thread.join(); + }); } protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) { @@ -273,6 +287,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) { final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START); + closeables.add(signal); return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects); } @@ -303,13 +318,21 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(), e); } + closeables.add(conn); logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname()); } @Override public void close() { - // TODO + for (final var closeable : new ArrayList<>(closeables)) { + try { + closeable.close(); + } catch (Exception e) { + logger.warn("Failed to close daemon handler", e); + } + } + closeables.clear(); } } @@ -348,6 +371,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { } catch (IOException ex) { throw new IOErrorException("Failed to initialize HTTP Server", ex); } + this.closeables.add(handler); } } @@ -385,6 +409,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { final var object = connection.getExportedObject(null, path); if (object instanceof DbusSignalImpl dbusSignal) { dbusSignal.close(); + closeables.remove(dbusSignal); } } catch (DBusException ignored) { } @@ -409,6 +434,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { } catch (IOException ex) { throw new IOErrorException("Failed to initialize HTTP Server", ex); } + this.closeables.add(handler); } private Thread exportManager( diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java index a5f28c67..e6ec70d9 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java @@ -62,7 +62,7 @@ import java.util.stream.Collectors; import static org.asamk.signal.dbus.DbusUtils.makeValidObjectPathElement; -public class DbusSignalImpl implements Signal { +public class DbusSignalImpl implements Signal, AutoCloseable { private final Manager m; private final DBusConnection connection; @@ -108,6 +108,7 @@ public class DbusSignalImpl implements Signal { updateIdentities(); } + @Override public void close() { if (dbusMessageHandler != null) { m.removeReceiveHandler(dbusMessageHandler); diff --git a/src/main/java/org/asamk/signal/http/HttpServerHandler.java b/src/main/java/org/asamk/signal/http/HttpServerHandler.java index b888464e..6cd3b0ef 100644 --- a/src/main/java/org/asamk/signal/http/HttpServerHandler.java +++ b/src/main/java/org/asamk/signal/http/HttpServerHandler.java @@ -24,7 +24,7 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -public class HttpServerHandler { +public class HttpServerHandler implements AutoCloseable { private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class); @@ -35,6 +35,8 @@ public class HttpServerHandler { private final SignalJsonRpcCommandHandler commandHandler; private final MultiAccountManager c; private final Manager m; + private HttpServer server; + private final AtomicBoolean shutdown = new AtomicBoolean(false); public HttpServerHandler(final InetSocketAddress address, final Manager m) { this.address = address; @@ -51,9 +53,12 @@ public class HttpServerHandler { } public void init() throws IOException { + if (server != null) { + throw new AssertionError("HttpServerHandler already initialized"); + } logger.info("Starting server on " + address.toString()); - final var server = HttpServer.create(address, 0); + server = HttpServer.create(address, 0); server.setExecutor(Executors.newCachedThreadPool()); server.createContext("/api/v1/rpc", this::handleRpcEndpoint); @@ -153,7 +158,7 @@ public class HttpServerHandler { final var handlers = subscribeReceiveHandlers(managers, sender, () -> { shouldStop.set(true); synchronized (this) { - this.notify(); + this.notifyAll(); } }); @@ -162,7 +167,7 @@ public class HttpServerHandler { synchronized (this) { wait(15_000); } - if (shouldStop.get()) { + if (shouldStop.get() || shutdown.get()) { break; } @@ -241,6 +246,20 @@ public class HttpServerHandler { m.removeReceiveHandler(handler); } + @Override + public void close() { + if (server != null) { + shutdown.set(true); + synchronized (this) { + this.notifyAll(); + } + // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed + server.stop(2); + server = null; + shutdown.set(false); + } + } + private interface Callable { void call(); -- 2.50.1