// Origin: https://git.nmode.ca/signal-cli/blobdiff_plain/c0aa338d7c8e40874dbc453b3fc3916701762029..7e9940be4ac1d3b7e19bcbd1e92b8db436415195:/src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java
// (new file src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java, index 00000000..dc620bc8)
package org.asamk.signal.jsonrpc;

import org.asamk.signal.manager.Manager;
import org.asamk.signal.manager.MultiAccountManager;
import org.asamk.signal.output.JsonWriterImpl;
import org.asamk.signal.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Accepts client connections on a {@link ServerSocketChannel} and hands each accepted
 * {@link SocketChannel} to a JSON-RPC dispatcher handler.
 *
 * <p>{@link #init()} starts one listener thread that accepts connections and runs each of them
 * as a task on a cached thread pool. {@link #close()} closes the listening channel and all
 * client channels that are still registered, then waits for the listener thread to finish.
 */
public class SocketHandler implements AutoCloseable {

    private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class);
    /** Source of the connection ids used in log messages; shared by all instances. */
    private static final AtomicInteger threadNumber = new AtomicInteger(0);

    private final ServerSocketChannel serverChannel;

    private Thread listenerThread;
    /**
     * Client channels that are currently open. Written by the listener thread (add) and by the
     * pool threads (remove), and read by {@link #close()}, hence a concurrent list.
     */
    private final List<SocketChannel> channels = new CopyOnWriteArrayList<>();
    private final Consumer<SocketChannel> socketHandler;
    private final boolean noReceiveOnStart;

    /**
     * Creates a handler whose connections are served with a single {@link Manager}.
     *
     * @param serverChannel    listening channel to accept connections from
     * @param m                manager passed to the dispatcher's {@code handleConnection}
     * @param noReceiveOnStart forwarded unchanged to every {@link SignalJsonRpcDispatcherHandler}
     */
    public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) {
        this.serverChannel = serverChannel;
        this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m);
        this.noReceiveOnStart = noReceiveOnStart;
    }

    /**
     * Creates a handler whose connections are served with a {@link MultiAccountManager}.
     *
     * @param serverChannel    listening channel to accept connections from
     * @param c                multi-account manager passed to the dispatcher's {@code handleConnection}
     * @param noReceiveOnStart forwarded unchanged to every {@link SignalJsonRpcDispatcherHandler}
     */
    public SocketHandler(
            final ServerSocketChannel serverChannel, final MultiAccountManager c, final boolean noReceiveOnStart
    ) {
        this.serverChannel = serverChannel;
        this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c);
        this.noReceiveOnStart = noReceiveOnStart;
    }

    /**
     * Starts the listener thread.
     *
     * @throws AssertionError if a listener thread already exists
     */
    public void init() {
        if (listenerThread != null) {
            throw new AssertionError("SocketHandler already initialized");
        }
        SocketAddress socketAddress;
        try {
            socketAddress = serverChannel.getLocalAddress();
        } catch (IOException e) {
            logger.debug("Failed to get socket address: {}", e.getMessage());
            socketAddress = null;
        }
        // Only used for log output, so a missing address is not fatal.
        final Object address = socketAddress == null ? "" : socketAddress;
        logger.debug("Starting JSON-RPC server on {}", address);

        listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> acceptConnections(address));
    }

    /**
     * Closes the listening channel and every registered client channel, then waits for the
     * listener thread to terminate.
     *
     * <p>All channels are closed even if closing one of them fails; the first failure is
     * rethrown after the listener thread has been joined, with later failures attached as
     * suppressed exceptions. Does nothing if {@link #init()} has not been called.
     *
     * @throws IOException          if closing the server channel or a client channel failed
     * @throws InterruptedException if interrupted while waiting for the listener thread
     */
    @Override
    public void close() throws Exception {
        if (listenerThread == null) {
            return;
        }
        IOException failure = null;
        try {
            // Closing the server channel makes the blocked accept() fail and ends the accept loop.
            serverChannel.close();
        } catch (IOException e) {
            failure = e;
        }
        // Closing the client channels unblocks the handlers the listener's executor waits for.
        for (final var c : channels) {
            try {
                c.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        listenerThread.join();
        channels.clear();
        listenerThread = null;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Body of the listener thread: accepts connections until the server channel is closed or
     * accepting fails, submitting one handler task per connection. Leaving the
     * try-with-resources block closes the executor.
     *
     * @param address value used to describe the listening address in the log
     */
    private void acceptConnections(final Object address) {
        try (final var executor = Executors.newCachedThreadPool()) {
            logger.info("Started JSON-RPC server on {}", address);
            while (true) {
                final var connectionId = threadNumber.getAndIncrement();
                SocketChannel accepted = null;
                final String clientString;
                try {
                    accepted = serverChannel.accept();
                    clientString = accepted.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(accepted);
                    logger.info("Accepted new client connection {}: {}", connectionId, clientString);
                } catch (ClosedChannelException ignored) {
                    logger.trace("Listening socket has been closed");
                    // A channel may already have been accepted when the lookup failed.
                    closeQuietly(accepted);
                    break;
                } catch (IOException e) {
                    logger.error("Failed to accept new socket connection", e);
                    closeQuietly(accepted);
                    break;
                }
                final var channel = accepted;
                channels.add(channel);
                // close() closes the server channel BEFORE it walks `channels`. If the server
                // channel is still open here, close() has not started walking yet and will see
                // this channel; otherwise it may have missed it, so it is closed here instead.
                if (!serverChannel.isOpen()) {
                    logger.trace("Listening socket has been closed");
                    channels.remove(channel);
                    closeQuietly(channel);
                    break;
                }
                executor.submit(() -> handleClient(channel, connectionId, clientString));
            }
        }
    }

    /**
     * Serves one client connection and closes its channel afterwards, whether the handler
     * returned normally or threw.
     *
     * @param channel      accepted client channel
     * @param connectionId id used to correlate log messages of this connection
     * @param clientString description of the client used in log messages
     */
    private void handleClient(final SocketChannel channel, final int connectionId, final String clientString) {
        try (final var c = channel) {
            socketHandler.accept(c);
        } catch (IOException e) {
            logger.warn("Failed to close channel", e);
        } catch (Throwable e) {
            // Catch everything so that one failing connection cannot kill the pool thread silently.
            logger.warn("Connection handler failed, closing connection", e);
        } finally {
            channels.remove(channel);
            logger.info("Connection {} closed: {}", connectionId, clientString);
        }
    }

    /** Closes {@code channel} if it is non-null, logging instead of propagating a failure. */
    private static void closeQuietly(final SocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            logger.warn("Failed to close channel", e);
        }
    }

    /**
     * Builds the dispatcher handler for one client channel, reading lines from and writing
     * JSON to the channel as UTF-8.
     */
    private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
        final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
        final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));

        return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
    }
}