X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/f094cd6806aae68fba9ebd0cf29eaf4eabf042d2..3cf7721cd7112c4997c35ac95dca7fad305dfd12:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index d2ffaaab..345413c6 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -20,7 +20,9 @@ import org.asamk.signal.manager.actions.HandleAction; import org.asamk.signal.manager.api.Device; import org.asamk.signal.manager.api.Group; import org.asamk.signal.manager.api.Identity; +import org.asamk.signal.manager.api.InvalidDeviceLinkException; import org.asamk.signal.manager.api.Message; +import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.RecipientIdentifier; import org.asamk.signal.manager.api.SendGroupMessageResults; import org.asamk.signal.manager.api.SendMessageResults; @@ -64,7 +66,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.libsignal.InvalidKeyException; import org.whispersystems.libsignal.ecc.ECPublicKey; -import org.whispersystems.libsignal.util.Pair; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalSessionLock; import org.whispersystems.signalservice.api.groupsv2.GroupLinkNotActiveException; @@ -135,6 +136,11 @@ public class ManagerImpl implements Manager { private final Context context; private boolean hasCaughtUpWithOldMessages = false; + private boolean ignoreAttachments = false; + + private Thread receiveThread; + private final Set messageHandlers = new HashSet<>(); + private boolean isReceivingSynchronous; ManagerImpl( SignalAccount account, @@ -164,9 +170,9 @@ public class ManagerImpl implements Manager { account.getSignalProtocolStore(), executor, sessionLock); - final var avatarStore = new AvatarStore(pathConfig.getAvatarsPath()); - final var attachmentStore = new AttachmentStore(pathConfig.getAttachmentsPath()); - final var stickerPackStore = new StickerPackStore(pathConfig.getStickerPacksPath()); + final var avatarStore = new AvatarStore(pathConfig.avatarsPath()); + final var attachmentStore = new AttachmentStore(pathConfig.attachmentsPath()); + final var stickerPackStore = new StickerPackStore(pathConfig.stickerPacksPath()); this.attachmentHelper = new AttachmentHelper(dependencies, attachmentStore); this.pinHelper = new PinHelper(dependencies.getKeyBackupService()); @@ -353,9 +359,13 @@ public class ManagerImpl implements Manager { */ @Override public void setProfile( - String givenName, final String familyName, String about, String aboutEmoji, Optional avatar + String givenName, final String familyName, String about, String aboutEmoji, java.util.Optional avatar ) throws IOException { - profileHelper.setProfile(givenName, familyName, about, aboutEmoji, avatar); + profileHelper.setProfile(givenName, + familyName, + about, + aboutEmoji, + avatar == null ? null : Optional.fromNullable(avatar.orElse(null))); syncHelper.sendSyncFetchProfileMessage(); } @@ -418,27 +428,33 @@ public class ManagerImpl implements Manager { } @Override - public void addDeviceLink(URI linkUri) throws IOException, InvalidKeyException { + public void addDeviceLink(URI linkUri) throws IOException, InvalidDeviceLinkException { var info = DeviceLinkInfo.parseDeviceLinkUri(linkUri); - addDevice(info.deviceIdentifier, info.deviceKey); + addDevice(info.deviceIdentifier(), info.deviceKey()); } - private void addDevice(String deviceIdentifier, ECPublicKey deviceKey) throws IOException, InvalidKeyException { + private void addDevice( + String deviceIdentifier, ECPublicKey deviceKey + ) throws IOException, InvalidDeviceLinkException { var identityKeyPair = account.getIdentityKeyPair(); var verificationCode = dependencies.getAccountManager().getNewDeviceVerificationCode(); - dependencies.getAccountManager() - .addDevice(deviceIdentifier, - deviceKey, - identityKeyPair, - Optional.of(account.getProfileKey().serialize()), - verificationCode); + try { + dependencies.getAccountManager() + .addDevice(deviceIdentifier, + deviceKey, + identityKeyPair, + Optional.of(account.getProfileKey().serialize()), + verificationCode); + } catch (InvalidKeyException e) { + throw new InvalidDeviceLinkException("Invalid device link", e); + } account.setMultiDevice(true); } @Override - public void setRegistrationLockPin(Optional pin) throws IOException, UnauthenticatedResponseException { + public void setRegistrationLockPin(java.util.Optional pin) throws IOException, UnauthenticatedResponseException { if (!account.isMasterDevice()) { throw new RuntimeException("Only master device can set a PIN"); } @@ -564,16 +580,15 @@ public class ManagerImpl implements Manager { long timestamp = System.currentTimeMillis(); messageBuilder.withTimestamp(timestamp); for (final var recipient : recipients) { - if (recipient instanceof RecipientIdentifier.Single) { - final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient); + if (recipient instanceof RecipientIdentifier.Single single) { + final var recipientId = resolveRecipient(single); final var result = sendHelper.sendMessage(messageBuilder, recipientId); results.put(recipient, List.of(result)); } else if (recipient instanceof RecipientIdentifier.NoteToSelf) { final var result = sendHelper.sendSelfMessage(messageBuilder); results.put(recipient, List.of(result)); - } else if (recipient instanceof RecipientIdentifier.Group) { - final var groupId = ((RecipientIdentifier.Group) recipient).groupId; - final var result = sendHelper.sendAsGroupMessage(messageBuilder, groupId); + } else if (recipient instanceof RecipientIdentifier.Group group) { + final var result = sendHelper.sendAsGroupMessage(messageBuilder, group.groupId); results.put(recipient, result); } } @@ -638,8 +653,8 @@ public class ManagerImpl implements Manager { private void applyMessage( final SignalServiceDataMessage.Builder messageBuilder, final Message message ) throws AttachmentInvalidException, IOException { - messageBuilder.withBody(message.getMessageText()); - final var attachments = message.getAttachments(); + messageBuilder.withBody(message.messageText()); + final var attachments = message.attachments(); if (attachments != null) { messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments)); } @@ -824,10 +839,10 @@ public class ManagerImpl implements Manager { return registeredUsers; } - private void retryFailedReceivedMessages(ReceiveMessageHandler handler, boolean ignoreAttachments) { + private void retryFailedReceivedMessages(ReceiveMessageHandler handler) { Set queuedActions = new HashSet<>(); for (var cachedMessage : account.getMessageCache().getCachedMessages()) { - var actions = retryFailedReceivedMessage(handler, ignoreAttachments, cachedMessage); + var actions = retryFailedReceivedMessage(handler, cachedMessage); if (actions != null) { queuedActions.addAll(actions); } @@ -836,7 +851,7 @@ public class ManagerImpl implements Manager { } private List retryFailedReceivedMessage( - final ReceiveMessageHandler handler, final boolean ignoreAttachments, final CachedMessage cachedMessage + final ReceiveMessageHandler handler, final CachedMessage cachedMessage ) { var envelope = cachedMessage.loadEnvelope(); if (envelope == null) { @@ -872,14 +887,118 @@ public class ManagerImpl implements Manager { } @Override - public void receiveMessages( - long timeout, - TimeUnit unit, - boolean returnOnTimeout, - boolean ignoreAttachments, - ReceiveMessageHandler handler + public void addReceiveHandler(final ReceiveMessageHandler handler) { + if (isReceivingSynchronous) { + throw new IllegalStateException("Already receiving message synchronously."); + } + synchronized (messageHandlers) { + messageHandlers.add(handler); + + startReceiveThreadIfRequired(); + } + } + + private void startReceiveThreadIfRequired() { + if (receiveThread != null) { + return; + } + receiveThread = new Thread(() -> { + while (!Thread.interrupted()) { + try { + receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, decryptedContent, e) -> { + synchronized (messageHandlers) { + for (ReceiveMessageHandler h : messageHandlers) { + try { + h.handleMessage(envelope, decryptedContent, e); + } catch (Exception ex) { + logger.warn("Message handler failed, ignoring", ex); + } + } + } + }); + break; + } catch (IOException e) { + logger.warn("Receiving messages failed, retrying", e); + } + } + hasCaughtUpWithOldMessages = false; + synchronized (messageHandlers) { + receiveThread = null; + + // Check if in the meantime another handler has been registered + if (!messageHandlers.isEmpty()) { + startReceiveThreadIfRequired(); + } + } + }); + + receiveThread.start(); + } + + @Override + public void removeReceiveHandler(final ReceiveMessageHandler handler) { + final Thread thread; + synchronized (messageHandlers) { + thread = receiveThread; + receiveThread = null; + messageHandlers.remove(handler); + if (!messageHandlers.isEmpty() || isReceivingSynchronous) { + return; + } + } + + stopReceiveThread(thread); + } + + private void stopReceiveThread(final Thread thread) { + thread.interrupt(); + try { + thread.join(); + } catch (InterruptedException ignored) { + } + } + + @Override + public boolean isReceiving() { + if (isReceivingSynchronous) { + return true; + } + synchronized (messageHandlers) { + return messageHandlers.size() > 0; + } + } + + @Override + public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException { + receiveMessages(timeout, unit, true, handler); + } + + @Override + public void receiveMessages(ReceiveMessageHandler handler) throws IOException { + receiveMessages(1L, TimeUnit.HOURS, false, handler); + } + + private void receiveMessages( + long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler + ) throws IOException { + if (isReceiving()) { + throw new IllegalStateException("Already receiving message."); + } + isReceivingSynchronous = true; + receiveThread = Thread.currentThread(); + try { + receiveMessagesInternal(timeout, unit, returnOnTimeout, handler); + } finally { + receiveThread = null; + hasCaughtUpWithOldMessages = false; + isReceivingSynchronous = false; + } + } + + private void receiveMessagesInternal( + long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler ) throws IOException { - retryFailedReceivedMessages(handler, ignoreAttachments); + retryFailedReceivedMessages(handler); Set queuedActions = new HashSet<>(); @@ -887,6 +1006,8 @@ public class ManagerImpl implements Manager { signalWebSocket.connect(); hasCaughtUpWithOldMessages = false; + var backOffCounter = 0; + final var MAX_BACKOFF_COUNTER = 9; while (!Thread.interrupted()) { SignalServiceEnvelope envelope; @@ -901,6 +1022,8 @@ public class ManagerImpl implements Manager { // store message on disk, before acknowledging receipt to the server cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1, recipientId); }); + backOffCounter = 0; + if (result.isPresent()) { envelope = result.get(); logger.debug("New message received from server"); @@ -924,11 +1047,24 @@ public class ManagerImpl implements Manager { } else { throw e; } - } catch (WebSocketUnavailableException e) { - logger.debug("Pipe unexpectedly unavailable, connecting"); - signalWebSocket.connect(); - continue; + } catch (IOException e) { + logger.debug("Pipe unexpectedly unavailable: {}", e.getMessage()); + if (e instanceof WebSocketUnavailableException || "Connection closed!".equals(e.getMessage())) { + final var sleepMilliseconds = 100 * (long) Math.pow(2, backOffCounter); + backOffCounter = Math.min(backOffCounter + 1, MAX_BACKOFF_COUNTER); + logger.warn("Connection closed unexpectedly, reconnecting in {} ms", sleepMilliseconds); + try { + Thread.sleep(sleepMilliseconds); + } catch (InterruptedException interruptedException) { + return; + } + hasCaughtUpWithOldMessages = false; + signalWebSocket.connect(); + continue; + } + throw e; } catch (TimeoutException e) { + backOffCounter = 0; if (returnOnTimeout) return; continue; } @@ -939,9 +1075,11 @@ public class ManagerImpl implements Manager { if (hasCaughtUpWithOldMessages) { handleQueuedActions(queuedActions); + queuedActions.clear(); } if (cachedMessage[0] != null) { if (exception instanceof UntrustedIdentityException) { + logger.debug("Keeping message with untrusted identity in message cache"); final var address = ((UntrustedIdentityException) exception).getSender(); final var recipientId = resolveRecipient(address); if (!envelope.hasSourceUuid()) { @@ -958,6 +1096,12 @@ public class ManagerImpl implements Manager { } } handleQueuedActions(queuedActions); + queuedActions.clear(); + } + + @Override + public void setIgnoreAttachments(final boolean ignoreAttachments) { + this.ignoreAttachments = ignoreAttachments; } @Override @@ -966,6 +1110,7 @@ public class ManagerImpl implements Manager { } private void handleQueuedActions(final Collection queuedActions) { + logger.debug("Handling message actions"); var interrupted = false; for (var action : queuedActions) { try { @@ -1060,11 +1205,12 @@ public class ManagerImpl implements Manager { } final var address = account.getRecipientStore().resolveRecipientAddress(identityInfo.getRecipientId()); + final var scannableFingerprint = identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(), + identityInfo.getIdentityKey()); return new Identity(address, identityInfo.getIdentityKey(), identityHelper.computeSafetyNumber(identityInfo.getRecipientId(), identityInfo.getIdentityKey()), - identityHelper.computeSafetyNumberForScanning(identityInfo.getRecipientId(), - identityInfo.getIdentityKey()).getSerialized(), + scannableFingerprint == null ? null : scannableFingerprint.getSerialized(), identityInfo.getTrustLevel(), identityInfo.getDateAdded()); } @@ -1216,6 +1362,15 @@ public class ManagerImpl implements Manager { } private void close(boolean closeAccount) throws IOException { + Thread thread; + synchronized (messageHandlers) { + messageHandlers.clear(); + thread = receiveThread; + receiveThread = null; + } + if (thread != null) { + stopReceiveThread(thread); + } executor.shutdown(); dependencies.getSignalWebSocket().disconnect();