]> nmode's Git Repositories - signal-cli/blobdiff - lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
Parallelize profile fetching
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / ManagerImpl.java
index 28a461619e16386f4af43e90b6e78e7c4014a1f6..dd74d22c48f0296cf9029a495ddcf764fb8eb655 100644 (file)
@@ -77,9 +77,12 @@ import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage
 import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage;
 import org.whispersystems.signalservice.api.push.ACI;
 import org.whispersystems.signalservice.api.push.SignalServiceAddress;
+import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException;
+import org.whispersystems.signalservice.api.push.exceptions.UnregisteredUserException;
 import org.whispersystems.signalservice.api.util.DeviceNameUtil;
 import org.whispersystems.signalservice.api.util.InvalidNumberException;
 import org.whispersystems.signalservice.api.util.PhoneNumberFormatter;
+import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState;
 import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException;
 import org.whispersystems.signalservice.internal.contacts.crypto.Quote;
 import org.whispersystems.signalservice.internal.contacts.crypto.UnauthenticatedQuoteException;
@@ -95,6 +98,7 @@ import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.security.SignatureException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -111,6 +115,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import io.reactivex.rxjava3.core.Observable;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+
 import static org.asamk.signal.manager.config.ServiceConfig.capabilities;
 
 public class ManagerImpl implements Manager {
@@ -145,6 +152,7 @@ public class ManagerImpl implements Manager {
     private final Set<ReceiveMessageHandler> messageHandlers = new HashSet<>();
     private final List<Runnable> closedListeners = new ArrayList<>();
     private boolean isReceivingSynchronous;
+    private boolean needsToRetryFailedMessages = false;
 
     ManagerImpl(
             SignalAccount account,
@@ -156,7 +164,7 @@ public class ManagerImpl implements Manager {
         this.serviceEnvironmentConfig = serviceEnvironmentConfig;
 
         final var credentialsProvider = new DynamicCredentialsProvider(account.getAci(),
-                account.getUsername(),
+                account.getAccount(),
                 account.getPassword(),
                 account.getDeviceId());
         final var sessionLock = new SignalSessionLock() {
@@ -189,8 +197,7 @@ public class ManagerImpl implements Manager {
                 avatarStore,
                 unidentifiedAccessHelper::getAccessFor,
                 this::resolveSignalServiceAddress);
-        final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper::getRecipientProfileKeyCredential,
-                this::getRecipientProfile,
+        final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper,
                 account::getSelfRecipientId,
                 dependencies.getGroupsV2Operations(),
                 dependencies.getGroupsV2Api(),
@@ -202,6 +209,7 @@ public class ManagerImpl implements Manager {
                 account.getRecipientStore(),
                 this::handleIdentityFailure,
                 this::getGroupInfo,
+                profileHelper,
                 this::refreshRegisteredUser);
         this.groupHelper = new GroupHelper(account,
                 dependencies,
@@ -240,7 +248,7 @@ public class ManagerImpl implements Manager {
                 contactHelper,
                 attachmentHelper,
                 syncHelper,
-                this::getRecipientProfile,
+                profileHelper::getRecipientProfile,
                 jobExecutor);
         this.identityHelper = new IdentityHelper(account,
                 dependencies,
@@ -251,7 +259,7 @@ public class ManagerImpl implements Manager {
 
     @Override
     public String getSelfNumber() {
-        return account.getUsername();
+        return account.getAccount();
     }
 
     @Override
@@ -267,11 +275,16 @@ public class ManagerImpl implements Manager {
                         days);
             }
         }
-        preKeyHelper.refreshPreKeysIfNecessary();
-        if (account.getAci() == null) {
-            account.setAci(dependencies.getAccountManager().getOwnAci());
+        try {
+            preKeyHelper.refreshPreKeysIfNecessary();
+            if (account.getAci() == null) {
+                account.setAci(ACI.parseOrNull(dependencies.getAccountManager().getWhoAmI().getAci()));
+            }
+            updateAccountAttributes(null);
+        } catch (AuthorizationFailedException e) {
+            account.setRegistered(false);
+            throw e;
         }
-        updateAccountAttributes(null);
     }
 
     /**
@@ -279,13 +292,17 @@ public class ManagerImpl implements Manager {
      *
      * @param numbers The set of phone number in question
      * @return A map of numbers to canonicalized number and uuid. If a number is not registered the uuid is null.
-     * @throws IOException if its unable to get the contacts to check if they're registered
+     * @throws IOException if it's unable to get the contacts to check if they're registered
      */
     @Override
     public Map<String, Pair<String, UUID>> areUsersRegistered(Set<String> numbers) throws IOException {
         Map<String, String> canonicalizedNumbers = numbers.stream().collect(Collectors.toMap(n -> n, n -> {
             try {
-                return PhoneNumberFormatter.formatNumber(n, account.getUsername());
+                final var canonicalizedNumber = PhoneNumberFormatter.formatNumber(n, account.getAccount());
+                if (!canonicalizedNumber.equals(n)) {
+                    logger.debug("Normalized number {} to {}.", n, canonicalizedNumber);
+                }
+                return canonicalizedNumber;
             } catch (InvalidNumberException e) {
                 return "";
             }
@@ -431,7 +448,7 @@ public class ManagerImpl implements Manager {
                     d.getCreated(),
                     d.getLastSeen(),
                     d.getId() == account.getDeviceId());
-        }).collect(Collectors.toList());
+        }).toList();
     }
 
     @Override
@@ -503,7 +520,7 @@ public class ManagerImpl implements Manager {
 
     @Override
     public List<Group> getGroups() {
-        return account.getGroupStore().getGroups().stream().map(this::toGroup).collect(Collectors.toList());
+        return account.getGroupStore().getGroups().stream().map(this::toGroup).toList();
     }
 
     private Group toGroup(final GroupInfo groupInfo) {
@@ -614,56 +631,80 @@ public class ManagerImpl implements Manager {
                                 .map(sendMessageResult -> SendMessageResult.from(sendMessageResult,
                                         account.getRecipientStore(),
                                         account.getRecipientStore()::resolveRecipientAddress))
-                                .collect(Collectors.toList()));
+                                .toList());
             }
         }
         return new SendMessageResults(timestamp, results);
     }
 
-    private void sendTypingMessage(
+    private SendMessageResults sendTypingMessage(
             SignalServiceTypingMessage.Action action, Set<RecipientIdentifier> recipients
-    ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
+    ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
+        var results = new HashMap<RecipientIdentifier, List<SendMessageResult>>();
         final var timestamp = System.currentTimeMillis();
         for (var recipient : recipients) {
             if (recipient instanceof RecipientIdentifier.Single) {
                 final var message = new SignalServiceTypingMessage(action, timestamp, Optional.absent());
                 final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient);
-                sendHelper.sendTypingMessage(message, recipientId);
+                final var result = sendHelper.sendTypingMessage(message, recipientId);
+                results.put(recipient,
+                        List.of(SendMessageResult.from(result,
+                                account.getRecipientStore(),
+                                account.getRecipientStore()::resolveRecipientAddress)));
             } else if (recipient instanceof RecipientIdentifier.Group) {
                 final var groupId = ((RecipientIdentifier.Group) recipient).groupId();
                 final var message = new SignalServiceTypingMessage(action, timestamp, Optional.of(groupId.serialize()));
-                sendHelper.sendGroupTypingMessage(message, groupId);
+                final var result = sendHelper.sendGroupTypingMessage(message, groupId);
+                results.put(recipient,
+                        result.stream()
+                                .map(r -> SendMessageResult.from(r,
+                                        account.getRecipientStore(),
+                                        account.getRecipientStore()::resolveRecipientAddress))
+                                .toList());
             }
         }
+        return new SendMessageResults(timestamp, results);
     }
 
     @Override
-    public void sendTypingMessage(
+    public SendMessageResults sendTypingMessage(
             TypingAction action, Set<RecipientIdentifier> recipients
-    ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
-        sendTypingMessage(action.toSignalService(), recipients);
+    ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
+        return sendTypingMessage(action.toSignalService(), recipients);
     }
 
     @Override
-    public void sendReadReceipt(
+    public SendMessageResults sendReadReceipt(
             RecipientIdentifier.Single sender, List<Long> messageIds
-    ) throws IOException, UntrustedIdentityException {
+    ) throws IOException {
+        final var timestamp = System.currentTimeMillis();
         var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.READ,
                 messageIds,
-                System.currentTimeMillis());
+                timestamp);
 
-        sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender));
+        final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender));
+        return new SendMessageResults(timestamp,
+                Map.of(sender,
+                        List.of(SendMessageResult.from(result,
+                                account.getRecipientStore(),
+                                account.getRecipientStore()::resolveRecipientAddress))));
     }
 
     @Override
-    public void sendViewedReceipt(
+    public SendMessageResults sendViewedReceipt(
             RecipientIdentifier.Single sender, List<Long> messageIds
-    ) throws IOException, UntrustedIdentityException {
+    ) throws IOException {
+        final var timestamp = System.currentTimeMillis();
         var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.VIEWED,
                 messageIds,
-                System.currentTimeMillis());
+                timestamp);
 
-        sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender));
+        final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender));
+        return new SendMessageResults(timestamp,
+                Map.of(sender,
+                        List.of(SendMessageResult.from(result,
+                                account.getRecipientStore(),
+                                account.getRecipientStore()::resolveRecipientAddress))));
     }
 
     @Override
@@ -683,6 +724,28 @@ public class ManagerImpl implements Manager {
         if (attachments != null) {
             messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments));
         }
+        if (message.mentions().size() > 0) {
+            messageBuilder.withMentions(resolveMentions(message.mentions()));
+        }
+        if (message.quote().isPresent()) {
+            final var quote = message.quote().get();
+            messageBuilder.withQuote(new SignalServiceDataMessage.Quote(quote.timestamp(),
+                    resolveSignalServiceAddress(resolveRecipient(quote.author())),
+                    quote.message(),
+                    List.of(),
+                    resolveMentions(quote.mentions())));
+        }
+    }
+
+    private ArrayList<SignalServiceDataMessage.Mention> resolveMentions(final List<Message.Mention> mentionList) throws IOException {
+        final var mentions = new ArrayList<SignalServiceDataMessage.Mention>();
+        for (final var m : mentionList) {
+            final var recipientId = resolveRecipient(m.recipient());
+            mentions.add(new SignalServiceDataMessage.Mention(resolveSignalServiceAddress(recipientId).getAci(),
+                    m.start(),
+                    m.length()));
+        }
+        return mentions;
     }
 
     @Override
@@ -728,6 +791,16 @@ public class ManagerImpl implements Manager {
         }
     }
 
+    @Override
+    public void deleteRecipient(final RecipientIdentifier.Single recipient) throws IOException {
+        account.removeRecipient(resolveRecipient(recipient));
+    }
+
+    @Override
+    public void deleteContact(final RecipientIdentifier.Single recipient) throws IOException {
+        account.getContactStore().deleteContact(resolveRecipient(recipient));
+    }
+
     @Override
     public void setContactName(
             RecipientIdentifier.Single recipient, String name
@@ -838,11 +911,11 @@ public class ManagerImpl implements Manager {
         try {
             aciMap = getRegisteredUsers(Set.of(number));
         } catch (NumberFormatException e) {
-            throw new IOException(number, e);
+            throw new UnregisteredUserException(number, e);
         }
         final var uuid = aciMap.get(number);
         if (uuid == null) {
-            throw new IOException(number, null);
+            throw new UnregisteredUserException(number, null);
         }
         return uuid;
     }
@@ -934,7 +1007,7 @@ public class ManagerImpl implements Manager {
             logger.debug("Starting receiving messages");
             while (!Thread.interrupted()) {
                 try {
-                    receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> {
+                    receiveMessagesInternal(Duration.ofMinutes(1), false, (envelope, e) -> {
                         synchronized (messageHandlers) {
                             Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> {
                                 try {
@@ -1001,17 +1074,17 @@ public class ManagerImpl implements Manager {
     }
 
     @Override
-    public void receiveMessages(long timeout, TimeUnit unit, ReceiveMessageHandler handler) throws IOException {
-        receiveMessages(timeout, unit, true, handler);
+    public void receiveMessages(Duration timeout, ReceiveMessageHandler handler) throws IOException {
+        receiveMessages(timeout, true, handler);
     }
 
     @Override
     public void receiveMessages(ReceiveMessageHandler handler) throws IOException {
-        receiveMessages(1L, TimeUnit.HOURS, false, handler);
+        receiveMessages(Duration.ofMinutes(1), false, handler);
     }
 
     private void receiveMessages(
-            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+            Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
         if (isReceiving()) {
             throw new IllegalStateException("Already receiving message.");
@@ -1019,7 +1092,7 @@ public class ManagerImpl implements Manager {
         isReceivingSynchronous = true;
         receiveThread = Thread.currentThread();
         try {
-            receiveMessagesInternal(timeout, unit, returnOnTimeout, handler);
+            receiveMessagesInternal(timeout, returnOnTimeout, handler);
         } finally {
             receiveThread = null;
             hasCaughtUpWithOldMessages = false;
@@ -1028,14 +1101,20 @@ public class ManagerImpl implements Manager {
     }
 
     private void receiveMessagesInternal(
-            long timeout, TimeUnit unit, boolean returnOnTimeout, ReceiveMessageHandler handler
+            Duration timeout, boolean returnOnTimeout, ReceiveMessageHandler handler
     ) throws IOException {
-        retryFailedReceivedMessages(handler);
+        needsToRetryFailedMessages = true;
 
         // Use a Map here because java Set doesn't have a get method ...
         Map<HandleAction, HandleAction> queuedActions = new HashMap<>();
 
         final var signalWebSocket = dependencies.getSignalWebSocket();
+        final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(),
+                        signalWebSocket.getWebSocketState())
+                .subscribeOn(Schedulers.computation())
+                .observeOn(Schedulers.computation())
+                .distinctUntilChanged()
+                .subscribe(this::onWebSocketStateChange);
         signalWebSocket.connect();
 
         hasCaughtUpWithOldMessages = false;
@@ -1043,12 +1122,19 @@ public class ManagerImpl implements Manager {
         final var MAX_BACKOFF_COUNTER = 9;
 
         while (!Thread.interrupted()) {
+            if (needsToRetryFailedMessages) {
+                retryFailedReceivedMessages(handler);
+                needsToRetryFailedMessages = false;
+            }
             SignalServiceEnvelope envelope;
             final CachedMessage[] cachedMessage = {null};
-            account.setLastReceiveTimestamp(System.currentTimeMillis());
+            final var nowMillis = System.currentTimeMillis();
+            if (nowMillis - account.getLastReceiveTimestamp() > 60000) {
+                account.setLastReceiveTimestamp(nowMillis);
+            }
             logger.debug("Checking for new message from server");
             try {
-                var result = signalWebSocket.readOrEmpty(unit.toMillis(timeout), envelope1 -> {
+                var result = signalWebSocket.readOrEmpty(timeout.toMillis(), envelope1 -> {
                     final var recipientId = envelope1.hasSourceUuid()
                             ? resolveRecipient(envelope1.getSourceAddress())
                             : null;
@@ -1138,6 +1224,18 @@ public class ManagerImpl implements Manager {
         handleQueuedActions(queuedActions.keySet());
         queuedActions.clear();
         dependencies.getSignalWebSocket().disconnect();
+        webSocketStateDisposable.dispose();
+    }
+
+    private void onWebSocketStateChange(final WebSocketConnectionState s) {
+        if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) {
+            account.setRegistered(false);
+            try {
+                close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     @Override
@@ -1154,6 +1252,7 @@ public class ManagerImpl implements Manager {
         logger.debug("Handling message actions");
         var interrupted = false;
         for (var action : queuedActions) {
+            logger.debug("Executing action {}", action.getClass().getSimpleName());
             try {
                 action.execute(context);
             } catch (Throwable e) {
@@ -1192,7 +1291,7 @@ public class ManagerImpl implements Manager {
                 .getContacts()
                 .stream()
                 .map(p -> new Pair<>(account.getRecipientStore().resolveRecipientAddress(p.first()), p.second()))
-                .collect(Collectors.toList());
+                .toList();
     }
 
     @Override
@@ -1228,11 +1327,7 @@ public class ManagerImpl implements Manager {
 
     @Override
     public List<Identity> getIdentities() {
-        return account.getIdentityKeyStore()
-                .getIdentities()
-                .stream()
-                .map(this::toIdentity)
-                .collect(Collectors.toList());
+        return account.getIdentityKeyStore().getIdentities().stream().map(this::toIdentity).toList();
     }
 
     private Identity toIdentity(final IdentityInfo identityInfo) {
@@ -1265,7 +1360,7 @@ public class ManagerImpl implements Manager {
     /**
      * Trust this the identity with this fingerprint
      *
-     * @param recipient   username of the identity
+     * @param recipient   account of the identity
      * @param fingerprint Fingerprint
      */
     @Override
@@ -1276,13 +1371,17 @@ public class ManagerImpl implements Manager {
         } catch (IOException e) {
             return false;
         }
-        return identityHelper.trustIdentityVerified(recipientId, fingerprint);
+        final var updated = identityHelper.trustIdentityVerified(recipientId, fingerprint);
+        if (updated && this.isReceiving()) {
+            needsToRetryFailedMessages = true;
+        }
+        return updated;
     }
 
     /**
      * Trust this the identity with this safety number
      *
-     * @param recipient    username of the identity
+     * @param recipient    account of the identity
      * @param safetyNumber Safety number
      */
     @Override
@@ -1293,13 +1392,17 @@ public class ManagerImpl implements Manager {
         } catch (IOException e) {
             return false;
         }
-        return identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
+        final var updated = identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
+        if (updated && this.isReceiving()) {
+            needsToRetryFailedMessages = true;
+        }
+        return updated;
     }
 
     /**
      * Trust this the identity with this scannable safety number
      *
-     * @param recipient    username of the identity
+     * @param recipient    account of the identity
      * @param safetyNumber Scannable safety number
      */
     @Override
@@ -1310,13 +1413,17 @@ public class ManagerImpl implements Manager {
         } catch (IOException e) {
             return false;
         }
-        return identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
+        final var updated = identityHelper.trustIdentityVerifiedSafetyNumber(recipientId, safetyNumber);
+        if (updated && this.isReceiving()) {
+            needsToRetryFailedMessages = true;
+        }
+        return updated;
     }
 
     /**
      * Trust all keys of this identity without verification
      *
-     * @param recipient username of the identity
+     * @param recipient account of the identity
      */
     @Override
     public boolean trustIdentityAllKeys(RecipientIdentifier.Single recipient) {
@@ -1326,7 +1433,11 @@ public class ManagerImpl implements Manager {
         } catch (IOException e) {
             return false;
         }
-        return identityHelper.trustIdentityAllKeys(recipientId);
+        final var updated = identityHelper.trustIdentityAllKeys(recipientId);
+        if (updated && this.isReceiving()) {
+            needsToRetryFailedMessages = true;
+        }
+        return updated;
     }
 
     @Override
@@ -1345,13 +1456,13 @@ public class ManagerImpl implements Manager {
 
     private SignalServiceAddress resolveSignalServiceAddress(RecipientId recipientId) {
         final var address = account.getRecipientStore().resolveRecipientAddress(recipientId);
-        if (address.getUuid().isPresent()) {
+        if (address.uuid().isPresent()) {
             return address.toSignalServiceAddress();
         }
 
         // Address in recipient store doesn't have a uuid, this shouldn't happen
         // Try to retrieve the uuid from the server
-        final var number = address.getNumber().get();
+        final var number = address.number().get();
         final ACI aci;
         try {
             aci = getRegisteredUser(number);