From c8cc428e3fad371d30cb68dfebfc33477a9d287d Mon Sep 17 00:00:00 2001 From: AsamK Date: Sat, 15 Jan 2022 18:18:40 +0100 Subject: [PATCH] Improve performance when fetching multiple profiles --- .../signal/manager/helper/ProfileHelper.java | 54 ++++++++++++------- .../storage/recipients/RecipientStore.java | 17 ++++++ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java index fc422a6b..de0c7b60 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; @@ -59,16 +60,16 @@ public final class ProfileHelper { } public List getRecipientProfileKeyCredential(List recipientIds) { - final var profileFetches = recipientIds.stream().map(recipientId -> { - var profileKeyCredential = account.getProfileStore().getProfileKeyCredential(recipientId); - if (profileKeyCredential != null) { - return null; - } - - return retrieveProfile(recipientId, - SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete(); - }).filter(Objects::nonNull).toList(); - Maybe.merge(profileFetches).blockingSubscribe(); + try { + account.getRecipientStore().setBulkUpdating(true); + final var profileFetches = Flowable.fromIterable(recipientIds) + .filter(recipientId -> account.getProfileStore().getProfileKeyCredential(recipientId) == null) + .map(recipientId -> retrieveProfile(recipientId, + SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete()); + Maybe.merge(profileFetches, 10).blockingSubscribe(); + } finally { + account.getRecipientStore().setBulkUpdating(false); + } return recipientIds.stream().map(r -> account.getProfileStore().getProfileKeyCredential(r)).toList(); } @@ -158,15 +159,16 @@ public final class ProfileHelper { } public List getRecipientProfile(List recipientIds) { - final var profileFetches = recipientIds.stream().map(recipientId -> { - var profile = account.getProfileStore().getProfile(recipientId); - if (!isProfileRefreshRequired(profile)) { - return null; - } - - return retrieveProfile(recipientId, SignalServiceProfile.RequestType.PROFILE).onErrorComplete(); - }).filter(Objects::nonNull).toList(); - Maybe.merge(profileFetches).blockingSubscribe(); + try { + account.getRecipientStore().setBulkUpdating(true); + final var profileFetches = Flowable.fromIterable(recipientIds) + .filter(recipientId -> isProfileRefreshRequired(account.getProfileStore().getProfile(recipientId))) + .map(recipientId -> retrieveProfile(recipientId, + SignalServiceProfile.RequestType.PROFILE).onErrorComplete()); + Maybe.merge(profileFetches, 10).blockingSubscribe(); + } finally { + account.getRecipientStore().setBulkUpdating(false); + } return recipientIds.stream().map(r -> account.getProfileStore().getProfile(r)).toList(); } @@ -215,6 +217,7 @@ public final class ProfileHelper { ) { var profile = account.getProfileStore().getProfile(recipientId); if (profile == null || !Objects.equals(avatarPath, profile.getAvatarUrlPath())) { + logger.trace("Downloading profile avatar for {}", recipientId); downloadProfileAvatar(context.getRecipientHelper().resolveSignalServiceAddress(recipientId), avatarPath, profileKey); @@ -243,11 +246,17 @@ public final class ProfileHelper { var unidentifiedAccess = getUnidentifiedAccess(recipientId); var profileKey = Optional.fromNullable(account.getProfileStore().getProfileKey(recipientId)); + logger.trace("Retrieving profile for {} {}", + recipientId, + profileKey.isPresent() ? "with profile key" : "without profile key"); final var address = context.getRecipientHelper().resolveSignalServiceAddress(recipientId); return retrieveProfile(address, profileKey, unidentifiedAccess, requestType).doOnSuccess(p -> { + logger.trace("Got new profile for {}", recipientId); final var encryptedProfile = p.getProfile(); - if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL) { + if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL + || account.getProfileStore().getProfileKeyCredential(recipientId) == null) { + logger.trace("Storing profile credential"); final var profileKeyCredential = p.getProfileKeyCredential().orNull(); account.getProfileStore().storeProfileKeyCredential(recipientId, profileKeyCredential); } @@ -256,6 +265,7 @@ public final class ProfileHelper { Profile newProfile = null; if (profileKey.isPresent()) { + logger.trace("Decrypting profile"); newProfile = decryptProfileAndDownloadAvatar(recipientId, profileKey.get(), encryptedProfile); } @@ -268,15 +278,18 @@ public final class ProfileHelper { .build(); } + logger.trace("Storing profile"); account.getProfileStore().storeProfile(recipientId, newProfile); try { + logger.trace("Storing identity"); var newIdentity = account.getIdentityKeyStore() .saveIdentity(recipientId, new IdentityKey(Base64.getDecoder().decode(encryptedProfile.getIdentityKey())), new Date()); if (newIdentity) { + logger.trace("Archiving old sessions"); account.getSessionStore().archiveSessions(recipientId); account.getSenderKeyStore().deleteSharedWith(recipientId); } @@ -284,6 +297,7 @@ public final class ProfileHelper { logger.warn("Got invalid identity key in profile for {}", context.getRecipientHelper().resolveSignalServiceAddress(recipientId).getIdentifier()); } + logger.trace("Done handling retrieved profile"); }).doOnError(e -> { logger.warn("Failed to retrieve profile, ignoring: {}", e.getMessage()); final var profile = account.getProfileStore().getProfile(recipientId); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java index 53bfde92..6dd327e8 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java @@ -47,6 +47,7 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile private final Map recipientsMerged = new HashMap<>(); private long lastId; + private boolean isBulkUpdating; public static RecipientStore load(File file, RecipientMergeHandler recipientMergeHandler) throws IOException { final var objectMapper = Utils.createStorageObjectMapper(); @@ -130,6 +131,19 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile this.lastId = lastId; } + public boolean isBulkUpdating() { + return isBulkUpdating; + } + + public void setBulkUpdating(final boolean bulkUpdating) { + isBulkUpdating = bulkUpdating; + if (!bulkUpdating) { + synchronized (recipients) { + saveLocked(); + } + } + } + public RecipientAddress resolveRecipientAddress(RecipientId recipientId) { synchronized (recipients) { return getRecipient(recipientId).getAddress(); @@ -483,6 +497,9 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile } private void saveLocked() { + if (isBulkUpdating) { + return; + } final var base64 = Base64.getEncoder(); var storage = new Storage(recipients.entrySet().stream().map(pair -> { final var recipient = pair.getValue(); -- 2.50.1