+ private int subscribeReceive(final List<Manager> managers) {
+ final var subscriptionId = nextSubscriptionId.getAndIncrement();
+ final var handlers = managers.stream().map(m -> {
+ final var receiveMessageHandler = new JsonReceiveMessageHandler(m, s -> {
+ final ContainerNode<?> params = objectMapper.valueToTree(s);
+ ((ObjectNode) params).set("subscription", IntNode.valueOf(subscriptionId));
+ final var jsonRpcRequest = JsonRpcRequest.forNotification("receive", params, null);
+ try {
+ jsonRpcSender.sendRequest(jsonRpcRequest);
+ } catch (AssertionError e) {
+ if (e.getCause() instanceof ClosedChannelException) {
+ unsubscribeReceive(subscriptionId);
+ }