nmode's Git Repositories - signal-cli/commitdiff
Unsubscribe receive if jsonRpcSender channel is closed
author AsamK <asamk@gmx.de>
Wed, 8 Jun 2022 15:50:20 +0000 (17:50 +0200)
committer AsamK <asamk@gmx.de>
Wed, 8 Jun 2022 15:51:18 +0000 (17:51 +0200)
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
src/main/java/org/asamk/signal/jsonrpc/SignalJsonRpcDispatcherHandler.java

index dc7e743ec6221e0355636434a71c0396327aa3bf..fcc50e098141bb89af9bcaeed60c8d9d86010b6e 100644 (file)
@@ -860,7 +860,8 @@ class ManagerImpl implements Manager {
             logger.debug("Starting receiving messages");
             context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
                 synchronized (messageHandlers) {
             logger.debug("Starting receiving messages");
             context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> {
                 synchronized (messageHandlers) {
-                    Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
+                    final var handlers = Stream.concat(messageHandlers.stream(), weakHandlers.stream()).toList();
+                    handlers.forEach(h -> {
                         try {
                             h.handleMessage(envelope, e);
                         } catch (Throwable ex) {
                         try {
                             h.handleMessage(envelope, e);
                         } catch (Throwable ex) {
index 086681f795d98119df68bfd0336bb5432f358024..181233bc38dfe26f0b7ee80bb45dff87090e0261 100644 (file)
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.HashMap;
 import java.util.List;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.HashMap;
 import java.util.List;
@@ -101,7 +102,14 @@ public class SignalJsonRpcDispatcherHandler {
             final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
                 final ContainerNode<?> params = objectMapper.valueToTree(s);
                 ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
             final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
                 final ContainerNode<?> params = objectMapper.valueToTree(s);
                 ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
-                jsonRpcSender.sendRequest(JsonRpcRequest.forNotification("receive", params, null));
+                final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
+                try {
+                    jsonRpcSender.sendRequest(jsonRpcRequest);
+                } catch (AssertionError e) {
+                    if (e.getCause() instanceof ClosedChannelException) {
+                        unsubscribeReceive(subscriptionId);
+                    }
+                }
             });
             m.addReceiveHandler(receiveMessageHandler);
             return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);
             });
             m.addReceiveHandler(receiveMessageHandler);
             return new Pair<>(m, (Manager.ReceiveMessageHandler) receiveMessageHandler);