X-Git-Url: https://git.nmode.ca/signal-cli/blobdiff_plain/2b0989adfe84340df9001c502046581bad1e2208..d248f249e37f7b35a3b7dd69f2a06af8eddd3996:/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java diff --git a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 94a3f4be..df757a8b 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -77,9 +77,11 @@ import org.whispersystems.signalservice.api.messages.SignalServiceReceiptMessage import org.whispersystems.signalservice.api.messages.SignalServiceTypingMessage; import org.whispersystems.signalservice.api.push.ACI; import org.whispersystems.signalservice.api.push.SignalServiceAddress; +import org.whispersystems.signalservice.api.push.exceptions.AuthorizationFailedException; import org.whispersystems.signalservice.api.util.DeviceNameUtil; import org.whispersystems.signalservice.api.util.InvalidNumberException; import org.whispersystems.signalservice.api.util.PhoneNumberFormatter; +import org.whispersystems.signalservice.api.websocket.WebSocketConnectionState; import org.whispersystems.signalservice.api.websocket.WebSocketUnavailableException; import org.whispersystems.signalservice.internal.contacts.crypto.Quote; import org.whispersystems.signalservice.internal.contacts.crypto.UnauthenticatedQuoteException; @@ -95,6 +97,7 @@ import java.net.URISyntaxException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.security.SignatureException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -108,6 +111,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.Schedulers; import static org.asamk.signal.manager.config.ServiceConfig.capabilities; @@ -139,7 +146,9 @@ public class ManagerImpl implements Manager { private boolean ignoreAttachments = false; private Thread receiveThread; + private final Set weakHandlers = new HashSet<>(); private final Set messageHandlers = new HashSet<>(); + private final List closedListeners = new ArrayList<>(); private boolean isReceivingSynchronous; ManagerImpl( @@ -152,7 +161,7 @@ public class ManagerImpl implements Manager { this.serviceEnvironmentConfig = serviceEnvironmentConfig; final var credentialsProvider = new DynamicCredentialsProvider(account.getAci(), - account.getUsername(), + account.getAccount(), account.getPassword(), account.getDeviceId()); final var sessionLock = new SignalSessionLock() { @@ -247,7 +256,7 @@ public class ManagerImpl implements Manager { @Override public String getSelfNumber() { - return account.getUsername(); + return account.getAccount(); } @Override @@ -263,11 +272,16 @@ public class ManagerImpl implements Manager { days); } } - preKeyHelper.refreshPreKeysIfNecessary(); - if (account.getAci() == null) { - account.setAci(dependencies.getAccountManager().getOwnAci()); + try { + preKeyHelper.refreshPreKeysIfNecessary(); + if (account.getAci() == null) { + account.setAci(ACI.parseOrNull(dependencies.getAccountManager().getWhoAmI().getAci())); + } + updateAccountAttributes(null); + } catch (AuthorizationFailedException e) { + account.setRegistered(false); + throw e; } - updateAccountAttributes(null); } /** @@ -275,13 +289,17 @@ public class ManagerImpl implements Manager { * * @param numbers The set of phone number in question * @return A map of numbers to canonicalized number and uuid. If a number is not registered the uuid is null. - * @throws IOException if its unable to get the contacts to check if they're registered + * @throws IOException if it's unable to get the contacts to check if they're registered */ @Override public Map> areUsersRegistered(Set numbers) throws IOException { Map canonicalizedNumbers = numbers.stream().collect(Collectors.toMap(n -> n, n -> { try { - return PhoneNumberFormatter.formatNumber(n, account.getUsername()); + final var canonicalizedNumber = PhoneNumberFormatter.formatNumber(n, account.getAccount()); + if (!canonicalizedNumber.equals(n)) { + logger.debug("Normalized number {} to {}.", n, canonicalizedNumber); + } + return canonicalizedNumber; } catch (InvalidNumberException e) { return ""; } @@ -383,6 +401,7 @@ public class ManagerImpl implements Manager { dependencies.getAccountManager().setGcmId(Optional.absent()); account.setRegistered(false); + close(); } @Override @@ -397,10 +416,13 @@ public class ManagerImpl implements Manager { dependencies.getAccountManager().deleteAccount(); account.setRegistered(false); + close(); } @Override public void submitRateLimitRecaptchaChallenge(String challenge, String captcha) throws IOException { + captcha = captcha == null ? null : captcha.replace("signalcaptcha://", ""); + dependencies.getAccountManager().submitRateLimitRecaptchaChallenge(challenge, captcha); } @@ -423,7 +445,7 @@ public class ManagerImpl implements Manager { d.getCreated(), d.getLastSeen(), d.getId() == account.getDeviceId()); - }).collect(Collectors.toList()); + }).toList(); } @Override @@ -495,7 +517,7 @@ public class ManagerImpl implements Manager { @Override public List getGroups() { - return account.getGroupStore().getGroups().stream().map(this::toGroup).collect(Collectors.toList()); + return account.getGroupStore().getGroups().stream().map(this::toGroup).toList(); } private Group toGroup(final GroupInfo groupInfo) { @@ -606,56 +628,80 @@ public class ManagerImpl implements Manager { .map(sendMessageResult -> SendMessageResult.from(sendMessageResult, account.getRecipientStore(), account.getRecipientStore()::resolveRecipientAddress)) - .collect(Collectors.toList())); + .toList()); } } return new SendMessageResults(timestamp, results); } - private void sendTypingMessage( + private SendMessageResults sendTypingMessage( SignalServiceTypingMessage.Action action, Set recipients - ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + var results = new HashMap>(); final var timestamp = System.currentTimeMillis(); for (var recipient : recipients) { if (recipient instanceof RecipientIdentifier.Single) { final var message = new SignalServiceTypingMessage(action, timestamp, Optional.absent()); final var recipientId = resolveRecipient((RecipientIdentifier.Single) recipient); - sendHelper.sendTypingMessage(message, recipientId); + final var result = sendHelper.sendTypingMessage(message, recipientId); + results.put(recipient, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress))); } else if (recipient instanceof RecipientIdentifier.Group) { final var groupId = ((RecipientIdentifier.Group) recipient).groupId(); final var message = new SignalServiceTypingMessage(action, timestamp, Optional.of(groupId.serialize())); - sendHelper.sendGroupTypingMessage(message, groupId); + final var result = sendHelper.sendGroupTypingMessage(message, groupId); + results.put(recipient, + result.stream() + .map(r -> SendMessageResult.from(r, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)) + .toList()); } } + return new SendMessageResults(timestamp, results); } @Override - public void sendTypingMessage( + public SendMessageResults sendTypingMessage( TypingAction action, Set recipients - ) throws IOException, UntrustedIdentityException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { - sendTypingMessage(action.toSignalService(), recipients); + ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { + return sendTypingMessage(action.toSignalService(), recipients); } @Override - public void sendReadReceipt( + public SendMessageResults sendReadReceipt( RecipientIdentifier.Single sender, List messageIds - ) throws IOException, UntrustedIdentityException { + ) throws IOException { + final var timestamp = System.currentTimeMillis(); var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.READ, messageIds, - System.currentTimeMillis()); + timestamp); - sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + return new SendMessageResults(timestamp, + Map.of(sender, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)))); } @Override - public void sendViewedReceipt( + public SendMessageResults sendViewedReceipt( RecipientIdentifier.Single sender, List messageIds - ) throws IOException, UntrustedIdentityException { + ) throws IOException { + final var timestamp = System.currentTimeMillis(); var receiptMessage = new SignalServiceReceiptMessage(SignalServiceReceiptMessage.Type.VIEWED, messageIds, - System.currentTimeMillis()); + timestamp); - sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + final var result = sendHelper.sendReceiptMessage(receiptMessage, resolveRecipient(sender)); + return new SendMessageResults(timestamp, + Map.of(sender, + List.of(SendMessageResult.from(result, + account.getRecipientStore(), + account.getRecipientStore()::resolveRecipientAddress)))); } @Override @@ -675,6 +721,28 @@ public class ManagerImpl implements Manager { if (attachments != null) { messageBuilder.withAttachments(attachmentHelper.uploadAttachments(attachments)); } + if (message.mentions().size() > 0) { + messageBuilder.withMentions(resolveMentions(message.mentions())); + } + if (message.quote().isPresent()) { + final var quote = message.quote().get(); + messageBuilder.withQuote(new SignalServiceDataMessage.Quote(quote.timestamp(), + resolveSignalServiceAddress(resolveRecipient(quote.author())), + quote.message(), + List.of(), + resolveMentions(quote.mentions()))); + } + } + + private ArrayList resolveMentions(final List mentionList) throws IOException { + final var mentions = new ArrayList(); + for (final var m : mentionList) { + final var recipientId = resolveRecipient(m.recipient()); + mentions.add(new SignalServiceDataMessage.Mention(resolveSignalServiceAddress(recipientId).getAci(), + m.start(), + m.length())); + } + return mentions; } @Override @@ -720,6 +788,16 @@ public class ManagerImpl implements Manager { } } + @Override + public void deleteRecipient(final RecipientIdentifier.Single recipient) throws IOException { + account.removeRecipient(resolveRecipient(recipient)); + } + + @Override + public void deleteContact(final RecipientIdentifier.Single recipient) throws IOException { + account.getContactStore().deleteContact(resolveRecipient(recipient)); + } + @Override public void setContactName( RecipientIdentifier.Single recipient, String name @@ -904,14 +982,17 @@ public class ManagerImpl implements Manager { } @Override - public void addReceiveHandler(final ReceiveMessageHandler handler) { + public void addReceiveHandler(final ReceiveMessageHandler handler, final boolean isWeakListener) { if (isReceivingSynchronous) { throw new IllegalStateException("Already receiving message synchronously."); } synchronized (messageHandlers) { - messageHandlers.add(handler); - - startReceiveThreadIfRequired(); + if (isWeakListener) { + weakHandlers.add(handler); + } else { + messageHandlers.add(handler); + startReceiveThreadIfRequired(); + } } } @@ -925,13 +1006,13 @@ public class ManagerImpl implements Manager { try { receiveMessagesInternal(1L, TimeUnit.HOURS, false, (envelope, e) -> { synchronized (messageHandlers) { - for (ReceiveMessageHandler h : messageHandlers) { + Stream.concat(messageHandlers.stream(), weakHandlers.stream()).forEach(h -> { try { h.handleMessage(envelope, e); } catch (Exception ex) { logger.warn("Message handler failed, ignoring", ex); } - } + }); } }); break; @@ -959,8 +1040,9 @@ public class ManagerImpl implements Manager { public void removeReceiveHandler(final ReceiveMessageHandler handler) { final Thread thread; synchronized (messageHandlers) { + weakHandlers.remove(handler); messageHandlers.remove(handler); - if (!messageHandlers.isEmpty() || isReceivingSynchronous) { + if (!messageHandlers.isEmpty() || receiveThread == null || isReceivingSynchronous) { return; } thread = receiveThread; @@ -1020,9 +1102,16 @@ public class ManagerImpl implements Manager { ) throws IOException { retryFailedReceivedMessages(handler); - Set queuedActions = new HashSet<>(); + // Use a Map here because java Set doesn't have a get method ... + Map queuedActions = new HashMap<>(); final var signalWebSocket = dependencies.getSignalWebSocket(); + final var webSocketStateDisposable = Observable.merge(signalWebSocket.getUnidentifiedWebSocketState(), + signalWebSocket.getWebSocketState()) + .subscribeOn(Schedulers.computation()) + .observeOn(Schedulers.computation()) + .distinctUntilChanged() + .subscribe(this::onWebSocketStateChange); signalWebSocket.connect(); hasCaughtUpWithOldMessages = false; @@ -1032,7 +1121,10 @@ public class ManagerImpl implements Manager { while (!Thread.interrupted()) { SignalServiceEnvelope envelope; final CachedMessage[] cachedMessage = {null}; - account.setLastReceiveTimestamp(System.currentTimeMillis()); + final var nowMillis = System.currentTimeMillis(); + if (nowMillis - account.getLastReceiveTimestamp() > 60000) { + account.setLastReceiveTimestamp(nowMillis); + } logger.debug("Checking for new message from server"); try { var result = signalWebSocket.readOrEmpty(unit.toMillis(timeout), envelope1 -> { @@ -1049,7 +1141,7 @@ public class ManagerImpl implements Manager { logger.debug("New message received from server"); } else { logger.debug("Received indicator that server queue is empty"); - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); hasCaughtUpWithOldMessages = true; @@ -1090,11 +1182,18 @@ public class ManagerImpl implements Manager { } final var result = incomingMessageHandler.handleEnvelope(envelope, ignoreAttachments, handler); - queuedActions.addAll(result.first()); + for (final var h : result.first()) { + final var existingAction = queuedActions.get(h); + if (existingAction == null) { + queuedActions.put(h, h); + } else { + existingAction.mergeOther(h); + } + } final var exception = result.second(); if (hasCaughtUpWithOldMessages) { - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); } if (cachedMessage[0] != null) { @@ -1115,9 +1214,21 @@ public class ManagerImpl implements Manager { } } } - handleQueuedActions(queuedActions); + handleQueuedActions(queuedActions.keySet()); queuedActions.clear(); dependencies.getSignalWebSocket().disconnect(); + webSocketStateDisposable.dispose(); + } + + private void onWebSocketStateChange(final WebSocketConnectionState s) { + if (s.equals(WebSocketConnectionState.AUTHENTICATION_FAILED)) { + account.setRegistered(false); + try { + close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } @Override @@ -1172,7 +1283,7 @@ public class ManagerImpl implements Manager { .getContacts() .stream() .map(p -> new Pair<>(account.getRecipientStore().resolveRecipientAddress(p.first()), p.second())) - .collect(Collectors.toList()); + .toList(); } @Override @@ -1208,11 +1319,7 @@ public class ManagerImpl implements Manager { @Override public List getIdentities() { - return account.getIdentityKeyStore() - .getIdentities() - .stream() - .map(this::toIdentity) - .collect(Collectors.toList()); + return account.getIdentityKeyStore().getIdentities().stream().map(this::toIdentity).toList(); } private Identity toIdentity(final IdentityInfo identityInfo) { @@ -1245,7 +1352,7 @@ public class ManagerImpl implements Manager { /** * Trust this the identity with this fingerprint * - * @param recipient username of the identity + * @param recipient account of the identity * @param fingerprint Fingerprint */ @Override @@ -1262,7 +1369,7 @@ public class ManagerImpl implements Manager { /** * Trust this the identity with this safety number * - * @param recipient username of the identity + * @param recipient account of the identity * @param safetyNumber Safety number */ @Override @@ -1279,7 +1386,7 @@ public class ManagerImpl implements Manager { /** * Trust this the identity with this scannable safety number * - * @param recipient username of the identity + * @param recipient account of the identity * @param safetyNumber Scannable safety number */ @Override @@ -1296,7 +1403,7 @@ public class ManagerImpl implements Manager { /** * Trust all keys of this identity without verification * - * @param recipient username of the identity + * @param recipient account of the identity */ @Override public boolean trustIdentityAllKeys(RecipientIdentifier.Single recipient) { @@ -1309,6 +1416,13 @@ public class ManagerImpl implements Manager { return identityHelper.trustIdentityAllKeys(recipientId); } + @Override + public void addClosedListener(final Runnable listener) { + synchronized (closedListeners) { + closedListeners.add(listener); + } + } + private void handleIdentityFailure( final RecipientId recipientId, final org.whispersystems.signalservice.api.messages.SendMessageResult.IdentityFailure identityFailure @@ -1318,13 +1432,13 @@ public class ManagerImpl implements Manager { private SignalServiceAddress resolveSignalServiceAddress(RecipientId recipientId) { final var address = account.getRecipientStore().resolveRecipientAddress(recipientId); - if (address.getUuid().isPresent()) { + if (address.uuid().isPresent()) { return address.toSignalServiceAddress(); } // Address in recipient store doesn't have a uuid, this shouldn't happen // Try to retrieve the uuid from the server - final var number = address.getNumber().get(); + final var number = address.number().get(); final ACI aci; try { aci = getRegisteredUser(number); @@ -1374,12 +1488,9 @@ public class ManagerImpl implements Manager { @Override public void close() throws IOException { - close(true); - } - - private void close(boolean closeAccount) throws IOException { Thread thread; synchronized (messageHandlers) { + weakHandlers.clear(); messageHandlers.clear(); thread = receiveThread; receiveThread = null; @@ -1391,7 +1502,12 @@ public class ManagerImpl implements Manager { dependencies.getSignalWebSocket().disconnect(); - if (closeAccount && account != null) { + synchronized (closedListeners) { + closedListeners.forEach(Runnable::run); + closedListeners.clear(); + } + + if (account != null) { account.close(); } account = null;