nmode's Git Repositories - signal-cli/blobdiff - src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java
Refactor DaemonCommand
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SocketHandler.java
diff --git a/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java
new file mode 100644 (file)
index 0000000..dc620bc
--- /dev/null
@@ -0,0 +1,118 @@
+package org.asamk.signal.jsonrpc;
+
+import org.asamk.signal.manager.Manager;
+import org.asamk.signal.manager.MultiAccountManager;
+import org.asamk.signal.output.JsonWriterImpl;
+import org.asamk.signal.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+public class SocketHandler implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class);
+    private static final AtomicInteger threadNumber = new AtomicInteger(0);
+
+    private final ServerSocketChannel serverChannel;
+
+    private Thread listenerThread;
+    private final List<AutoCloseable> channels = new ArrayList<>();
+    private final Consumer<SocketChannel> socketHandler;
+    private final boolean noReceiveOnStart;
+
+    public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) {
+        this.serverChannel = serverChannel;
+        this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m);
+        this.noReceiveOnStart = noReceiveOnStart;
+    }
+
+    public SocketHandler(
+            final ServerSocketChannel serverChannel, final MultiAccountManager c, final boolean noReceiveOnStart
+    ) {
+        this.serverChannel = serverChannel;
+        this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c);
+        this.noReceiveOnStart = noReceiveOnStart;
+    }
+
+    public void init() {
+        if (listenerThread != null) {
+            throw new AssertionError("SocketHandler already initialized");
+        }
+        SocketAddress socketAddress;
+        try {
+            socketAddress = serverChannel.getLocalAddress();
+        } catch (IOException e) {
+            logger.debug("Failed to get socket address: {}", e.getMessage());
+            socketAddress = null;
+        }
+        final var address = socketAddress == null ? "<Unknown socket address>" : socketAddress;
+        logger.debug("Starting JSON-RPC server on {}", address);
+
+        listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> {
+            try (final var executor = Executors.newCachedThreadPool()) {
+                logger.info("Started JSON-RPC server on {}", address);
+                while (true) {
+                    final var connectionId = threadNumber.getAndIncrement();
+                    final SocketChannel channel;
+                    final String clientString;
+                    try {
+                        channel = serverChannel.accept();
+                        clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
+                        logger.info("Accepted new client connection {}: {}", connectionId, clientString);
+                    } catch (ClosedChannelException ignored) {
+                        logger.trace("Listening socket has been closed");
+                        break;
+                    } catch (IOException e) {
+                        logger.error("Failed to accept new socket connection", e);
+                        break;
+                    }
+                    channels.add(channel);
+                    executor.submit(() -> {
+                        try (final var c = channel) {
+                            socketHandler.accept(c);
+                        } catch (IOException e) {
+                            logger.warn("Failed to close channel", e);
+                        } catch (Throwable e) {
+                            logger.warn("Connection handler failed, closing connection", e);
+                        }
+                        logger.info("Connection {} closed: {}", connectionId, clientString);
+                        channels.remove(channel);
+                    });
+                }
+            }
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (listenerThread == null) {
+            return;
+        }
+        serverChannel.close();
+        for (final var c : new ArrayList<>(channels)) {
+            c.close();
+        }
+        listenerThread.join();
+        channels.clear();
+        listenerThread = null;
+    }
+
+    private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
+        final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
+        final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
+
+        return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
+    }
+}