nmode's Git Repositories - signal-cli/commitdiff
Use new threads API
author AsamK <asamk@gmx.de>
Tue, 17 Oct 2023 19:56:10 +0000 (21:56 +0200)
committer AsamK <asamk@gmx.de>
Tue, 24 Oct 2023 15:36:32 +0000 (17:36 +0200)
lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
src/main/java/org/asamk/signal/commands/DaemonCommand.java
src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java
src/main/java/org/asamk/signal/http/HttpServerHandler.java
src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java
src/main/java/org/asamk/signal/util/Util.java

index 3feed994cb38fd934ea65549484fff7e4dec7a7e..99bede8d30f997765a261440535511c9e4775525 100644 (file)
@@ -1019,7 +1019,7 @@ public class ManagerImpl implements Manager {
         if (receiveThread != null || isReceivingSynchronous) {
             return;
         }
-        receiveThread = new Thread(() -> {
+        receiveThread = Thread.ofPlatform().name("receive-" + threadNumber.getAndIncrement()).start(() -> {
             logger.debug("Starting receiving messages");
             context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers);
             logger.debug("Finished receiving messages");
@@ -1033,9 +1033,6 @@ public class ManagerImpl implements Manager {
                 }
             }
         });
-        receiveThread.setName("receive-" + threadNumber.getAndIncrement());
-
-        receiveThread.start();
     }
 
     private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) {
@@ -1310,7 +1307,7 @@ public class ManagerImpl implements Manager {
         if (thread != null) {
             stopReceiveThread(thread);
         }
-        executor.shutdown();
+        executor.close();
 
         dependencies.getSignalWebSocket().disconnect();
         disposable.dispose();
index 751f345bed1cb5d8c4f16347533d1d6c027833c4..910da3cf464d7f95aa858665d092fb94e3c6fdb8 100644 (file)
@@ -38,7 +38,7 @@ public class MessageSendLogStore implements AutoCloseable {
     public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) {
         this.database = database;
         this.sendLogDisabled = disableMessageSendLog;
-        this.cleanupThread = new Thread(() -> {
+        this.cleanupThread = Thread.ofPlatform().name("msl-cleanup").daemon().start(() -> {
             try {
                 final var interval = Duration.ofHours(1).toMillis();
                 while (!Thread.interrupted()) {
@@ -55,9 +55,6 @@ public class MessageSendLogStore implements AutoCloseable {
                 logger.debug("Stopping msl cleanup thread");
             }
         });
-        cleanupThread.setName("msl-cleanup");
-        cleanupThread.setDaemon(true);
-        cleanupThread.start();
     }
 
     public static void createSql(Connection connection) throws SQLException {
index 5c1fac53499553cc0f76973ee88bfdc6704a71c6..dcc21d45ddf001d5ef9284b784f93ffc3058daad 100644 (file)
@@ -39,6 +39,7 @@ import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
@@ -257,19 +258,19 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
         m.addReceiveHandler(handler, isWeakListener);
     }
 
-    private void runSocketSingleAccount(
+    private Thread runSocketSingleAccount(
             final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
     ) {
-        runSocket(serverChannel, channel -> {
+        return runSocket(serverChannel, channel -> {
             final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
             handler.handleConnection(m);
         });
     }
 
-    private void runSocketMultiAccount(
+    private Thread runSocketMultiAccount(
             final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart
     ) {
-        runSocket(serverChannel, channel -> {
+        return runSocket(serverChannel, channel -> {
             final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart);
             handler.handleConnection(c);
         });
@@ -277,39 +278,37 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
 
     private static final AtomicInteger threadNumber = new AtomicInteger(0);
 
-    private void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
-        final var thread = new Thread(() -> {
-            while (true) {
-                final var connectionId = threadNumber.getAndIncrement();
-                final SocketChannel channel;
-                final String clientString;
-                try {
-                    channel = serverChannel.accept();
-                    clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
-                    logger.info("Accepted new client connection {}: {}", connectionId, clientString);
-                } catch (IOException e) {
-                    logger.error("Failed to accept new socket connection", e);
-                    synchronized (this) {
-                        notifyAll();
-                    }
-                    break;
-                }
-                final var connectionThread = new Thread(() -> {
-                    try (final var c = channel) {
-                        socketHandler.accept(c);
+    private Thread runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
+        return Thread.ofPlatform().name("daemon-listener").start(() -> {
+            try (final var executor = Executors.newCachedThreadPool()) {
+                while (true) {
+                    final var connectionId = threadNumber.getAndIncrement();
+                    final SocketChannel channel;
+                    final String clientString;
+                    try {
+                        channel = serverChannel.accept();
+                        clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
+                        logger.info("Accepted new client connection {}: {}", connectionId, clientString);
                     } catch (IOException e) {
-                        logger.warn("Failed to close channel", e);
-                    } catch (Throwable e) {
-                        logger.warn("Connection handler failed, closing connection", e);
+                        logger.error("Failed to accept new socket connection", e);
+                        break;
                     }
-                    logger.info("Connection {} closed: {}", connectionId, clientString);
-                });
-                connectionThread.setName("daemon-connection-" + connectionId);
-                connectionThread.start();
+                    executor.submit(() -> {
+                        try (final var c = channel) {
+                            socketHandler.accept(c);
+                        } catch (IOException e) {
+                            logger.warn("Failed to close channel", e);
+                        } catch (Throwable e) {
+                            logger.warn("Connection handler failed, closing connection", e);
+                        }
+                        logger.info("Connection {} closed: {}", connectionId, clientString);
+                    });
+                }
+            }
+            synchronized (this) {
+                notifyAll();
             }
         });
-        thread.setName("daemon-listener");
-        thread.start();
     }
 
     private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(
@@ -411,11 +410,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand {
             final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart
     ) {
         final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart);
-        final var initThread = new Thread(signal::initObjects);
-        initThread.setName("dbus-init");
-        initThread.start();
 
-        return initThread;
+        return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
     }
 
     interface DbusRunner {
index 5c4d807ecca04e95e2abc17900436a7d5b24ab1a..7452eaee3ab5cc446e23bd79c209041b3da6f52b 100644 (file)
@@ -103,7 +103,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
     public String link(final String newDeviceName) throws Error.Failure {
         try {
             final URI deviceLinkUri = c.getNewProvisioningDeviceLinkUri();
-            final var thread = new Thread(() -> {
+            Thread.ofPlatform().name("dbus-link").start(() -> {
                 final ProvisioningManager provisioningManager = c.getProvisioningManagerFor(deviceLinkUri);
                 try {
                     provisioningManager.finishDeviceLink(newDeviceName);
@@ -111,8 +111,6 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl {
                     e.printStackTrace();
                 }
             });
-            thread.setName("dbus-link");
-            thread.start();
             return deviceLinkUri.toString();
         } catch (TimeoutException | IOException e) {
             throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage());
index 39e7f51eaa87e363242c2ece255993e21a984f85..4d13e22c30fd70d4b95dee14e13b8ae75e34e60e 100644 (file)
@@ -54,7 +54,7 @@ public class HttpServerHandler {
         logger.info("Starting server on " + address.toString());
 
         final var server = HttpServer.create(address, 0);
-        server.setExecutor(Executors.newFixedThreadPool(10));
+        server.setExecutor(Executors.newCachedThreadPool());
 
         server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
         server.createContext("/api/v1/events", this::handleEventsEndpoint);
index f9ef71a1f05cbc8ca72cba4ee90ab88786d5eeba..d1503d0165287f6cf0795d6941ef0af9a6193351 100644 (file)
@@ -55,8 +55,7 @@ public class JsonRpcReader {
             return;
         }
 
-        final var executor = Executors.newFixedThreadPool(10);
-        try {
+        try (final var executor = Executors.newCachedThreadPool()) {
             while (!Thread.interrupted()) {
                 final var input = lineSupplier.get();
                 if (input == null) {
@@ -72,8 +71,6 @@ public class JsonRpcReader {
 
                 executor.submit(() -> handleMessage(message, requestHandler, responseHandler));
             }
-        } finally {
-            Util.closeExecutorService(executor);
         }
     }
 
@@ -94,8 +91,7 @@ public class JsonRpcReader {
             case JsonRpcBatchMessage jsonRpcBatchMessage -> {
                 final var messages = jsonRpcBatchMessage.getMessages();
                 final var responseList = new ArrayList<JsonRpcResponse>(messages.size());
-                final var executor = Executors.newFixedThreadPool(10);
-                try {
+                try (final var executor = Executors.newCachedThreadPool()) {
                     final var lock = new ReentrantLock();
                     messages.forEach(jsonNode -> {
                         final JsonRpcRequest request;
@@ -124,8 +120,6 @@ public class JsonRpcReader {
                             }
                         });
                     });
-                } finally {
-                    Util.closeExecutorService(executor);
                 }
 
                 if (!responseList.isEmpty()) {
index e193d25330b7843fe9b67ef8c933fef8d7882045..63717ec031b9071ea669e8c2dd3e3191a34a7c8d 100644 (file)
@@ -16,8 +16,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class Util {
@@ -86,19 +84,4 @@ public class Util {
         }
         return map;
     }
-
-    public static void closeExecutorService(ExecutorService executor) {
-        executor.shutdown();
-        try {
-            if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
-                executor.shutdownNow();
-                if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
-                    logger.warn("Failed to shutdown executor service");
-                }
-            }
-        } catch (InterruptedException e) {
-            executor.shutdownNow();
-            Thread.currentThread().interrupt();
-        }
-    }
 }