X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/f207c2abc390d1b6292de6e86379d1513dc358cf..942999b7b4beebf4519eed9b216587519b47e6c6:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index fa500712..380b00ee 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -133,7 +133,7 @@ class ManagerImpl implements Manager { this.dependencies = new SignalDependencies(serviceEnvironmentConfig, userAgent, account.getCredentialsProvider(), - account.getSignalProtocolStore(), + account.getSignalServiceDataStore(), executor, sessionLock); final var avatarStore = new AvatarStore(pathConfig.avatarsPath()); @@ -278,7 +278,7 @@ class ManagerImpl implements Manager { public List getLinkedDevices() throws IOException { var devices = dependencies.getAccountManager().getDevices(); account.setMultiDevice(devices.size() > 1); - var identityKey = account.getIdentityKeyPair().getPrivateKey(); + var identityKey = account.getAciIdentityKeyPair().getPrivateKey(); return devices.stream().map(d -> { String deviceName = d.getName(); if (deviceName != null) { @@ -568,7 +568,7 @@ class ManagerImpl implements Manager { final var recipientId = context.getRecipientHelper().resolveRecipient(m.recipient()); mentions.add(new SignalServiceDataMessage.Mention(context.getRecipientHelper() .resolveSignalServiceAddress(recipientId) - .getAci(), m.start(), m.length())); + .getServiceId(), m.start(), m.length())); } return mentions; } @@ -765,24 +765,17 @@ class ManagerImpl implements Manager { } receiveThread = new Thread(() -> { logger.debug("Starting receiving messages"); - while (!Thread.interrupted()) { - try { - context.getReceiveHelper().receiveMessages(Duration.ofMinutes(1), false, (envelope, e) -> { - synchronized (messageHandlers) { - Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { - try { - h.handleMessage(envelope, e); - } catch (Exception ex) { - logger.warn("Message handler failed, ignoring", ex); - } - }); + context.getReceiveHelper().receiveMessagesContinuously((envelope, e) -> { + synchronized (messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { + try { + h.handleMessage(envelope, e); + } catch (Throwable ex) { + logger.warn("Message handler failed, ignoring", ex); } }); - break; - } catch (IOException e) { - logger.warn("Receiving messages failed, retrying", e); } - } + }); logger.debug("Finished receiving messages"); synchronized (messageHandlers) { receiveThread = null; @@ -816,7 +809,10 @@ class ManagerImpl implements Manager { } private void stopReceiveThread(final Thread thread) { - thread.interrupt(); + if (context.getReceiveHelper().requestStopReceiveMessages()) { + logger.debug("Receive stop requested, interrupting read from server."); + thread.interrupt(); + } try { thread.join(); } catch (InterruptedException ignored) { @@ -1030,14 +1026,15 @@ class ManagerImpl implements Manager { dependencies.getSignalWebSocket().disconnect(); disposable.dispose(); + if (account != null) { + account.close(); + } + synchronized (closedListeners) { closedListeners.forEach(Runnable::run); closedListeners.clear(); } - if (account != null) { - account.close(); - } account = null; } }