nmode's Git Repositories - signal-cli/commitdiff
Use improved shutdown for daemon command
author AsamK <asamk@gmx.de>
Thu, 9 Nov 2023 17:50:07 +0000 (18:50 +0100)
committer AsamK <asamk@gmx.de>
Thu, 9 Nov 2023 18:23:11 +0000 (19:23 +0100)
src/main/java/org/asamk/signal/commands/DaemonCommand.java
src/main/java/org/asamk/signal/dbus/DbusSignalImpl.java
src/main/java/org/asamk/signal/http/HttpServerHandler.java

index aee1330d0b0bb218405f4ca45978fea8fa42ee08..209b071c9f4045259a7e1cfcff504bdba99b75bb 100644 (file)
@@ -7,6 +7,7 @@ import net.sourceforge.argparse4j.inf.Subparser;
 import org.asamk.signal.DbusConfig;
 import org.asamk.signal.OutputType;
 import org.asamk.signal.ReceiveMessageHandler;
+import org.asamk.signal.Shutdown;
 import org.asamk.signal.commands.exceptions.CommandException;
 import org.asamk.signal.commands.exceptions.IOErrorException;
 import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
@@ -35,9 +36,11 @@ import java.net.InetSocketAddress;
 import java.net.UnixDomainSocketAddress;
 import java.nio.channels.Channel;
 import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -104,6 +107,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
     public void handleCommand(
             final Namespace ns, final Manager m, final OutputWriter outputWriter
     ) throws CommandException {
+        Shutdown.installHandler();
         logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
         final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
         final var receiveMode = ns.<ReceiveMode>get("receive-mode");
@@ -115,17 +119,11 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
         try (final var daemonHandler = new SingleAccountDaemonHandler(m, receiveMode)) {
             setup(ns, daemonHandler);
 
-            m.addClosedListener(() -> {
-                synchronized (this) {
-                    notifyAll();
-                }
-            });
+            m.addClosedListener(Shutdown::triggerShutdown);
 
-            synchronized (this) {
-                try {
-                    wait();
-                } catch (InterruptedException ignored) {
-                }
+            try {
+                Shutdown.waitForShutdown();
+            } catch (InterruptedException ignored) {
             }
         }
     }
@@ -134,6 +132,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
     public void handleCommand(
             final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
     ) throws CommandException {
+        Shutdown.installHandler();
         logger.info("Starting daemon in multi-account mode");
         final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
         final var receiveMode = ns.<ReceiveMode>get("receive-mode");
@@ -152,7 +151,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
 
             synchronized (this) {
                 try {
-                    wait();
+                    Shutdown.waitForShutdown();
                 } catch (InterruptedException ignored) {
                 }
             }
@@ -220,6 +219,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
     private static abstract class DaemonHandler implements AutoCloseable {
 
         protected final ReceiveMode receiveMode;
+        protected final List<AutoCloseable> closeables = new ArrayList<>();
+
         private static final AtomicInteger threadNumber = new AtomicInteger(0);
 
         public DaemonHandler(final ReceiveMode receiveMode) {
@@ -233,7 +234,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
         public abstract void runHttp(InetSocketAddress address) throws CommandException;
 
         protected void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
-            Thread.ofPlatform().name("daemon-listener").start(() -> {
+            final List<AutoCloseable> channels = new ArrayList<>();
+            final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> {
                 try (final var executor = Executors.newCachedThreadPool()) {
                     while (true) {
                         final var connectionId = threadNumber.getAndIncrement();
@@ -243,10 +245,14 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
                             channel = serverChannel.accept();
                             clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
                             logger.info("Accepted new client connection {}: {}", connectionId, clientString);
+                        } catch (ClosedChannelException ignored) {
+                            logger.trace("Listening socket has been closed");
+                            break;
                         } catch (IOException e) {
                             logger.error("Failed to accept new socket connection", e);
                             break;
                         }
+                        channels.add(channel);
                         executor.submit(() -> {
                             try (final var c = channel) {
                                 socketHandler.accept(c);
@@ -256,10 +262,18 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
                                 logger.warn("Connection handler failed, closing connection", e);
                             }
                             logger.info("Connection {} closed: {}", connectionId, clientString);
+                            channels.remove(channel);
                         });
                     }
                 }
             });
+            closeables.add(() -> {
+                serverChannel.close();
+                for (final var c : new ArrayList<>(channels)) {
+                    c.close();
+                }
+                thread.join();
+            });
         }
 
         protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
@@ -273,6 +287,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
 
         protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) { // creates the DbusSignalImpl for m and runs its initObjects on a separate thread
             final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START);
+            closeables.add(signal); // registered in closeables, which close() iterates to close each entry
 
             return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects); // returns the already started platform thread, named after the account's own number
         }
@@ -303,13 +318,21 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
                         "Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(),
                         e);
             }
+            closeables.add(conn);
 
             logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
         }
 
         @Override
         public void close() { // closes every registered closeable in registration order, then empties the list
-            // TODO
+            for (final var closeable : new ArrayList<>(closeables)) { // iterates over a copy, not the live list
+                try {
+                    closeable.close();
+                } catch (Exception e) { // a failing closeable is only logged, so the remaining ones are still closed
+                    logger.warn("Failed to close daemon handler", e);
+                }
+            }
+            closeables.clear(); // nothing is left registered, so a repeated close() has no entries to close
         }
     }
 
@@ -348,6 +371,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
             } catch (IOException ex) {
                 throw new IOErrorException("Failed to initialize HTTP Server", ex);
             }
+            this.closeables.add(handler);
         }
     }
 
@@ -385,6 +409,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
                         final var object = connection.getExportedObject(null, path);
                         if (object instanceof DbusSignalImpl dbusSignal) {
                             dbusSignal.close();
+                            closeables.remove(dbusSignal);
                         }
                     } catch (DBusException ignored) {
                     }
@@ -409,6 +434,7 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
             } catch (IOException ex) {
                 throw new IOErrorException("Failed to initialize HTTP Server", ex);
             }
+            this.closeables.add(handler);
         }
 
         private Thread exportManager(
index a5f28c678a92e9b04e06985dba370435f9ce5c57..e6ec70d9970e7636aead78f637bfff3013ac9036 100644 (file)
@@ -62,7 +62,7 @@ import java.util.stream.Collectors;
 
 import static org.asamk.signal.dbus.DbusUtils.makeValidObjectPathElement;
 
-public class DbusSignalImpl implements Signal {
+public class DbusSignalImpl implements Signal, AutoCloseable {
 
     private final Manager m;
     private final DBusConnection connection;
@@ -108,6 +108,7 @@ public class DbusSignalImpl implements Signal {
         updateIdentities();
     }
 
+    @Override
     public void close() {
         if (dbusMessageHandler != null) {
             m.removeReceiveHandler(dbusMessageHandler);
index b888464e1bd624267c372e70d42164ec12b34509..6cd3b0ef1f07a91fa63a237568967027378bf93c 100644 (file)
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class HttpServerHandler {
+public class HttpServerHandler implements AutoCloseable {
 
     private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
 
@@ -35,6 +35,8 @@ public class HttpServerHandler {
     private final SignalJsonRpcCommandHandler commandHandler;
     private final MultiAccountManager c;
     private final Manager m;
+    private HttpServer server;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
     public HttpServerHandler(final InetSocketAddress address, final Manager m) {
         this.address = address;
@@ -51,9 +53,12 @@ public class HttpServerHandler {
     }
 
     public void init() throws IOException {
+        if (server != null) {
+            throw new AssertionError("HttpServerHandler already initialized");
+        }
         logger.info("Starting server on " + address.toString());
 
-        final var server = HttpServer.create(address, 0);
+        server = HttpServer.create(address, 0);
         server.setExecutor(Executors.newCachedThreadPool());
 
         server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
@@ -153,7 +158,7 @@ public class HttpServerHandler {
             final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
                 shouldStop.set(true);
                 synchronized (this) {
-                    this.notify();
+                    this.notifyAll();
                 }
             });
 
@@ -162,7 +167,7 @@ public class HttpServerHandler {
                     synchronized (this) {
                         wait(15_000);
                     }
-                    if (shouldStop.get()) {
+                    if (shouldStop.get() || shutdown.get()) {
                         break;
                     }
 
@@ -241,6 +246,20 @@ public class HttpServerHandler {
         m.removeReceiveHandler(handler);
     }
 
+    @Override
+    public void close() { // stops the HTTP server created by init(); does nothing if no server is set
+        if (server != null) {
+            shutdown.set(true); // this flag is checked next to shouldStop in the event wait loop
+            synchronized (this) {
+                this.notifyAll(); // wakes every thread currently waiting on this handler's monitor
+            }
+            // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
+            server.stop(2); // NOTE(review): the argument is presumably the maximum wait in seconds - confirm against the HttpServer docs
+            server = null; // init() throws if server is non-null, so clearing it permits a later init()
+            shutdown.set(false); // flag is reset after the server has been stopped
+        }
+    }
+
     private interface Callable {
 
         void call();