nmode's Git Repositories - signal-cli/blob - src/main/java/org/asamk/signal/jsonrpc/SocketHandler.java
Fix incorrect error message
[signal-cli] / src / main / java / org / asamk / signal / jsonrpc / SocketHandler.java
1 package org.asamk.signal.jsonrpc;
2
3 import org.asamk.signal.manager.Manager;
4 import org.asamk.signal.manager.MultiAccountManager;
5 import org.asamk.signal.output.JsonWriterImpl;
6 import org.asamk.signal.util.IOUtils;
7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9
10 import java.io.IOException;
11 import java.net.SocketAddress;
12 import java.nio.channels.Channels;
13 import java.nio.channels.ClosedChannelException;
14 import java.nio.channels.ServerSocketChannel;
15 import java.nio.channels.SocketChannel;
16 import java.nio.charset.StandardCharsets;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.function.Consumer;
22
23 public class SocketHandler implements AutoCloseable {
24
25 private static final Logger logger = LoggerFactory.getLogger(SocketHandler.class);
26 private static final AtomicInteger threadNumber = new AtomicInteger(0);
27
28 private final ServerSocketChannel serverChannel;
29
30 private Thread listenerThread;
31 private final List<AutoCloseable> channels = new ArrayList<>();
32 private final Consumer<SocketChannel> socketHandler;
33 private final boolean noReceiveOnStart;
34
35 public SocketHandler(final ServerSocketChannel serverChannel, final Manager m, final boolean noReceiveOnStart) {
36 this.serverChannel = serverChannel;
37 this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(m);
38 this.noReceiveOnStart = noReceiveOnStart;
39 }
40
41 public SocketHandler(
42 final ServerSocketChannel serverChannel,
43 final MultiAccountManager c,
44 final boolean noReceiveOnStart
45 ) {
46 this.serverChannel = serverChannel;
47 this.socketHandler = channel -> getSignalJsonRpcDispatcherHandler(channel).handleConnection(c);
48 this.noReceiveOnStart = noReceiveOnStart;
49 }
50
51 public void init() {
52 if (listenerThread != null) {
53 throw new AssertionError("SocketHandler already initialized");
54 }
55 SocketAddress socketAddress;
56 try {
57 socketAddress = serverChannel.getLocalAddress();
58 } catch (IOException e) {
59 logger.debug("Failed to get socket address: {}", e.getMessage());
60 socketAddress = null;
61 }
62 final var address = socketAddress == null ? "<Unknown socket address>" : socketAddress;
63 logger.debug("Starting JSON-RPC server on {}", address);
64
65 listenerThread = Thread.ofPlatform().name("daemon-listener").start(() -> {
66 try (final var executor = Executors.newCachedThreadPool()) {
67 logger.info("Started JSON-RPC server on {}", address);
68 while (true) {
69 final var connectionId = threadNumber.getAndIncrement();
70 final SocketChannel channel;
71 final String clientString;
72 try {
73 channel = serverChannel.accept();
74 clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
75 logger.info("Accepted new client connection {}: {}", connectionId, clientString);
76 } catch (ClosedChannelException ignored) {
77 logger.trace("Listening socket has been closed");
78 break;
79 } catch (IOException e) {
80 logger.error("Failed to accept new socket connection", e);
81 break;
82 }
83 channels.add(channel);
84 executor.submit(() -> {
85 try (final var c = channel) {
86 socketHandler.accept(c);
87 } catch (IOException e) {
88 logger.warn("Failed to close channel", e);
89 } catch (Throwable e) {
90 logger.warn("Connection handler failed, closing connection", e);
91 }
92 logger.info("Connection {} closed: {}", connectionId, clientString);
93 channels.remove(channel);
94 });
95 }
96 }
97 });
98 }
99
100 @Override
101 public void close() throws Exception {
102 if (listenerThread == null) {
103 return;
104 }
105 serverChannel.close();
106 for (final var c : new ArrayList<>(channels)) {
107 c.close();
108 }
109 listenerThread.join();
110 channels.clear();
111 listenerThread = null;
112 }
113
114 private SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
115 final var lineSupplier = IOUtils.getLineSupplier(Channels.newReader(c, StandardCharsets.UTF_8));
116 final var jsonOutputWriter = new JsonWriterImpl(Channels.newWriter(c, StandardCharsets.UTF_8));
117
118 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter, lineSupplier, noReceiveOnStart);
119 }
120 }