]> nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/commands/DaemonCommand.java
Use new threads API
[signal-cli] / src / main / java / org / asamk / signal / commands / DaemonCommand.java
index 5c1fac53499553cc0f76973ee88bfdc6704a71c6..dcc21d45ddf001d5ef9284b784f93ffc3058daad 100644 (file)
@@ -39,6 +39,7 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
@@ -257,19 +258,19 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
         m.addReceiveHandler(handler, isWeakListener);
     }
 
-    private void runSocketSingleAccount(
+    private Thread runSocketSingleAccount(
             final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
     ) {
-        runSocket(serverChannel, channel -> {
+        return runSocket(serverChannel, channel -> {
             final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
             handler.handleConnection(m);
         });
     }
 
-    private void runSocketMultiAccount(
+    private Thread runSocketMultiAccount(
             final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
     ) {
-        runSocket(serverChannel, channel -> {
+        return runSocket(serverChannel, channel -> {
             final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
             handler.handleConnection(c);
         });
@@ -277,39 +278,37 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
 
     private static final AtomicInteger threadNumber = new AtomicInteger(0);
 
-    private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
-        final var thread = new Thread(() -> {
-            while (true) {
-                final var connectionId = threadNumber.getAndIncrement();
-                final SocketChannel channel;
-                final String clientString;
-                try {
-                    channel = serverChannel.accept();
-                    clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
-                    logger.info("Accepted new client connection {}: {}", connectionId, clientString);
-                } catch (IOException e) {
-                    logger.error("Failed to accept new socket connection", e);
-                    synchronized (this) {
-                        notifyAll();
-                    }
-                    break;
-                }
-                final var connectionThread = new Thread(() -> {
-                    try (final var c = channel) {
-                        socketHandler.accept(c);
+    private Thread runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
+        return Thread.ofPlatform().name("daemon-listener").start(() -> {
+            try (final var executor = Executors.newCachedThreadPool()) {
+                while (true) {
+                    final var connectionId = threadNumber.getAndIncrement();
+                    final SocketChannel channel;
+                    final String clientString;
+                    try {
+                        channel = serverChannel.accept();
+                        clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
+                        logger.info("Accepted new client connection {}: {}", connectionId, clientString);
                     } catch (IOException e) {
-                        logger.warn("Failed to close channel", e);
-                    } catch (Throwable e) {
-                        logger.warn("Connection handler failed, closing connection", e);
+                        logger.error("Failed to accept new socket connection", e);
+                        break;
                     }
-                    logger.info("Connection {} closed: {}", connectionId, clientString);
-                });
-                connectionThread.setName("daemon-connection-" + connectionId);
-                connectionThread.start();
+                    executor.submit(() -> {
+                        try (final var c = channel) {
+                            socketHandler.accept(c);
+                        } catch (IOException e) {
+                            logger.warn("Failed to close channel", e);
+                        } catch (Throwable e) {
+                            logger.warn("Connection handler failed, closing connection", e);
+                        }
+                        logger.info("Connection {} closed: {}", connectionId, clientString);
+                    });
+                }
+            }
+            synchronized (this) {
+                notifyAll();
             }
         });
-        thread.setName("daemon-listener");
-        thread.start();
     }
 
     private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
@@ -411,11 +410,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
             final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
     ) {
         final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
-        final var initThread = new Thread(signal::initObjects);
-        initThread.setName("dbus-init");
-        initThread.start();
 
-        return initThread;
+        return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
     }
 
     interface DbusRunner {