1 package org
.asamk
.signal
.jsonrpc
;
3 import org
.asamk
.signal
.manager
.Manager
;
4 import org
.asamk
.signal
.manager
.MultiAccountManager
;
5 import org
.asamk
.signal
.output
.JsonWriterImpl
;
6 import org
.asamk
.signal
.util
.IOUtils
;
7 import org
.slf4j
.Logger
;
8 import org
.slf4j
.LoggerFactory
;
10 import java
.io
.IOException
;
11 import java
.net
.SocketAddress
;
12 import java
.nio
.channels
.Channels
;
13 import java
.nio
.channels
.ClosedChannelException
;
14 import java
.nio
.channels
.ServerSocketChannel
;
15 import java
.nio
.channels
.SocketChannel
;
16 import java
.nio
.charset
.StandardCharsets
;
17 import java
.util
.ArrayList
;
18 import java
.util
.List
;
19 import java
.util
.concurrent
.Executors
;
20 import java
.util
.concurrent
.atomic
.AtomicInteger
;
21 import java
.util
.function
.Consumer
;
23 public class SocketHandler
implements AutoCloseable
{
25 private static final Logger logger
= LoggerFactory
.getLogger(SocketHandler
.class);
26 private static final AtomicInteger threadNumber
= new AtomicInteger(0);
28 private final ServerSocketChannel serverChannel
;
30 private Thread listenerThread
;
31 private final List
<AutoCloseable
> channels
= new ArrayList
<>();
32 private final Consumer
<SocketChannel
> socketHandler
;
33 private final boolean noReceiveOnStart
;
35 public SocketHandler(final ServerSocketChannel serverChannel
, final Manager m
, final boolean noReceiveOnStart
) {
36 this.serverChannel
= serverChannel
;
37 this.socketHandler
= channel
-> getSignalJsonRpcDispatcherHandler(channel
).handleConnection(m
);
38 this.noReceiveOnStart
= noReceiveOnStart
;
42 final ServerSocketChannel serverChannel
, final MultiAccountManager c
, final boolean noReceiveOnStart
44 this.serverChannel
= serverChannel
;
45 this.socketHandler
= channel
-> getSignalJsonRpcDispatcherHandler(channel
).handleConnection(c
);
46 this.noReceiveOnStart
= noReceiveOnStart
;
50 if (listenerThread
!= null) {
51 throw new AssertionError("SocketHandler already initialized");
53 SocketAddress socketAddress
;
55 socketAddress
= serverChannel
.getLocalAddress();
56 } catch (IOException e
) {
57 logger
.debug("Failed to get socket address: {}", e
.getMessage());
60 final var address
= socketAddress
== null ?
"<Unknown socket address>" : socketAddress
;
61 logger
.debug("Starting JSON-RPC server on {}", address
);
63 listenerThread
= Thread
.ofPlatform().name("daemon-listener").start(() -> {
64 try (final var executor
= Executors
.newCachedThreadPool()) {
65 logger
.info("Started JSON-RPC server on {}", address
);
67 final var connectionId
= threadNumber
.getAndIncrement();
68 final SocketChannel channel
;
69 final String clientString
;
71 channel
= serverChannel
.accept();
72 clientString
= channel
.getRemoteAddress() + " " + IOUtils
.getUnixDomainPrincipal(channel
);
73 logger
.info("Accepted new client connection {}: {}", connectionId
, clientString
);
74 } catch (ClosedChannelException ignored
) {
75 logger
.trace("Listening socket has been closed");
77 } catch (IOException e
) {
78 logger
.error("Failed to accept new socket connection", e
);
81 channels
.add(channel
);
82 executor
.submit(() -> {
83 try (final var c
= channel
) {
84 socketHandler
.accept(c
);
85 } catch (IOException e
) {
86 logger
.warn("Failed to close channel", e
);
87 } catch (Throwable e
) {
88 logger
.warn("Connection handler failed, closing connection", e
);
90 logger
.info("Connection {} closed: {}", connectionId
, clientString
);
91 channels
.remove(channel
);
99 public void close() throws Exception
{
100 if (listenerThread
== null) {
103 serverChannel
.close();
104 for (final var c
: new ArrayList
<>(channels
)) {
107 listenerThread
.join();
109 listenerThread
= null;
112 private SignalJsonRpcDispatcherHandler
getSignalJsonRpcDispatcherHandler(final SocketChannel c
) {
113 final var lineSupplier
= IOUtils
.getLineSupplier(Channels
.newReader(c
, StandardCharsets
.UTF_8
));
114 final var jsonOutputWriter
= new JsonWriterImpl(Channels
.newWriter(c
, StandardCharsets
.UTF_8
));
116 return new SignalJsonRpcDispatcherHandler(jsonOutputWriter
, lineSupplier
, noReceiveOnStart
);