import org.asamk.signal.DbusConfig;
import org.asamk.signal.OutputType;
import org.asamk.signal.ReceiveMessageHandler;
+import org.asamk.signal.Shutdown;
import org.asamk.signal.commands.exceptions.CommandException;
import org.asamk.signal.commands.exceptions.IOErrorException;
import org.asamk.signal.commands.exceptions.UnexpectedErrorException;
import java.net.UnixDomainSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public void handleCommand(
final Namespace ns, final Manager m, final OutputWriter outputWriter
) throws CommandException {
+ Shutdown.installHandler(); // presumably sets up process-wide shutdown handling — confirm in Shutdown
logger.info("Starting daemon in single-account mode for " + m.getSelfNumber());
final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
final var receiveMode = ns.<ReceiveMode>get("receive-mode");
try (final var daemonHandler = new SingleAccountDaemonHandler(m, receiveMode)) {
setup(ns, daemonHandler);
- m.addClosedListener(() -> {
- synchronized (this) {
- notifyAll();
- }
- });
+ m.addClosedListener(Shutdown::triggerShutdown); // a closed manager now triggers Shutdown instead of notifying this object's monitor
- synchronized (this) {
- try {
- wait();
- } catch (InterruptedException ignored) {
- }
+ try {
+ Shutdown.waitForShutdown(); // presumably blocks until shutdown is triggered — confirm; once it returns, the enclosing try-with-resources closes daemonHandler
+ } catch (InterruptedException ignored) { // NOTE(review): interruption only ends the wait; the interrupt flag is not restored
}
}
}
public void handleCommand(
final Namespace ns, final MultiAccountManager c, final OutputWriter outputWriter
) throws CommandException {
+ Shutdown.installHandler(); // presumably sets up process-wide shutdown handling — confirm in Shutdown
logger.info("Starting daemon in multi-account mode");
final var noReceiveStdOut = Boolean.TRUE.equals(ns.getBoolean("no-receive-stdout"));
final var receiveMode = ns.<ReceiveMode>get("receive-mode");
synchronized (this) {
try {
- wait();
+ Shutdown.waitForShutdown(); // NOTE(review): the removed wait() released the monitor on `this`; this call presumably keeps it held for the whole wait — confirm the synchronized block is still wanted
} catch (InterruptedException ignored) {
}
}
private static abstract class DaemonHandler implements AutoCloseable {
protected final ReceiveMode receiveMode;
+ protected final List<AutoCloseable> closeables = new ArrayList<>(); // resources registered while the daemon starts; close() walks this list and closes each entry
+
private static final AtomicInteger threadNumber = new AtomicInteger(0);
public DaemonHandler(final ReceiveMode receiveMode) {
public abstract void runHttp(InetSocketAddress address) throws CommandException;
protected void runSocket(final ServerSocketChannel serverChannel, Consumer<SocketChannel> socketHandler) {
- Thread.ofPlatform().name("daemon-listener").start(() -> {
+ final List<AutoCloseable> channels = new ArrayList<>(); // client connections that are still open. NOTE(review): plain ArrayList touched from the listener thread, pool tasks and the close hook — not thread-safe, consider a concurrent collection
+ final var thread = Thread.ofPlatform().name("daemon-listener").start(() -> { // keep the thread handle so the close hook below can join it
try (final var executor = Executors.newCachedThreadPool()) {
while (true) {
final var connectionId = threadNumber.getAndIncrement();
channel = serverChannel.accept();
clientString = channel.getRemoteAddress() + " " + IOUtils.getUnixDomainPrincipal(channel);
logger.info("Accepted new client connection {}: {}", connectionId, clientString);
+ } catch (ClosedChannelException ignored) { // expected path once the close hook below has closed serverChannel
+ logger.trace("Listening socket has been closed");
+ break;
} catch (IOException e) {
logger.error("Failed to accept new socket connection", e);
break;
}
+ channels.add(channel); // track the connection so the close hook can close it
executor.submit(() -> {
try (final var c = channel) {
socketHandler.accept(c);
logger.warn("Connection handler failed, closing connection", e);
}
logger.info("Connection {} closed: {}", connectionId, clientString);
+ channels.remove(channel); // connection is finished, stop tracking it
});
}
}
});
+ closeables.add(() -> { // teardown step for this listener: stop accepting, close clients, wait for the thread
+ serverChannel.close();
+ for (final var c : new ArrayList<>(channels)) { // iterate over a snapshot because handler tasks remove entries from channels
+ c.close();
+ }
+ thread.join(); // wait for the daemon-listener thread to finish
+ });
}
protected SignalJsonRpcDispatcherHandler getSignalJsonRpcDispatcherHandler(final SocketChannel c) {
protected Thread exportDbusObject(final DBusConnection conn, final String objectPath, final Manager m) {
final var signal = new DbusSignalImpl(m, conn, objectPath, receiveMode != ReceiveMode.ON_START);
+ closeables.add(signal); // registered so that close() on this handler also closes the exported object
return Thread.ofPlatform().name("dbus-init-" + m.getSelfNumber()).start(signal::initObjects);
}
"Dbus command failed, maybe signal-cli dbus daemon is already running: " + e.getMessage(),
e);
}
+ closeables.add(conn);
logger.info("DBus daemon running on {} bus: {}", busType, DbusConfig.getBusname());
}
@Override
public void close() {
- // TODO
+ for (final var closeable : new ArrayList<>(closeables)) { // works on a snapshot; entries are closed in registration order
+ try {
+ closeable.close();
+ } catch (Exception e) { // a failing resource is logged and the remaining ones are still closed
+ logger.warn("Failed to close daemon handler", e);
+ }
+ }
+ closeables.clear(); // drop all entries once every one has been attempted
}
}
} catch (IOException ex) {
throw new IOErrorException("Failed to initialize HTTP Server", ex);
}
+ this.closeables.add(handler);
}
}
final var object = connection.getExportedObject(null, path);
if (object instanceof DbusSignalImpl dbusSignal) {
dbusSignal.close();
+ closeables.remove(dbusSignal);
}
} catch (DBusException ignored) {
}
} catch (IOException ex) {
throw new IOErrorException("Failed to initialize HTTP Server", ex);
}
+ this.closeables.add(handler);
}
private Thread exportManager(
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-public class HttpServerHandler {
+public class HttpServerHandler implements AutoCloseable {
private final static Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);
private final SignalJsonRpcCommandHandler commandHandler;
private final MultiAccountManager c;
private final Manager m;
+ private HttpServer server; // null until init() creates the server and again after close(). NOTE(review): plain field written by init() and close() — confirm both run on the same thread
+ private final AtomicBoolean shutdown = new AtomicBoolean(false); // set by close() so waiting request loops can stop
public HttpServerHandler(final InetSocketAddress address, final Manager m) {
this.address = address;
}
public void init() throws IOException {
+ if (server != null) { // a second init() without an intervening close() is treated as a programming error
+ throw new AssertionError("HttpServerHandler already initialized");
+ }
logger.info("Starting server on " + address.toString());
- final var server = HttpServer.create(address, 0);
+ server = HttpServer.create(address, 0); // stored in the field (previously a local) so close() can stop it
server.setExecutor(Executors.newCachedThreadPool());
server.createContext("/api/v1/rpc", this::handleRpcEndpoint);
final var handlers = subscribeReceiveHandlers(managers, sender, () -> {
shouldStop.set(true);
synchronized (this) {
- this.notify();
+ this.notifyAll();
}
});
synchronized (this) {
wait(15_000);
}
- if (shouldStop.get()) {
+ if (shouldStop.get() || shutdown.get()) {
break;
}
m.removeReceiveHandler(handler);
}
+ @Override
+ public void close() { // stops the HTTP server if init() created one; does nothing otherwise
+ if (server != null) {
+ shutdown.set(true); // set before notifying so woken waiters see the flag
+ synchronized (this) {
+ this.notifyAll(); // wake every thread waiting on this object's monitor
+ }
+ // Increase this delay when https://bugs.openjdk.org/browse/JDK-8304065 is fixed
+ server.stop(2);
+ server = null; // allows init() to be called again
+ shutdown.set(false); // NOTE(review): a waiter that has not yet re-checked the flag when this runs would miss the shutdown — confirm stop(2) guarantees they have finished
+ }
+ }
+
private interface Callable {
void call();