X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/5c389c875d91bacba127d0e9cbdc1746b022e5aa..ce7aa580b6f0580cdcf7fd68fcc8efba737d21ed:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java

diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
index 0421a401..fe26e9b4 100644
--- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
+++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
@@ -137,6 +137,10 @@ public class ManagerImpl implements Manager {
     private boolean hasCaughtUpWithOldMessages = false;
     private boolean ignoreAttachments = false;
 
+    private Thread receiveThread;
+    private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
+    private boolean isReceivingSynchronous;
+
     ManagerImpl(
             SignalAccount account,
             PathConfig pathConfig,
@@ -165,9 +169,9 @@ public class ManagerImpl implements Manager {
                 account.getSignalProtocolStore(),
                 executor,
                 sessionLock);
-        final var avatarStore = new AvatarStore(pathConfig.getAvatarsPath());
-        final var attachmentStore = new AttachmentStore(pathConfig.getAttachmentsPath());
-        final var stickerPackStore = new StickerPackStore(pathConfig.getStickerPacksPath());
+        final var avatarStore = new AvatarStore(pathConfig.avatarsPath());
+        final var attachmentStore = new AttachmentStore(pathConfig.attachmentsPath());
+        final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath());
 
         this.attachmentHelper = new AttachmentHelper(dependencies, attachmentStore);
         this.pinHelper = new PinHelper(dependencies.getKeyBackupService());
@@ -422,7 +426,7 @@ public class ManagerImpl implements Manager {
     public void addDeviceLink(URI linkUri) throws IOException, InvalidKeyException {
         var info = DeviceLinkInfo.parseDeviceLinkUri(linkUri);
 
-        addDevice(info.deviceIdentifier, info.deviceKey);
+        addDevice(info.deviceIdentifier(), info.deviceKey());
     }
 
     private void addDevice(String deviceIdentifier, ECPublicKey deviceKey) throws IOException, InvalidKeyException {
@@ -565,16 +569,15 @@ public class ManagerImpl implements Manager {
         long timestamp = System.currentTimeMillis();
         messageBuilder.withTimestamp(timestamp);
         for (final var recipient : recipients) {
-            if (recipient instanceof RecipientIdentifier.Single) {
-                final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient);
+            if (recipient instanceof RecipientIdentifier.Single single) {
+                final var recipientId = resolveRecipient(single);
                 final var result = sendHelper.sendMessage(messageBuilder, recipientId);
                 results.put(recipient, List.of(result));
             } else if (recipient instanceof RecipientIdentifier.NoteToSelf) {
                 final var result = sendHelper.sendSelfMessage(messageBuilder);
                 results.put(recipient, List.of(result));
-            } else if (recipient instanceof RecipientIdentifier.Group) {
-                final var groupId = ((RecipientIdentifier.Group) recipient).groupId;
-                final var result = sendHelper.sendAsGroupMessage(messageBuilder, groupId);
+            } else if (recipient instanceof RecipientIdentifier.Group group) {
+                final var result = sendHelper.sendAsGroupMessage(messageBuilder, group.groupId);
                 results.put(recipient, result);
             }
         }
@@ -639,8 +642,8 @@ public class ManagerImpl implements Manager {
     private void applyMessage(
             final SignalServiceDataMessage.Builder messageBuilder, final Message message
     ) throws AttachmentInvalidException, IOException {
-        messageBuilder.withBody(message.getMessageText());
-        final var attachments = message.getAttachments();
+        messageBuilder.withBody(message.messageText());
+        final var attachments = message.attachments();
         if (attachments != null) {
             messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments));
         }
@@ -872,6 +875,109 @@ public class ManagerImpl implements Manager {
         return actions;
     }
 
+    /**
+     * Registers a handler for received messages and starts the background receive thread if needed.
+     *
+     * @throws IllegalStateException if a synchronous receive is currently in progress
+     */
+    @Override
+    public void addReceiveHandler(final ReceiveMessageHandler handler) {
+        if (isReceivingSynchronous) {
+            throw new IllegalStateException("Already receiving message synchronously.");
+        }
+        synchronized (messageHandlers) {
+            messageHandlers.add(handler);
+
+            startReceiveThreadIfRequired();
+        }
+    }
+
+    /**
+     * Starts the background receive thread unless one is already registered in {@code receiveThread}.
+     * Both call sites invoke this while holding the {@code messageHandlers} monitor.
+     */
+    private void startReceiveThreadIfRequired() {
+        if (receiveThread != null) {
+            return;
+        }
+        receiveThread = new Thread(() -> {
+            while (!Thread.interrupted()) {
+                try {
+                    receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> {
+                        synchronized (messageHandlers) {
+                            for (ReceiveMessageHandler h : messageHandlers) {
+                                try {
+                                    h.handleMessage(envelope, decryptedContent, e);
+                                } catch (Exception ex) {
+                                    logger.warn("Message handler failed, ignoring", ex);
+                                }
+                            }
+                        }
+                    });
+                    break;
+                } catch (IOException e) {
+                    logger.warn("Receiving messages failed, retrying", e);
+                }
+            }
+            hasCaughtUpWithOldMessages = false;
+            synchronized (messageHandlers) {
+                receiveThread = null;
+
+                // Check if in the meantime another handler has been registered
+                if (!messageHandlers.isEmpty()) {
+                    startReceiveThreadIfRequired();
+                }
+            }
+        });
+
+        receiveThread.start();
+    }
+
+    /**
+     * Unregisters a handler and stops the background receive thread once no handler is left.
+     */
+    @Override
+    public void removeReceiveHandler(final ReceiveMessageHandler handler) {
+        final Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.remove(handler);
+            if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) {
+                return;
+            }
+            // Detach the thread only when it is really going to be stopped: clearing the field while
+            // it keeps running would let the next addReceiveHandler start a second receiver.
+            thread = receiveThread;
+            receiveThread = null;
+        }
+
+        stopReceiveThread(thread);
+    }
+
+    /**
+     * Interrupts the given thread and waits for it to terminate.
+     */
+    private void stopReceiveThread(final Thread thread) {
+        thread.interrupt();
+        try {
+            thread.join();
+        } catch (InterruptedException ignored) {
+            // Interrupted while waiting: give up on the join and return
+        }
+    }
+
+    /**
+     * Reports whether messages are being received, either synchronously or through registered handlers.
+     */
+    @Override
+    public boolean isReceiving() {
+        if (isReceivingSynchronous) {
+            return true;
+        }
+        synchronized (messageHandlers) {
+            return !messageHandlers.isEmpty();
+        }
+    }
+
     @Override
     public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
         receiveMessages(timeout, unit, true, handler);
@@ -884,6 +990,24 @@ public class ManagerImpl implements Manager {
 
     private void receiveMessages(
             long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+    ) throws IOException {
+        if (isReceiving()) {
+            throw new IllegalStateException("Already receiving message.");
+        }
+        isReceivingSynchronous = true;
+        // Publish the current thread so that close() can interrupt a synchronous receive
+        receiveThread = Thread.currentThread();
+        try {
+            receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+        } finally {
+            receiveThread = null;
+            hasCaughtUpWithOldMessages = false;
+            isReceivingSynchronous = false;
+        }
+    }
+
+    private void receiveMessagesInternal(
+            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
         retryFailedReceivedMessages(handler);
 
@@ -1249,6 +1373,16 @@ public class ManagerImpl implements Manager {
     }
 
     private void close(boolean closeAccount) throws IOException {
+        // Stop the receive thread (outside the lock) before shutting down the executor and web socket
+        Thread thread;
+        synchronized (messageHandlers) {
+            messageHandlers.clear();
+            thread = receiveThread;
+            receiveThread = null;
+        }
+        if (thread != null) {
+            stopReceiveThread(thread);
+        }
         executor.shutdown();
 
         dependencies.getSignalWebSocket().disconnect();