X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/4a1af0786c938f887a109a17dcc879da21704a8b..fba7a6a75c838686b645d7934de7d4c75d419d47:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 69ae9269..dd74d22c 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -77,9 +77,12 @@ import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; import org.whispersystems.signalservice.api.push.ACI; import org.whispersystems.signalservice.api.push.SignalServiceAddress; +import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; +import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException; import org.whispersystems.signalservice.api.util.DeviceNameUtil; import org.whispersystems.signalservice.api.util.InvalidNumberException; import org.whispersystems.signalservice.api.util.PhoneNumberFormatter; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.internal.contacts.crypto.Quote; import org.whispersystems.signalservice.internal.contacts.crypto.UnauthenticatedQuoteException; @@ -95,6 +98,8 @@ import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.security.SignatureException; +import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -110,6 +115,9 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; + import static org.asamk.signal.manager.config.ServiceConfig.capabilities; public class ManagerImpl implements Manager { @@ -142,7 +150,9 @@ public class ManagerImpl implements Manager { private Thread receiveThread; private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); + private final List closedListeners = new ArrayList<>(); private boolean isReceivingSynchronous; + private boolean needsToRetryFailedMessages = false; ManagerImpl( SignalAccount account, @@ -154,7 +164,7 @@ public class ManagerImpl implements Manager { this.serviceEnvironmentConfig = serviceEnvironmentConfig; final var credentialsProvider = new DynamicCredentialsProvider(account.getAci(), - account.getUsername(), + account.getAccount(), account.getPassword(), account.getDeviceId()); final var sessionLock = new SignalSessionLock() { @@ -187,8 +197,7 @@ public class ManagerImpl implements Manager { avatarStore, unidentifiedAccessHelper::getAccessFor, this::resolveSignalServiceAddress); - final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper::getRecipientProfileKeyCredential, - this::getRecipientProfile, + final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper, account::getSelfRecipientId, dependencies.getGroupsV2Operations(), dependencies.getGroupsV2Api(), @@ -200,6 +209,7 @@ public class ManagerImpl implements Manager { account.getRecipientStore(), this::handleIdentityFailure, this::getGroupInfo, + profileHelper, this::refreshRegisteredUser); this.groupHelper = new GroupHelper(account, dependencies, @@ -238,7 +248,7 @@ public class ManagerImpl implements Manager { contactHelper, attachmentHelper, syncHelper, - this::getRecipientProfile, + profileHelper::getRecipientProfile, jobExecutor); this.identityHelper = new IdentityHelper(account, dependencies, @@ -249,7 +259,7 @@ public class ManagerImpl implements Manager { @Override public String getSelfNumber() { - return account.getUsername(); + return account.getAccount(); } @Override @@ -265,11 +275,16 @@ public class ManagerImpl implements Manager { days); } } - preKeyHelper.refreshPreKeysIfNecessary(); - if (account.getAci() == null) { - account.setAci(dependencies.getAccountManager().getOwnAci()); + try { + preKeyHelper.refreshPreKeysIfNecessary(); + if (account.getAci() == null) { + account.setAci(ACI.parseOrNull(dependencies.getAccountManager().getWhoAmI().getAci())); + } + updateAccountAttributes(null); + } catch (AuthorizationFailedException e) { + account.setRegistered(false); + throw e; } - updateAccountAttributes(null); } /** @@ -277,13 +292,17 @@ public class ManagerImpl implements Manager { * * @param numbers The set of phone number in question * @return A map of numbers to canonicalized number and uuid. If a number is not registered the uuid is null. - * @throws IOException if its unable to get the contacts to check if they're registered + * @throws IOException if it's unable to get the contacts to check if they're registered */ @Override public Map> areUsersRegistered(Set numbers) throws IOException { Map canonicalizedNumbers = numbers.stream().collect(Collectors.toMap(n -> n, n -> { try { - return PhoneNumberFormatter.formatNumber(n, account.getUsername()); + final var canonicalizedNumber = PhoneNumberFormatter.formatNumber(n, account.getAccount()); + if (!canonicalizedNumber.equals(n)) { + logger.debug("Normalized number {} to {}.", n, canonicalizedNumber); + } + return canonicalizedNumber; } catch (InvalidNumberException e) { return ""; } @@ -385,6 +404,7 @@ public class ManagerImpl implements Manager { dependencies.getAccountManager().setGcmId(Optional.absent()); account.setRegistered(false); + close(); } @Override @@ -399,6 +419,7 @@ public class ManagerImpl implements Manager { dependencies.getAccountManager().deleteAccount(); account.setRegistered(false); + close(); } @Override @@ -427,7 +448,7 @@ public class ManagerImpl implements Manager { d.getCreated(), d.getLastSeen(), d.getId() == account.getDeviceId()); - }).collect(Collectors.toList()); + }).toList(); } @Override @@ -499,7 +520,7 @@ public class ManagerImpl implements Manager { @Override public List getGroups() { - return account.getGroupStore().getGroups().stream().map(this::toGroup).collect(Collectors.toList()); + return account.getGroupStore().getGroups().stream().map(this::toGroup).toList(); } private Group toGroup(final GroupInfo groupInfo) { @@ -610,56 +631,80 @@ public class ManagerImpl implements Manager { .map(sendMessageResult -> SendMessageResult.from(sendMessageResult, account.getRecipientStore(), account.getRecipientStore()::resolveRecipientAddress)) - .collect(Collectors.toList())); + .toList()); } } return new SendMessageResults(timestamp, results); } - private void sendTypingMessage( + private SendMessageResults sendTypingMessage( SignalServiceTypingMessage.Action action, Set recipients - ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + var results = new HashMap>(); final var timestamp = System.currentTimeMillis(); for (var recipient : recipients) { if (recipient instanceof RecipientIdentifier.Single) { final var message = new SignalServiceTypingMessage(action, timestamp, Optional.absent()); final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient); - sendHelper.sendTypingMessage(message, recipientId); + final var result = sendHelper.sendTypingMessage(message, recipientId); + results.put(recipient, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress))); } else if (recipient instanceof RecipientIdentifier.Group) { final var groupId = ((RecipientIdentifier.Group) recipient).groupId(); final var message = new SignalServiceTypingMessage(action, timestamp, Optional.of(groupId.serialize())); - sendHelper.sendGroupTypingMessage(message, groupId); + final var result = sendHelper.sendGroupTypingMessage(message, groupId); + results.put(recipient, + result.stream() + .map(r -> SendMessageResult.from(r, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)) + .toList()); } } + return new SendMessageResults(timestamp, results); } @Override - public void sendTypingMessage( + public SendMessageResults sendTypingMessage( TypingAction action, Set recipients - ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { - sendTypingMessage(action.toSignalService(), recipients); + ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + return sendTypingMessage(action.toSignalService(), recipients); } @Override - public void sendReadReceipt( + public SendMessageResults sendReadReceipt( RecipientIdentifier.Single sender, List messageIds - ) throws IOException, UntrustedIdentityException { + ) throws IOException { + final var timestamp = System.currentTimeMillis(); var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.READ, messageIds, - System.currentTimeMillis()); + timestamp); - sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + return new SendMessageResults(timestamp, + Map.of(sender, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)))); } @Override - public void sendViewedReceipt( + public SendMessageResults sendViewedReceipt( RecipientIdentifier.Single sender, List messageIds - ) throws IOException, UntrustedIdentityException { + ) throws IOException { + final var timestamp = System.currentTimeMillis(); var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.VIEWED, messageIds, - System.currentTimeMillis()); + timestamp); - sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + return new SendMessageResults(timestamp, + Map.of(sender, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)))); } @Override @@ -679,6 +724,28 @@ public class ManagerImpl implements Manager { if (attachments != null) { messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments)); } + if (message.mentions().size() > 0) { + messageBuilder.withMentions(resolveMentions(message.mentions())); + } + if (message.quote().isPresent()) { + final var quote = message.quote().get(); + messageBuilder.withQuote(new SignalServiceDataMessage.Quote(quote.timestamp(), + resolveSignalServiceAddress(resolveRecipient(quote.author())), + quote.message(), + List.of(), + resolveMentions(quote.mentions()))); + } + } + + private ArrayList resolveMentions(final List mentionList) throws IOException { + final var mentions = new ArrayList(); + for (final var m : mentionList) { + final var recipientId = resolveRecipient(m.recipient()); + mentions.add(new SignalServiceDataMessage.Mention(resolveSignalServiceAddress(recipientId).getAci(), + m.start(), + m.length())); + } + return mentions; } @Override @@ -724,6 +791,16 @@ public class ManagerImpl implements Manager { } } + @Override + public void deleteRecipient(final RecipientIdentifier.Single recipient) throws IOException { + account.removeRecipient(resolveRecipient(recipient)); + } + + @Override + public void deleteContact(final RecipientIdentifier.Single recipient) throws IOException { + account.getContactStore().deleteContact(resolveRecipient(recipient)); + } + @Override public void setContactName( RecipientIdentifier.Single recipient, String name @@ -834,11 +911,11 @@ public class ManagerImpl implements Manager { try { aciMap = getRegisteredUsers(Set.of(number)); } catch (NumberFormatException e) { - throw new IOException(number, e); + throw new UnregisteredUserException(number, e); } final var uuid = aciMap.get(number); if (uuid == null) { - throw new IOException(number, null); + throw new UnregisteredUserException(number, null); } return uuid; } @@ -930,7 +1007,7 @@ public class ManagerImpl implements Manager { logger.debug("Starting receiving messages"); while (!Thread.interrupted()) { try { - receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> { + receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> { synchronized (messageHandlers) { Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { try { @@ -997,17 +1074,17 @@ public class ManagerImpl implements Manager { } @Override - public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException { - receiveMessages(timeout, unit, true, handler); + public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException { + receiveMessages(timeout, true, handler); } @Override public void receiveMessages(ReceiveMessageHandler handler) throws IOException { - receiveMessages(1L, TimeUnit.HOURS, false, handler); + receiveMessages(Duration.ofMinutes(1), false, handler); } private void receiveMessages( - long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler + Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler ) throws IOException { if (isReceiving()) { throw new IllegalStateException("Already receiving message."); @@ -1015,7 +1092,7 @@ public class ManagerImpl implements Manager { isReceivingSynchronous = true; receiveThread = Thread.currentThread(); try { - receiveMessagesInternal(timeout, unit, returnOnTimeout, handler); + receiveMessagesInternal(timeout, returnOnTimeout, handler); } finally { receiveThread = null; hasCaughtUpWithOldMessages = false; @@ -1024,13 +1101,20 @@ public class ManagerImpl implements Manager { } private void receiveMessagesInternal( - long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler + Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler ) throws IOException { - retryFailedReceivedMessages(handler); + needsToRetryFailedMessages = true; - Set queuedActions = new HashSet<>(); + // Use a Map here because java Set doesn't have a get method ... + Map queuedActions = new HashMap<>(); final var signalWebSocket = dependencies.getSignalWebSocket(); + final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(), + signalWebSocket.getWebSocketState()) + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(this::onWebSocketStateChange); signalWebSocket.connect(); hasCaughtUpWithOldMessages = false; @@ -1038,12 +1122,19 @@ public class ManagerImpl implements Manager { final var MAX_BACKOFF_COUNTER = 9; while (!Thread.interrupted()) { + if (needsToRetryFailedMessages) { + retryFailedReceivedMessages(handler); + needsToRetryFailedMessages = false; + } SignalServiceEnvelope envelope; final CachedMessage[] cachedMessage = {null}; - account.setLastReceiveTimestamp(System.currentTimeMillis()); + final var nowMillis = System.currentTimeMillis(); + if (nowMillis - account.getLastReceiveTimestamp() > 60000) { + account.setLastReceiveTimestamp(nowMillis); + } logger.debug("Checking for new message from server"); try { - var result = signalWebSocket.readOrEmpty(unit.toMillis(timeout), envelope1 -> { + var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> { final var recipientId = envelope1.hasSourceUuid() ? resolveRecipient(envelope1.getSourceAddress()) : null; @@ -1057,7 +1148,7 @@ public class ManagerImpl implements Manager { logger.debug("New message received from server"); } else { logger.debug("Received indicator that server queue is empty"); - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); hasCaughtUpWithOldMessages = true; @@ -1098,11 +1189,18 @@ public class ManagerImpl implements Manager { } final var result = incomingMessageHandler.handleEnvelope(envelope, ignoreAttachments, handler); - queuedActions.addAll(result.first()); + for (final var h : result.first()) { + final var existingAction = queuedActions.get(h); + if (existingAction == null) { + queuedActions.put(h, h); + } else { + existingAction.mergeOther(h); + } + } final var exception = result.second(); if (hasCaughtUpWithOldMessages) { - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); } if (cachedMessage[0] != null) { @@ -1123,9 +1221,21 @@ public class ManagerImpl implements Manager { } } } - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); dependencies.getSignalWebSocket().disconnect(); + webSocketStateDisposable.dispose(); + } + + private void onWebSocketStateChange(final WebSocketConnectionState s) { + if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) { + account.setRegistered(false); + try { + close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } @Override @@ -1142,6 +1252,7 @@ public class ManagerImpl implements Manager { logger.debug("Handling message actions"); var interrupted = false; for (var action : queuedActions) { + logger.debug("Executing action {}", action.getClass().getSimpleName()); try { action.execute(context); } catch (Throwable e) { @@ -1180,7 +1291,7 @@ public class ManagerImpl implements Manager { .getContacts() .stream() .map(p -> new Pair<>(account.getRecipientStore().resolveRecipientAddress(p.first()), p.second())) - .collect(Collectors.toList()); + .toList(); } @Override @@ -1216,11 +1327,7 @@ public class ManagerImpl implements Manager { @Override public List getIdentities() { - return account.getIdentityKeyStore() - .getIdentities() - .stream() - .map(this::toIdentity) - .collect(Collectors.toList()); + return account.getIdentityKeyStore().getIdentities().stream().map(this::toIdentity).toList(); } private Identity toIdentity(final IdentityInfo identityInfo) { @@ -1253,7 +1360,7 @@ public class ManagerImpl implements Manager { /** * Trust this the identity with this fingerprint * - * @param recipient username of the identity + * @param recipient account of the identity * @param fingerprint Fingerprint */ @Override @@ -1264,13 +1371,17 @@ public class ManagerImpl implements Manager { } catch (IOException e) { return false; } - return identityHelper.trustIdentityVerified(recipientId, fingerprint); + final var updated = identityHelper.trustIdentityVerified(recipientId, fingerprint); + if (updated && this.isReceiving()) { + needsToRetryFailedMessages = true; + } + return updated; } /** * Trust this the identity with this safety number * - * @param recipient username of the identity + * @param recipient account of the identity * @param safetyNumber Safety number */ @Override @@ -1281,13 +1392,17 @@ public class ManagerImpl implements Manager { } catch (IOException e) { return false; } - return identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); + final var updated = identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); + if (updated && this.isReceiving()) { + needsToRetryFailedMessages = true; + } + return updated; } /** * Trust this the identity with this scannable safety number * - * @param recipient username of the identity + * @param recipient account of the identity * @param safetyNumber Scannable safety number */ @Override @@ -1298,13 +1413,17 @@ public class ManagerImpl implements Manager { } catch (IOException e) { return false; } - return identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); + final var updated = identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber); + if (updated && this.isReceiving()) { + needsToRetryFailedMessages = true; + } + return updated; } /** * Trust all keys of this identity without verification * - * @param recipient username of the identity + * @param recipient account of the identity */ @Override public boolean trustIdentityAllKeys(RecipientIdentifier.Single recipient) { @@ -1314,7 +1433,18 @@ public class ManagerImpl implements Manager { } catch (IOException e) { return false; } - return identityHelper.trustIdentityAllKeys(recipientId); + final var updated = identityHelper.trustIdentityAllKeys(recipientId); + if (updated && this.isReceiving()) { + needsToRetryFailedMessages = true; + } + return updated; + } + + @Override + public void addClosedListener(final Runnable listener) { + synchronized (closedListeners) { + closedListeners.add(listener); + } } private void handleIdentityFailure( @@ -1326,13 +1456,13 @@ public class ManagerImpl implements Manager { private SignalServiceAddress resolveSignalServiceAddress(RecipientId recipientId) { final var address = account.getRecipientStore().resolveRecipientAddress(recipientId); - if (address.getUuid().isPresent()) { + if (address.uuid().isPresent()) { return address.toSignalServiceAddress(); } // Address in recipient store doesn't have a uuid, this shouldn't happen // Try to retrieve the uuid from the server - final var number = address.getNumber().get(); + final var number = address.number().get(); final ACI aci; try { aci = getRegisteredUser(number); @@ -1382,10 +1512,6 @@ public class ManagerImpl implements Manager { @Override public void close() throws IOException { - close(true); - } - - private void close(boolean closeAccount) throws IOException { Thread thread; synchronized (messageHandlers) { weakHandlers.clear(); @@ -1400,7 +1526,12 @@ public class ManagerImpl implements Manager { dependencies.getSignalWebSocket().disconnect(); - if (closeAccount && account != null) { + synchronized (closedListeners) { + closedListeners.forEach(Runnable::run); + closedListeners.clear(); + } + + if (account != null) { account.close(); } account = null;