From ed8ac5b84ccea9dac672021aec74c26d035d17e4 Mon Sep 17 00:00:00 2001 From: AsamK Date: Tue, 17 Oct 2023 21:56:10 +0200 Subject: [PATCH] Use new threads API --- .../signal/manager/internal/ManagerImpl.java | 7 +- .../storage/sendLog/MessageSendLogStore.java | 5 +- .../asamk/signal/commands/DaemonCommand.java | 70 +++++++++---------- .../signal/dbus/DbusSignalControlImpl.java | 4 +- .../asamk/signal/http/HttpServerHandler.java | 2 +- .../asamk/signal/jsonrpc/JsonRpcReader.java | 10 +-- src/main/java/org/asamk/signal/util/Util.java | 17 ----- 7 files changed, 40 insertions(+), 75 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index 3feed994..99bede8d 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -1019,7 +1019,7 @@ public class ManagerImpl implements Manager { if (receiveThread != null || isReceivingSynchronous) { return; } - receiveThread = new Thread(() -> { + receiveThread = Thread.ofPlatform().name("receive-" + threadNumber.getAndIncrement()).start(() -> { logger.debug("Starting receiving messages"); context.getReceiveHelper().receiveMessagesContinuously(this::passReceivedMessageToHandlers); logger.debug("Finished receiving messages"); @@ -1033,9 +1033,6 @@ public class ManagerImpl implements Manager { } } }); - receiveThread.setName("receive-" + threadNumber.getAndIncrement()); - - receiveThread.start(); } private void passReceivedMessageToHandlers(MessageEnvelope envelope, Throwable e) { @@ -1310,7 +1307,7 @@ public class ManagerImpl implements Manager { if (thread != null) { stopReceiveThread(thread); } - executor.shutdown(); + executor.close(); dependencies.getSignalWebSocket().disconnect(); disposable.dispose(); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java index 751f345b..910da3cf 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java @@ -38,7 +38,7 @@ public class MessageSendLogStore implements AutoCloseable { public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) { this.database = database; this.sendLogDisabled = disableMessageSendLog; - this.cleanupThread = new Thread(() -> { + this.cleanupThread = Thread.ofPlatform().name("msl-cleanup").daemon().start(() -> { try { final var interval = Duration.ofHours(1).toMillis(); while (!Thread.interrupted()) { @@ -55,9 +55,6 @@ public class MessageSendLogStore implements AutoCloseable { logger.debug("Stopping msl cleanup thread"); } }); - cleanupThread.setName("msl-cleanup"); - cleanupThread.setDaemon(true); - cleanupThread.start(); } public static void createSql(Connection connection) throws SQLException { diff --git a/src/main/java/org/asamk/signal/commands/DaemonCommand.java b/src/main/java/org/asamk/signal/commands/DaemonCommand.java index 5c1fac53..dcc21d45 100644 --- a/src/main/java/org/asamk/signal/commands/DaemonCommand.java +++ b/src/main/java/org/asamk/signal/commands/DaemonCommand.java @@ -39,6 +39,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -257,19 +258,19 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { m.addReceiveHandler(handler, isWeakListener); } - private void runSocketSingleAccount( + private Thread runSocketSingleAccount( final Manager m, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart ) { - runSocket(serverChannel, channel -> { + return runSocket(serverChannel, channel -> { final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); handler.handleConnection(m); }); } - private void runSocketMultiAccount( + private Thread runSocketMultiAccount( final MultiAccountManager c, final ServerSocketChannel serverChannel, final boolean noReceiveOnStart ) { - runSocket(serverChannel, channel -> { + return runSocket(serverChannel, channel -> { final var handler = getSignalJsonRpcDispatcherHandler(channel, noReceiveOnStart); handler.handleConnection(c); }); @@ -277,39 +278,37 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { private static final AtomicInteger threadNumber = new AtomicInteger(0); - private void runSocket(final ServerSocketChannel serverChannel, Consumer socketHandler) { - final var thread = new Thread(() -> { - while (true) { - final var connectionId = threadNumber.getAndIncrement(); - final SocketChannel channel; - final String clientString; - try { - channel = serverChannel.accept(); - clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); - logger.info("Accepted new client connection {}: {}", connectionId, clientString); - } catch (IOException e) { - logger.error("Failed to accept new socket connection", e); - synchronized (this) { - notifyAll(); - } - break; - } - final var connectionThread = new Thread(() -> { - try (final var c = channel) { - socketHandler.accept(c); + private Thread runSocket(final ServerSocketChannel serverChannel, Consumer socketHandler) { + return Thread.ofPlatform().name("daemon-listener").start(() -> { + try (final var executor = Executors.newCachedThreadPool()) { + while (true) { + final var connectionId = threadNumber.getAndIncrement(); + final SocketChannel channel; + final String clientString; + try { + channel = serverChannel.accept(); + clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel); + logger.info("Accepted new client connection {}: {}", connectionId, clientString); } catch (IOException e) { - logger.warn("Failed to close channel", e); - } catch (Throwable e) { - logger.warn("Connection handler failed, closing connection", e); + logger.error("Failed to accept new socket connection", e); + break; } - logger.info("Connection {} closed: {}", connectionId, clientString); - }); - connectionThread.setName("daemon-connection-" + connectionId); - connectionThread.start(); + executor.submit(() -> { + try (final var c = channel) { + socketHandler.accept(c); + } catch (IOException e) { + logger.warn("Failed to close channel", e); + } catch (Throwable e) { + logger.warn("Connection handler failed, closing connection", e); + } + logger.info("Connection {} closed: {}", connectionId, clientString); + }); + } + } + synchronized (this) { + notifyAll(); } }); - thread.setName("daemon-listener"); - thread.start(); } private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler( @@ -411,11 +410,8 @@ public class DaemonCommand implements MultiLocalCommand, LocalCommand { final DBusConnection conn, final String objectPath, final Manager m, final boolean noReceiveOnStart ) { final var signal = new DbusSignalImpl(m, conn, objectPath, noReceiveOnStart); - final var initThread = new Thread(signal::initObjects); - initThread.setName("dbus-init"); - initThread.start(); - return initThread; + return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects); } interface DbusRunner { diff --git a/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java b/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java index 5c4d807e..7452eaee 100644 --- a/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java +++ b/src/main/java/org/asamk/signal/dbus/DbusSignalControlImpl.java @@ -103,7 +103,7 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl { public String link(final String newDeviceName) throws Error.Failure { try { final URI deviceLinkUri = c.getNewProvisioningDeviceLinkUri(); - final var thread = new Thread(() -> { + Thread.ofPlatform().name("dbus-link").start(() -> { final ProvisioningManager provisioningManager = c.getProvisioningManagerFor(deviceLinkUri); try { provisioningManager.finishDeviceLink(newDeviceName); @@ -111,8 +111,6 @@ public class DbusSignalControlImpl implements org.asamk.SignalControl { e.printStackTrace(); } }); - thread.setName("dbus-link"); - thread.start(); return deviceLinkUri.toString(); } catch (TimeoutException | IOException e) { throw new SignalControl.Error.Failure(e.getClass().getSimpleName() + " " + e.getMessage()); diff --git a/src/main/java/org/asamk/signal/http/HttpServerHandler.java b/src/main/java/org/asamk/signal/http/HttpServerHandler.java index 39e7f51e..4d13e22c 100644 --- a/src/main/java/org/asamk/signal/http/HttpServerHandler.java +++ b/src/main/java/org/asamk/signal/http/HttpServerHandler.java @@ -54,7 +54,7 @@ public class HttpServerHandler { logger.info("Starting server on " + address.toString()); final var server = HttpServer.create(address, 0); - server.setExecutor(Executors.newFixedThreadPool(10)); + server.setExecutor(Executors.newCachedThreadPool()); server.createContext("/api/v1/rpc", this::handleRpcEndpoint); server.createContext("/api/v1/events", this::handleEventsEndpoint); diff --git a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java index f9ef71a1..d1503d01 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java +++ b/src/main/java/org/asamk/signal/jsonrpc/JsonRpcReader.java @@ -55,8 +55,7 @@ public class JsonRpcReader { return; } - final var executor = Executors.newFixedThreadPool(10); - try { + try (final var executor = Executors.newCachedThreadPool()) { while (!Thread.interrupted()) { final var input = lineSupplier.get(); if (input == null) { @@ -72,8 +71,6 @@ public class JsonRpcReader { executor.submit(() -> handleMessage(message, requestHandler, responseHandler)); } - } finally { - Util.closeExecutorService(executor); } } @@ -94,8 +91,7 @@ public class JsonRpcReader { case JsonRpcBatchMessage jsonRpcBatchMessage -> { final var messages = jsonRpcBatchMessage.getMessages(); final var responseList = new ArrayList(messages.size()); - final var executor = Executors.newFixedThreadPool(10); - try { + try (final var executor = Executors.newCachedThreadPool()) { final var lock = new ReentrantLock(); messages.forEach(jsonNode -> { final JsonRpcRequest request; @@ -124,8 +120,6 @@ public class JsonRpcReader { } }); }); - } finally { - Util.closeExecutorService(executor); } if (!responseList.isEmpty()) { diff --git a/src/main/java/org/asamk/signal/util/Util.java b/src/main/java/org/asamk/signal/util/Util.java index e193d253..63717ec0 100644 --- a/src/main/java/org/asamk/signal/util/Util.java +++ b/src/main/java/org/asamk/signal/util/Util.java @@ -16,8 +16,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Util { @@ -86,19 +84,4 @@ public class Util { } return map; } - - public static void closeExecutorService(ExecutorService executor) { - executor.shutdown(); - try { - if (!executor.awaitTermination(5, TimeUnit.MINUTES)) { - executor.shutdownNow(); - if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { - logger.warn("Failed to shutdown executor service"); - } - } - } catch (InterruptedException e) { - executor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } } -- 2.50.1