]> nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java
Refactor DaemonCommand
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SocketHandler.java
1 package org.asamk.signal.jsonrpc;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.MultiAccountManager;
5 import org.asamk.signal.output.JsonWriterImpl;
6 import org.asamk.signal.util.IOUtils;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import java.io.IOException;
11 import java.net.SocketAddress;
12 import java.nio.channels.Channels;
13 import java.nio.channels.ClosedChannelException;
14 import java.nio.channels.ServerSocketChannel;
15 import java.nio.channels.SocketChannel;
16 import java.nio.charset.StandardCharsets;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.function.Consumer;
22
23 public class SocketHandler implements AutoCloseable {
24
25 private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class);
26 private static final AtomicInteger threadNumber = new AtomicInteger(0);
27
28 private final ServerSocketChannel serverChannel;
29
30 private Thread listenerThread;
31 private final List<AutoCloseable> channels = new ArrayList<>();
32 private final Consumer<SocketChannel> socketHandler;
33 private final boolean noReceiveOnStart;
34
35 public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) {
36 this.serverChannel = serverChannel;
37 this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m);
38 this.noReceiveOnStart = noReceiveOnStart;
39 }
40
41 public SocketHandler(
42 final ServerSocketChannel serverChannel, final MultiAccountManager c, final boolean noReceiveOnStart
43 ) {
44 this.serverChannel = serverChannel;
45 this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c);
46 this.noReceiveOnStart = noReceiveOnStart;
47 }
48
49 public void init() {
50 if (listenerThread != null) {
51 throw new AssertionError("SocketHandler already initialized");
52 }
53 SocketAddress socketAddress;
54 try {
55 socketAddress = serverChannel.getLocalAddress();
56 } catch (IOException e) {
57 logger.debug("Failed to get socket address: {}", e.getMessage());
58 socketAddress = null;
59 }
60 final var address = socketAddress == null ? "<Unknown socket address>" : socketAddress;
61 logger.debug("Starting JSON-RPC server on {}", address);
62
63 listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> {
64 try (final var executor = Executors.newCachedThreadPool()) {
65 logger.info("Started JSON-RPC server on {}", address);
66 while (true) {
67 final var connectionId = threadNumber.getAndIncrement();
68 final SocketChannel channel;
69 final String clientString;
70 try {
71 channel = serverChannel.accept();
72 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
73 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
74 } catch (ClosedChannelException ignored) {
75 logger.trace("Listening socket has been closed");
76 break;
77 } catch (IOException e) {
78 logger.error("Failed to accept new socket connection", e);
79 break;
80 }
81 channels.add(channel);
82 executor.submit(() -> {
83 try (final var c = channel) {
84 socketHandler.accept(c);
85 } catch (IOException e) {
86 logger.warn("Failed to close channel", e);
87 } catch (Throwable e) {
88 logger.warn("Connection handler failed, closing connection", e);
89 }
90 logger.info("Connection {} closed: {}", connectionId, clientString);
91 channels.remove(channel);
92 });
93 }
94 }
95 });
96 }
97
98 @Override
99 public void close() throws Exception {
100 if (listenerThread == null) {
101 return;
102 }
103 serverChannel.close();
104 for (final var c : new ArrayList<>(channels)) {
105 c.close();
106 }
107 listenerThread.join();
108 channels.clear();
109 listenerThread = null;
110 }
111
112 private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
113 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
114 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
115
116 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
117 }
118 }