1 package org
.asamk
.signal
.jsonrpc
;
3 import org
.asamk
.signal
.manager
.Manager
;
4 import org
.asamk
.signal
.manager
.MultiAccountManager
;
5 import org
.asamk
.signal
.output
.JsonWriterImpl
;
6 import org
.asamk
.signal
.util
.IOUtils
;
7 import org
.slf4j
.Logger
;
8 import org
.slf4j
.LoggerFactory
;
10 import java
.io
.IOException
;
11 import java
.net
.SocketAddress
;
12 import java
.nio
.channels
.Channels
;
13 import java
.nio
.channels
.ClosedChannelException
;
14 import java
.nio
.channels
.ServerSocketChannel
;
15 import java
.nio
.channels
.SocketChannel
;
16 import java
.nio
.charset
.StandardCharsets
;
17 import java
.util
.ArrayList
;
18 import java
.util
.List
;
19 import java
.util
.concurrent
.Executors
;
20 import java
.util
.concurrent
.atomic
.AtomicInteger
;
21 import java
.util
.function
.Consumer
;
// Accepts client connections on a ServerSocketChannel and serves each one through a
// SignalJsonRpcDispatcherHandler. Implements AutoCloseable; close() performs the shutdown.
23 public class SocketHandler
implements AutoCloseable
{
// SLF4J logger for this class.
25 private static final Logger logger
= LoggerFactory
.getLogger(SocketHandler
.class);
// Static counter; getAndIncrement() supplies the connection id that appears in the log messages.
26 private static final AtomicInteger threadNumber
= new AtomicInteger(0);
// Listening channel supplied through the constructor; accept() is called on it by the listener thread.
28 private final ServerSocketChannel serverChannel
;
// Thread that runs the accept logic. Assigned when the listener is started and set back to
// null in close(); a non-null value is treated as "already initialized".
30 private Thread listenerThread
;
// Channels of accepted connections: added after a successful accept, removed once the
// connection task has finished, and copied into a snapshot by close().
// NOTE(review): this is a plain ArrayList, but add() runs in the listener-thread lambda,
// remove() runs in tasks submitted to the executor, and close() copies it from the caller's
// thread. ArrayList is not synchronized; confirm whether a thread-safe collection is needed.
31 private final List
<AutoCloseable
> channels
= new ArrayList
<>();
// Per-connection callback. Each constructor sets it to a lambda that builds a dispatcher
// handler for the channel and calls handleConnection with the supplied manager object.
32 private final Consumer
<SocketChannel
> socketHandler
;
// Forwarded unchanged to every SignalJsonRpcDispatcherHandler created by this class.
33 private final boolean noReceiveOnStart
;
/**
 * Creates a handler whose connections are all served against a single {@code Manager}.
 *
 * @param serverChannel    channel on which client connections are accepted
 * @param m                manager passed to {@code handleConnection} for every connection
 * @param noReceiveOnStart flag forwarded to each dispatcher handler that gets created
 */
35 public SocketHandler(final ServerSocketChannel serverChannel
, final Manager m
, final boolean noReceiveOnStart
) {
36 this.serverChannel
= serverChannel
;
// The lambda captures m; a fresh dispatcher handler is built for every channel it receives.
37 this.socketHandler
= channel
-> getSignalJsonRpcDispatcherHandler(channel
).handleConnection(m
);
38 this.noReceiveOnStart
= noReceiveOnStart
;
// Parameter list and body of a second initialisation variant that takes a MultiAccountManager
// instead of a Manager. NOTE(review): the declaration header is not visible in this excerpt;
// the assignments to the final fields suggest it is a constructor overload -- confirm.
42 final ServerSocketChannel serverChannel
,
// c is passed to handleConnection for every connection (see the lambda below).
43 final MultiAccountManager c
,
44 final boolean noReceiveOnStart
46 this.serverChannel
= serverChannel
;
// Same wiring as the Manager variant, except that handleConnection receives the MultiAccountManager.
47 this.socketHandler
= channel
-> getSignalJsonRpcDispatcherHandler(channel
).handleConnection(c
);
48 this.noReceiveOnStart
= noReceiveOnStart
;
// Start-up logic for the listener. NOTE(review): the enclosing method header, the try openers
// and the loop construct are not visible in this excerpt; comments below describe only the
// statements that are shown.
// A non-null listenerThread means start-up already happened, which is reported as an AssertionError.
52 if (listenerThread
!= null) {
53 throw new AssertionError("SocketHandler already initialized");
// The local address is looked up only so that it can be included in the log output below.
55 SocketAddress socketAddress
;
57 socketAddress
= serverChannel
.getLocalAddress();
// A failed lookup is logged at debug level with the exception message only (no stack trace).
58 } catch (IOException e
) {
59 logger
.debug("Failed to get socket address: {}", e
.getMessage());
// Use a placeholder text in the logs when no address is available.
62 final var address
= socketAddress
== null ?
"<Unknown socket address>" : socketAddress
;
63 logger
.debug("Starting JSON-RPC server on {}", address
);
// The accept logic runs on its own platform thread named "daemon-listener".
65 listenerThread
= Thread
.ofPlatform().name("daemon-listener").start(() -> {
// Cached thread pool for the per-connection tasks; try-with-resources closes the executor
// when this block is left.
66 try (final var executor
= Executors
.newCachedThreadPool()) {
67 logger
.info("Started JSON-RPC server on {}", address
);
// Id taken from the static counter; used to correlate the "accepted" and "closed" log lines.
69 final var connectionId
= threadNumber
.getAndIncrement();
70 final SocketChannel channel
;
71 final String clientString
;
// Accept a client and build its log label from the remote address plus the value returned
// by IOUtils.getUnixDomainPrincipal(channel).
73 channel
= serverChannel
.accept();
74 clientString
= channel
.getRemoteAddress() + " " + IOUtils
.getUnixDomainPrincipal(channel
);
75 logger
.info("Accepted new client connection {}: {}", connectionId
, clientString
);
// The listening channel was closed (close() closes serverChannel); logged at trace level only.
76 } catch (ClosedChannelException ignored
) {
77 logger
.trace("Listening socket has been closed");
// Any other I/O failure while accepting is logged as an error, including the stack trace.
79 } catch (IOException e
) {
80 logger
.error("Failed to accept new socket connection", e
);
// Track the accepted channel so that close() can see it.
// NOTE(review): channels is a plain ArrayList and is also modified from the executor task
// below -- confirm whether synchronisation is required.
83 channels
.add(channel
);
// Serve the connection on a pool thread.
84 executor
.submit(() -> {
// try-with-resources closes the channel once the handler callback returns or throws.
85 try (final var c
= channel
) {
86 socketHandler
.accept(c
);
// Consumer.accept declares no checked exceptions, so an IOException caught here is expected
// to originate from the implicit close of the channel.
87 } catch (IOException e
) {
88 logger
.warn("Failed to close channel", e
);
// Anything else thrown by the handler is logged as a warning instead of propagating.
89 } catch (Throwable e
) {
90 logger
.warn("Connection handler failed, closing connection", e
);
// Record the end of the connection and stop tracking its channel.
92 logger
.info("Connection {} closed: {}", connectionId
, clientString
);
93 channels
.remove(channel
);
// Shuts the listener down: closes the server channel, walks a snapshot of the tracked
// channels, waits for the listener thread to finish and then clears the thread reference.
// NOTE(review): the statement guarded by the null check and the body of the for loop are
// not visible in this excerpt.
101 public void close() throws Exception
{
// listenerThread == null means the listener is not running (never started or already closed).
102 if (listenerThread
== null) {
// Closing the listening channel; the accept logic has a catch for ClosedChannelException.
105 serverChannel
.close();
// Iterates over a copy of the list, presumably because connection tasks remove entries
// from channels while this loop runs -- confirm.
106 for (final var c
: new ArrayList
<>(channels
)) {
// Wait until the listener thread has terminated.
109 listenerThread
.join();
// Clearing the reference lets the null checks on listenerThread treat the handler as not running.
111 listenerThread
= null;
/**
 * Builds a JSON-RPC dispatcher handler bound to the given socket channel.
 *
 * Input is read through a UTF-8 reader on the channel wrapped by
 * {@code IOUtils.getLineSupplier}; output is written through a {@code JsonWriterImpl}
 * over a UTF-8 writer on the same channel.
 *
 * @param c client channel used for both reading and writing
 * @return a new {@code SignalJsonRpcDispatcherHandler} constructed from the writer, the line
 *         supplier and this instance's {@code noReceiveOnStart} flag
 */
114 private SignalJsonRpcDispatcherHandler
getSignalJsonRpcDispatcherHandler(final SocketChannel c
) {
// Reading side: UTF-8 reader over the channel.
115 final var lineSupplier
= IOUtils
.getLineSupplier(Channels
.newReader(c
, StandardCharsets
.UTF_8
));
// Writing side: UTF-8 writer over the same channel.
116 final var jsonOutputWriter
= new JsonWriterImpl(Channels
.newWriter(c
, StandardCharsets
.UTF_8
));
118 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter
, lineSupplier
, noReceiveOnStart
);