From: AsamK Date: Wed, 8 Jun 2022 15:50:20 +0000 (+0200) Subject: Unsubscribe receive if jsonRpcSender channel is closed X-Git-Tag: v0.10.8~8 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/c8cd36bde884fe03004dfa05357eac34394b4b01?ds=inline Unsubscribe receive if jsonRpcSender channel is closed --- diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index dc7e743e..fcc50e09 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -860,7 +860,8 @@ class ManagerImpl implements Manager { logger.debug("Starting receiving messages"); context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { synchronized (messageHandlers) { - Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList(); + handlers.forEach(h -> { try { h.handleMessage(envelope, e); } catch (Throwable ex) { diff --git a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java index 086681f7..181233bc 100644 --- a/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java +++ b/src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.OverlappingFileLockException; import java.util.HashMap; import java.util.List; @@ -101,7 +102,14 @@ public class SignalJsonRpcDispatcherHandler { final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> { final ContainerNode params = objectMapper.valueToTree(s); ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId)); - jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", params, null)); + final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null); + try { + jsonRpcSender.sendRequest(jsonRpcRequest); + } catch (AssertionError e) { + if (e.getCause() instanceof ClosedChannelException) { + unsubscribeReceive(subscriptionId); + } + } }); m.addReceiveHandler(receiveMessageHandler); return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);