nmode's Git Repositories - signal-cli/commitdiff
Implement remote storage sync
author AsamK <asamk@gmx.de>
Mon, 27 Jun 2022 13:39:22 +0000 (15:39 +0200)
committer AsamK <asamk@gmx.de>
Sun, 28 Jan 2024 21:38:41 +0000 (22:38 +0100)
Closes #604

41 files changed:
lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java [deleted file]
lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/api/TrustLevel.java
lib/src/main/java/org/asamk/signal/manager/config/ServiceConfig.java
lib/src/main/java/org/asamk/signal/manager/helper/AccountHelper.java
lib/src/main/java/org/asamk/signal/manager/helper/Context.java
lib/src/main/java/org/asamk/signal/manager/helper/GroupHelper.java
lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java
lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java
lib/src/main/java/org/asamk/signal/manager/helper/StorageHelper.java
lib/src/main/java/org/asamk/signal/manager/helper/SyncHelper.java
lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java
lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/internal/RegistrationManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java
lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java
lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/storage/configuration/ConfigurationStore.java
lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV1.java
lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV2.java
lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupStore.java
lib/src/main/java/org/asamk/signal/manager/storage/groups/LegacyGroupStore.java
lib/src/main/java/org/asamk/signal/manager/storage/identities/IdentityKeyStore.java
lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java
lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java
lib/src/main/java/org/asamk/signal/manager/storage/recipients/Recipient.java
lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientAddress.java
lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java
lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/util/KeyUtils.java

diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java
deleted file mode 100644 (file)
index 8b29600..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.asamk.signal.manager.actions;
-
-import org.asamk.signal.manager.helper.Context;
-
-public class RetrieveStorageDataAction implements HandleAction {
-
-    private static final RetrieveStorageDataAction INSTANCE = new RetrieveStorageDataAction();
-
-    private RetrieveStorageDataAction() {
-    }
-
-    public static RetrieveStorageDataAction create() {
-        return INSTANCE;
-    }
-
-    @Override
-    public void execute(Context context) throws Throwable {
-        context.getStorageHelper().readDataFromStorage();
-    }
-}
diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java
new file mode 100644 (file)
index 0000000..7101b3d
--- /dev/null
@@ -0,0 +1,21 @@
+package org.asamk.signal.manager.actions;
+
+import org.asamk.signal.manager.helper.Context;
+import org.asamk.signal.manager.jobs.SyncStorageJob;
+
+public class SyncStorageDataAction implements HandleAction {
+
+    private static final SyncStorageDataAction INSTANCE = new SyncStorageDataAction();
+
+    private SyncStorageDataAction() {
+    }
+
+    public static SyncStorageDataAction create() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void execute(Context context) throws Throwable {
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
+    }
+}
index cbfa0bd52447b7e9fb3cf249797a84156c3d0581..ac93e34a6cae825e8eda58325fe9c0f42fa8ee57 100644 (file)
@@ -1,7 +1,6 @@
 package org.asamk.signal.manager.api;
 
 import org.whispersystems.signalservice.api.messages.multidevice.VerifiedMessage;
-import org.whispersystems.signalservice.internal.storage.protos.ContactRecord;
 
 public enum TrustLevel {
     UNTRUSTED,
@@ -17,14 +16,6 @@ public enum TrustLevel {
         return TrustLevel.cachedValues[i];
     }
 
-    public static TrustLevel fromIdentityState(ContactRecord.IdentityState identityState) {
-        return switch (identityState) {
-            case DEFAULT -> TRUSTED_UNVERIFIED;
-            case UNVERIFIED -> UNTRUSTED;
-            case VERIFIED -> TRUSTED_VERIFIED;
-        };
-    }
-
     public static TrustLevel fromVerifiedState(VerifiedMessage.VerifiedState verifiedState) {
         return switch (verifiedState) {
             case DEFAULT -> TRUSTED_UNVERIFIED;
index cd103f801d088ed9958a411a362651d9890b91a0..d46b6988db432267c5e81b8e6ef9eaa3e48341fa 100644 (file)
@@ -29,7 +29,7 @@ public class ServiceConfig {
         final var giftBadges = !isPrimaryDevice;
         final var pni = !isPrimaryDevice;
         final var paymentActivation = !isPrimaryDevice;
-        return new AccountAttributes.Capabilities(false, true, true, true, true, giftBadges, pni, paymentActivation);
+        return new AccountAttributes.Capabilities(true, true, true, true, true, giftBadges, pni, paymentActivation);
     }
 
     public static ServiceEnvironmentConfig getServiceEnvironmentConfig(
index c83dde25323922037f90050791696a7ffe651791..2563494dfa102ee70f22c341e49f9c63d41b0dd1 100644 (file)
@@ -8,6 +8,7 @@ import org.asamk.signal.manager.api.NonNormalizedPhoneNumberException;
 import org.asamk.signal.manager.api.PinLockedException;
 import org.asamk.signal.manager.api.RateLimitException;
 import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.SyncStorageJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.util.KeyUtils;
 import org.asamk.signal.manager.util.NumberVerificationUtils;
@@ -137,11 +138,11 @@ public class AccountHelper {
             account.setPniIdentityKeyPair(KeyUtils.generateIdentityKeyPair());
         }
         account.getRecipientTrustedResolver().resolveSelfRecipientTrusted(account.getSelfRecipientAddress());
-        // TODO check and update remote storage
         context.getUnidentifiedAccessHelper().rotateSenderCertificates();
         dependencies.resetAfterAddressChange();
         context.getGroupV2Helper().clearAuthCredentialCache();
         context.getAccountFileUpdater().updateAccountIdentifiers(account.getNumber(), account.getAci());
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
     }
 
     public void setPni(
@@ -450,6 +451,7 @@ public class AccountHelper {
             throw new InvalidDeviceLinkException("Invalid device link", e);
         }
         account.setMultiDevice(true);
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
     }
 
     public void removeLinkedDevices(int deviceId) throws IOException {
index ba2e5fb016dbca0cf7bd5febdbcaa264dcdd8df4..848a57c161390acd93d37c1f086eff0934883ad8 100644 (file)
@@ -80,7 +80,7 @@ public class Context implements AutoCloseable {
         return attachmentStore;
     }
 
-    JobExecutor getJobExecutor() {
+    public JobExecutor getJobExecutor() {
         return jobExecutor;
     }
 
index 40558a8928e793c3b9f505f6487a6c7991a58e53..dec9185a815f29b289a6dbf0924cf37126acafab 100644 (file)
@@ -19,6 +19,7 @@ import org.asamk.signal.manager.api.SendMessageResult;
 import org.asamk.signal.manager.config.ServiceConfig;
 import org.asamk.signal.manager.groups.GroupUtils;
 import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.SyncStorageJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.groups.GroupInfo;
 import org.asamk.signal.manager.storage.groups.GroupInfoV1;
@@ -143,6 +144,7 @@ public class GroupHelper {
             }
             groupInfoV2.setGroup(group);
             account.getGroupStore().updateGroup(groupInfoV2);
+            context.getJobExecutor().enqueueJob(new SyncStorageJob());
         }
 
         return groupInfoV2;
@@ -185,6 +187,7 @@ public class GroupHelper {
         final var result = sendGroupMessage(messageBuilder,
                 gv2.getMembersIncludingPendingWithout(selfRecipientId),
                 gv2.getDistributionId());
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
         return new Pair<>(gv2.getGroupId(), result);
     }
 
@@ -209,10 +212,11 @@ public class GroupHelper {
         var group = getGroupForUpdating(groupId);
         final var avatarBytes = readAvatarBytes(avatarFile);
 
+        SendGroupMessageResults results;
         switch (group) {
             case GroupInfoV2 gv2 -> {
                 try {
-                    return updateGroupV2(gv2,
+                    results = updateGroupV2(gv2,
                             name,
                             description,
                             members,
@@ -231,7 +235,7 @@ public class GroupHelper {
                 } catch (ConflictException e) {
                     // Detected conflicting update, refreshing group and trying again
                     group = getGroup(groupId, true);
-                    return updateGroupV2((GroupInfoV2) group,
+                    results = updateGroupV2((GroupInfoV2) group,
                             name,
                             description,
                             members,
@@ -251,13 +255,14 @@ public class GroupHelper {
             }
 
             case GroupInfoV1 gv1 -> {
-                final var result = updateGroupV1(gv1, name, members, avatarBytes);
+                results = updateGroupV1(gv1, name, members, avatarBytes);
                 if (expirationTimer != null) {
                     setExpirationTimer(gv1, expirationTimer);
                 }
-                return result;
             }
         }
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
+        return results;
     }
 
     public void updateGroupProfileKey(GroupIdV2 groupId) throws GroupNotFoundException, NotAGroupMemberException, IOException {
@@ -304,6 +309,7 @@ public class GroupHelper {
 
         final var result = sendUpdateGroupV2Message(group, group.getGroup(), groupChange);
 
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
         return new Pair<>(group.getGroupId(), result);
     }
 
@@ -327,6 +333,7 @@ public class GroupHelper {
     public void deleteGroup(GroupId groupId) throws IOException {
         account.getGroupStore().deleteGroup(groupId);
         context.getAvatarStore().deleteGroupAvatar(groupId);
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
     }
 
     public void setGroupBlocked(final GroupId groupId, final boolean blocked) throws GroupNotFoundException {
@@ -337,6 +344,7 @@ public class GroupHelper {
 
         group.setBlocked(blocked);
         account.getGroupStore().updateGroup(group);
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
     }
 
     public SendGroupMessageResults sendGroupInfoRequest(
index cf9c43a2a6dcd99bad2fc38768cb94c5d8cb690f..0f0628288e6840954aef9e2033648dccb72cf290 100644 (file)
@@ -6,7 +6,6 @@ import org.asamk.signal.manager.actions.RefreshPreKeysAction;
 import org.asamk.signal.manager.actions.RenewSessionAction;
 import org.asamk.signal.manager.actions.ResendMessageAction;
 import org.asamk.signal.manager.actions.RetrieveProfileAction;
-import org.asamk.signal.manager.actions.RetrieveStorageDataAction;
 import org.asamk.signal.manager.actions.SendGroupInfoAction;
 import org.asamk.signal.manager.actions.SendGroupInfoRequestAction;
 import org.asamk.signal.manager.actions.SendProfileKeyAction;
@@ -17,6 +16,7 @@ import org.asamk.signal.manager.actions.SendSyncConfigurationAction;
 import org.asamk.signal.manager.actions.SendSyncContactsAction;
 import org.asamk.signal.manager.actions.SendSyncGroupsAction;
 import org.asamk.signal.manager.actions.SendSyncKeysAction;
+import org.asamk.signal.manager.actions.SyncStorageDataAction;
 import org.asamk.signal.manager.actions.UpdateAccountAttributesAction;
 import org.asamk.signal.manager.api.GroupId;
 import org.asamk.signal.manager.api.GroupNotFoundException;
@@ -511,6 +511,7 @@ public final class IncomingMessageHandler {
             if (rm.isConfigurationRequest()) {
                 actions.add(SendSyncConfigurationAction.create());
             }
+            actions.add(SyncStorageDataAction.create());
         }
         if (syncMessage.getGroups().isPresent()) {
             try {
@@ -578,7 +579,7 @@ public final class IncomingMessageHandler {
         if (syncMessage.getFetchType().isPresent()) {
             switch (syncMessage.getFetchType().get()) {
                 case LOCAL_PROFILE -> actions.add(new RetrieveProfileAction(account.getSelfRecipientId()));
-                case STORAGE_MANIFEST -> actions.add(RetrieveStorageDataAction.create());
+                case STORAGE_MANIFEST -> actions.add(SyncStorageDataAction.create());
             }
         }
         if (syncMessage.getKeys().isPresent()) {
@@ -586,7 +587,12 @@ public final class IncomingMessageHandler {
             if (keysMessage.getStorageService().isPresent()) {
                 final var storageKey = keysMessage.getStorageService().get();
                 account.setStorageKey(storageKey);
-                actions.add(RetrieveStorageDataAction.create());
+                actions.add(SyncStorageDataAction.create());
+            }
+            if (keysMessage.getMaster().isPresent()) {
+                final var masterKey = keysMessage.getMaster().get();
+                account.setMasterKey(masterKey);
+                actions.add(SyncStorageDataAction.create());
             }
         }
         if (syncMessage.getConfiguration().isPresent()) {
index ec3ae020ccafc115ee2c9ea7723badebf9262877..dbde28a75ec57a54f3a93b37972909e905950689 100644 (file)
@@ -6,6 +6,7 @@ import org.asamk.signal.manager.api.PhoneNumberSharingMode;
 import org.asamk.signal.manager.api.Profile;
 import org.asamk.signal.manager.config.ServiceConfig;
 import org.asamk.signal.manager.internal.SignalDependencies;
+import org.asamk.signal.manager.jobs.SyncStorageJob;
 import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.groups.GroupInfoV2;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
@@ -67,7 +68,8 @@ public final class ProfileHelper {
         account.setProfileKey(profileKey);
         context.getAccountHelper().updateAccountAttributes();
         setProfile(true, true, null, null, null, null, null, null);
-        // TODO update profile key in storage
+        account.getRecipientStore().rotateSelfStorageId();
+        context.getJobExecutor().enqueueJob(new SyncStorageJob());
 
         final var recipientIds = account.getRecipientStore().getRecipientIdsWithEnabledProfileSharing();
         for (final var recipientId : recipientIds) {
index 1eed964d8a45a5cc556a8b3e195e91c1856711b3..8c8ac6fcbb894619e30112596225ecc5101a2083 100644 (file)
@@ -1,39 +1,47 @@
 package org.asamk.signal.manager.helper;
 
-import org.asamk.signal.manager.api.Contact;
-import org.asamk.signal.manager.api.GroupId;
-import org.asamk.signal.manager.api.PhoneNumberSharingMode;
-import org.asamk.signal.manager.api.Profile;
-import org.asamk.signal.manager.api.TrustLevel;
+import org.asamk.signal.manager.api.GroupIdV1;
+import org.asamk.signal.manager.api.GroupIdV2;
 import org.asamk.signal.manager.internal.SignalDependencies;
-import org.asamk.signal.manager.jobs.CheckWhoAmIJob;
-import org.asamk.signal.manager.jobs.DownloadProfileAvatarJob;
 import org.asamk.signal.manager.storage.SignalAccount;
-import org.asamk.signal.manager.storage.recipients.RecipientAddress;
-import org.signal.libsignal.protocol.IdentityKey;
+import org.asamk.signal.manager.storage.recipients.RecipientId;
+import org.asamk.signal.manager.syncStorage.AccountRecordProcessor;
+import org.asamk.signal.manager.syncStorage.ContactRecordProcessor;
+import org.asamk.signal.manager.syncStorage.GroupV1RecordProcessor;
+import org.asamk.signal.manager.syncStorage.GroupV2RecordProcessor;
+import org.asamk.signal.manager.syncStorage.StorageSyncModels;
+import org.asamk.signal.manager.syncStorage.StorageSyncValidations;
+import org.asamk.signal.manager.syncStorage.WriteOperationResult;
+import org.asamk.signal.manager.util.KeyUtils;
+import org.signal.core.util.SetUtil;
 import org.signal.libsignal.protocol.InvalidKeyException;
-import org.signal.libsignal.zkgroup.InvalidInputException;
-import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
-import org.signal.libsignal.zkgroup.profiles.ProfileKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.whispersystems.signalservice.api.storage.SignalAccountRecord;
 import org.whispersystems.signalservice.api.storage.SignalStorageManifest;
 import org.whispersystems.signalservice.api.storage.SignalStorageRecord;
 import org.whispersystems.signalservice.api.storage.StorageId;
+import org.whispersystems.signalservice.api.storage.StorageKey;
 import org.whispersystems.signalservice.internal.storage.protos.ManifestRecord;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class StorageHelper {
 
     private static final Logger logger = LoggerFactory.getLogger(StorageHelper.class);
+    private static final List<Integer> KNOWN_TYPES = List.of(ManifestRecord.Identifier.Type.CONTACT.getValue(),
+            ManifestRecord.Identifier.Type.GROUPV1.getValue(),
+            ManifestRecord.Identifier.Type.GROUPV2.getValue(),
+            ManifestRecord.Identifier.Type.ACCOUNT.getValue());
 
     private final SignalAccount account;
     private final SignalDependencies dependencies;
@@ -45,275 +53,496 @@ public class StorageHelper {
         this.context = context;
     }
 
-    public void readDataFromStorage() throws IOException {
+    public void syncDataWithStorage() throws IOException {
         final var storageKey = account.getOrCreateStorageKey();
         if (storageKey == null) {
-            logger.debug("Storage key unknown, requesting from primary device.");
-            context.getSyncHelper().requestSyncKeys();
+            if (!account.isPrimaryDevice()) {
+                logger.debug("Storage key unknown, requesting from primary device.");
+                context.getSyncHelper().requestSyncKeys();
+            }
             return;
         }
 
-        logger.debug("Reading data from remote storage");
-        Optional<SignalStorageManifest> manifest;
+        logger.trace("Reading manifest from remote storage");
+        final var localManifestVersion = account.getStorageManifestVersion();
+        final var localManifest = account.getStorageManifest().orElse(SignalStorageManifest.EMPTY);
+        SignalStorageManifest remoteManifest;
         try {
-            manifest = dependencies.getAccountManager()
-                    .getStorageManifestIfDifferentVersion(storageKey, account.getStorageManifestVersion());
+            remoteManifest = dependencies.getAccountManager()
+                    .getStorageManifestIfDifferentVersion(storageKey, localManifestVersion)
+                    .orElse(localManifest);
         } catch (InvalidKeyException e) {
-            logger.warn("Manifest couldn't be decrypted, ignoring.");
-            return;
-        }
-
-        if (manifest.isEmpty()) {
-            logger.debug("Manifest is up to date, does not exist or couldn't be decrypted, ignoring.");
+            logger.warn("Manifest couldn't be decrypted.");
+            if (account.isPrimaryDevice()) {
+                try {
+                    forcePushToStorage(storageKey);
+                } catch (RetryLaterException rle) {
+                    // TODO retry later
+                    return;
+                }
+            }
             return;
         }
 
-        logger.trace("Remote storage manifest has {} records", manifest.get().getStorageIds().size());
-        final var storageIds = manifest.get()
-                .getStorageIds()
-                .stream()
-                .filter(id -> !id.isUnknown())
-                .collect(Collectors.toSet());
-
-        Optional<SignalStorageManifest> localManifest = account.getStorageManifest();
-        localManifest.ifPresent(m -> m.getStorageIds().forEach(storageIds::remove));
+        logger.trace("Manifest versions: local {}, remote {}", localManifestVersion, remoteManifest.getVersion());
 
-        logger.trace("Reading {} new records", manifest.get().getStorageIds().size());
-        for (final var record : getSignalStorageRecords(storageIds)) {
-            logger.debug("Reading record of type {}", record.getType());
-            if (record.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue()) {
-                readAccountRecord(record);
-            } else if (record.getType() == ManifestRecord.Identifier.Type.GROUPV2.getValue()) {
-                readGroupV2Record(record);
-            } else if (record.getType() == ManifestRecord.Identifier.Type.GROUPV1.getValue()) {
-                readGroupV1Record(record);
-            } else if (record.getType() == ManifestRecord.Identifier.Type.CONTACT.getValue()) {
-                readContactRecord(record);
-            }
+        var needsForcePush = false;
+        if (remoteManifest.getVersion() > localManifestVersion) {
+            logger.trace("Remote version was newer, reading records.");
+            needsForcePush = readDataFromStorage(storageKey, localManifest, remoteManifest);
+        } else if (remoteManifest.getVersion() < localManifest.getVersion()) {
+            logger.debug("Remote storage manifest version was older. User might have switched accounts.");
         }
-        account.setStorageManifestVersion(manifest.get().getVersion());
-        account.setStorageManifest(manifest.get());
-        logger.debug("Done reading data from remote storage");
-    }
+        logger.trace("Done reading data from remote storage");
 
-    private void readContactRecord(final SignalStorageRecord record) {
-        if (record == null || record.getContact().isEmpty()) {
-            return;
+        if (localManifest != remoteManifest) {
+            storeManifestLocally(remoteManifest);
         }
 
-        final var contactRecord = record.getContact().get();
-        final var aci = contactRecord.getAci().orElse(null);
-        final var pni = contactRecord.getPni().orElse(null);
-        if (contactRecord.getNumber().isEmpty() && aci == null && pni == null) {
+        readRecordsWithPreviouslyUnknownTypes(storageKey);
+
+        logger.trace("Adding missing storageIds to local data");
+        account.getRecipientStore().setMissingStorageIds();
+        account.getGroupStore().setMissingStorageIds();
+
+        var needsMultiDeviceSync = false;
+        try {
+            needsMultiDeviceSync = writeToStorage(storageKey, remoteManifest, needsForcePush);
+        } catch (RetryLaterException e) {
+            // TODO retry later
             return;
         }
-        final var address = new RecipientAddress(aci, pni, contactRecord.getNumber().orElse(null));
-        var recipientId = account.getRecipientResolver().resolveRecipient(address);
-        if (aci != null && contactRecord.getUsername().isPresent()) {
-            recipientId = account.getRecipientTrustedResolver()
-                    .resolveRecipientTrusted(aci, contactRecord.getUsername().get());
-        }
 
-        final var contact = account.getContactStore().getContact(recipientId);
-        final var blocked = contact != null && contact.isBlocked();
-        final var profileShared = contact != null && contact.isProfileSharingEnabled();
-        final var archived = contact != null && contact.isArchived();
-        final var hidden = contact != null && contact.isHidden();
-        final var contactGivenName = contact == null ? null : contact.givenName();
-        final var contactFamilyName = contact == null ? null : contact.familyName();
-        if (blocked != contactRecord.isBlocked()
-                || profileShared != contactRecord.isProfileSharingEnabled()
-                || archived != contactRecord.isArchived()
-                || hidden != contactRecord.isHidden()
-                || (
-                contactRecord.getSystemGivenName().isPresent() && !contactRecord.getSystemGivenName()
-                        .get()
-                        .equals(contactGivenName)
-        )
-                || (
-                contactRecord.getSystemFamilyName().isPresent() && !contactRecord.getSystemFamilyName()
-                        .get()
-                        .equals(contactFamilyName)
-        )) {
-            logger.debug("Storing new or updated contact {}", recipientId);
-            final var contactBuilder = contact == null ? Contact.newBuilder() : Contact.newBuilder(contact);
-            final var newContact = contactBuilder.withIsBlocked(contactRecord.isBlocked())
-                    .withIsProfileSharingEnabled(contactRecord.isProfileSharingEnabled())
-                    .withIsArchived(contactRecord.isArchived())
-                    .withIsHidden(contactRecord.isHidden());
-            if (contactRecord.getSystemGivenName().isPresent() || contactRecord.getSystemFamilyName().isPresent()) {
-                newContact.withGivenName(contactRecord.getSystemGivenName().orElse(null))
-                        .withFamilyName(contactRecord.getSystemFamilyName().orElse(null));
+        if (needsForcePush) {
+            logger.debug("Doing a force push.");
+            try {
+                forcePushToStorage(storageKey);
+                needsMultiDeviceSync = true;
+            } catch (RetryLaterException e) {
+                // TODO retry later
+                return;
             }
-            account.getContactStore().storeContact(recipientId, newContact.build());
         }
 
-        final var profile = account.getProfileStore().getProfile(recipientId);
-        final var profileGivenName = profile == null ? null : profile.getGivenName();
-        final var profileFamilyName = profile == null ? null : profile.getFamilyName();
-        if ((
-                contactRecord.getProfileGivenName().isPresent() && !contactRecord.getProfileGivenName()
-                        .get()
-                        .equals(profileGivenName)
-        ) || (
-                contactRecord.getProfileFamilyName().isPresent() && !contactRecord.getProfileFamilyName()
-                        .get()
-                        .equals(profileFamilyName)
-        )) {
-            final var profileBuilder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile);
-            final var newProfile = profileBuilder.withGivenName(contactRecord.getProfileGivenName().orElse(null))
-                    .withFamilyName(contactRecord.getProfileFamilyName().orElse(null))
-                    .build();
-            account.getProfileStore().storeProfile(recipientId, newProfile);
+        if (needsMultiDeviceSync) {
+            context.getSyncHelper().sendSyncFetchStorageMessage();
         }
-        if (contactRecord.getProfileKey().isPresent()) {
-            try {
-                logger.trace("Storing profile key {}", recipientId);
-                final var profileKey = new ProfileKey(contactRecord.getProfileKey().get());
-                account.getProfileStore().storeProfileKey(recipientId, profileKey);
-            } catch (InvalidInputException e) {
-                logger.warn("Received invalid contact profile key from storage");
+
+        logger.debug("Done syncing data with remote storage");
+    }
+
+    private boolean readDataFromStorage(
+            final StorageKey storageKey,
+            final SignalStorageManifest localManifest,
+            final SignalStorageManifest remoteManifest
+    ) throws IOException {
+        var needsForcePush = false;
+        try (final var connection = account.getAccountDatabase().getConnection()) {
+            connection.setAutoCommit(false);
+
+            var idDifference = findIdDifference(remoteManifest.getStorageIds(), localManifest.getStorageIds());
+
+            if (idDifference.hasTypeMismatches() && account.isPrimaryDevice()) {
+                logger.debug("Found type mismatches in the ID sets! Scheduling a force push after this sync completes.");
+                needsForcePush = true;
             }
-        }
-        if (contactRecord.getIdentityKey().isPresent() && aci != null) {
-            try {
-                logger.trace("Storing identity key {}", recipientId);
-                final var identityKey = new IdentityKey(contactRecord.getIdentityKey().get());
-                account.getIdentityKeyStore().saveIdentity(aci, identityKey);
 
-                final var trustLevel = TrustLevel.fromIdentityState(contactRecord.getIdentityState());
-                if (trustLevel != null) {
-                    account.getIdentityKeyStore().setIdentityTrustLevel(aci, identityKey, trustLevel);
+            logger.debug("Pre-Merge ID Difference :: " + idDifference);
+
+            if (!idDifference.isEmpty()) {
+                final var remoteOnlyRecords = getSignalStorageRecords(storageKey, idDifference.remoteOnlyIds());
+
+                if (remoteOnlyRecords.size() != idDifference.remoteOnlyIds().size()) {
+                    logger.debug("Could not find all remote-only records! Requested: "
+                            + idDifference.remoteOnlyIds()
+                            .size()
+                            + ", Found: "
+                            + remoteOnlyRecords.size()
+                            + ". These stragglers should naturally get deleted during the sync.");
                 }
-            } catch (InvalidKeyException e) {
-                logger.warn("Received invalid contact identity key from storage");
+
+                final var unknownInserts = processKnownRecords(connection, remoteOnlyRecords);
+                final var unknownDeletes = idDifference.localOnlyIds()
+                        .stream()
+                        .filter(id -> !KNOWN_TYPES.contains(id.getType()))
+                        .toList();
+
+                logger.debug("Storage ids with unknown type: {} inserts, {} deletes",
+                        unknownInserts.size(),
+                        unknownDeletes.size());
+
+                account.getUnknownStorageIdStore().addUnknownStorageIds(connection, unknownInserts);
+                account.getUnknownStorageIdStore().deleteUnknownStorageIds(connection, unknownDeletes);
+            } else {
+                logger.debug("Remote version was newer, but there were no remote-only IDs.");
             }
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to sync remote storage", e);
         }
+        return needsForcePush;
     }
 
-    private void readGroupV1Record(final SignalStorageRecord record) {
-        if (record == null || record.getGroupV1().isEmpty()) {
-            return;
-        }
+    private void readRecordsWithPreviouslyUnknownTypes(final StorageKey storageKey) throws IOException { // re-reads ids kept in the unknown-id store and processes the ones found remotely
+        try (final var connection = account.getAccountDatabase().getConnection()) {
+            connection.setAutoCommit(false); // everything below is committed together at connection.commit()
+            final var knownUnknownIds = account.getUnknownStorageIdStore()
+                    .getUnknownStorageIds(connection, KNOWN_TYPES); // presumably the stored ids whose type is in KNOWN_TYPES - confirm in UnknownStorageIdStore
 
-        final var groupV1Record = record.getGroupV1().get();
-        final var groupIdV1 = GroupId.v1(groupV1Record.getGroupId());
+            if (!knownUnknownIds.isEmpty()) {
+                logger.debug("We have " + knownUnknownIds.size() + " unknown records that we can now process.");
 
-        var group = account.getGroupStore().getGroup(groupIdV1);
-        if (group == null) {
-            try {
-                context.getGroupHelper().sendGroupInfoRequest(groupIdV1, account.getSelfRecipientId());
-            } catch (Throwable e) {
-                logger.warn("Failed to send group request", e);
+                final var remote = getSignalStorageRecords(storageKey, knownUnknownIds); // empty list if reading fails with InvalidKeyException
+
+                logger.debug("Found " + remote.size() + " of the known-unknowns remotely.");
+
+                processKnownRecords(connection, remote); // the returned list of still-unknown ids is not used here
+                account.getUnknownStorageIdStore()
+                        .deleteUnknownStorageIds(connection, remote.stream().map(SignalStorageRecord::getId).toList()); // only ids actually found remotely are removed
             }
-            group = account.getGroupStore().getOrCreateGroupV1(groupIdV1);
-        }
-        if (group != null && group.isBlocked() != groupV1Record.isBlocked()) {
-            group.setBlocked(groupV1Record.isBlocked());
-            account.getGroupStore().updateGroup(group);
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to sync remote storage", e); // database failures surface as unchecked exceptions
         }
     }
 
-    private void readGroupV2Record(final SignalStorageRecord record) {
-        if (record == null || record.getGroupV2().isEmpty()) {
-            return;
+    private boolean writeToStorage( // writes local-only records to remote storage; returns true only if a new manifest was written
+            final StorageKey storageKey, final SignalStorageManifest remoteManifest, final boolean needsForcePush
+    ) throws IOException, RetryLaterException {
+        final WriteOperationResult remoteWriteOperation;
+        try (final var connection = account.getAccountDatabase().getConnection()) {
+            connection.setAutoCommit(false);
+
+            final var localStorageIds = getAllLocalStorageIds(connection);
+            final var idDifference = findIdDifference(remoteManifest.getStorageIds(), localStorageIds); // remote manifest ids vs. all ids stored locally
+            logger.debug("ID Difference :: " + idDifference);
+
+            final var remoteDeletes = idDifference.remoteOnlyIds().stream().map(StorageId::getRaw).toList(); // ids only the remote has are deleted remotely
+            final var remoteInserts = buildLocalStorageRecords(connection, idDifference.localOnlyIds()); // ids only the local side has are inserted remotely
+            // TODO check if local storage record proto matches remote, then reset to remote storage_id
+
+            remoteWriteOperation = new WriteOperationResult(new SignalStorageManifest(remoteManifest.getVersion() + 1, // new manifest: remote version + 1
+                    account.getDeviceId(),
+                    localStorageIds), remoteInserts, remoteDeletes);
+
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to sync remote storage", e);
         }
 
-        final var groupV2Record = record.getGroupV2().get();
-        if (groupV2Record.isArchived()) {
-            return;
+        if (remoteWriteOperation.isEmpty()) {
+            logger.debug("No remote writes needed. Still at version: " + remoteManifest.getVersion());
+            return false; // nothing was written remotely
         }
 
-        final GroupMasterKey groupMasterKey;
+        logger.debug("We have something to write remotely.");
+        logger.debug("WriteOperationResult :: " + remoteWriteOperation);
+
+        StorageSyncValidations.validate(remoteWriteOperation, // runs before anything is sent to the server
+                remoteManifest,
+                needsForcePush,
+                account.getSelfRecipientAddress());
+
+        final Optional<SignalStorageManifest> conflict;
         try {
-            groupMasterKey = new GroupMasterKey(groupV2Record.getMasterKeyBytes());
-        } catch (InvalidInputException e) {
-            logger.warn("Received invalid group master key from storage");
-            return;
+            conflict = dependencies.getAccountManager()
+                    .writeStorageRecords(storageKey,
+                            remoteWriteOperation.manifest(),
+                            remoteWriteOperation.inserts(),
+                            remoteWriteOperation.deletes());
+        } catch (InvalidKeyException e) {
+            logger.warn("Failed to decrypt conflicting storage manifest: {}", e.getMessage());
+            throw new IOException(e); // reported to the caller as an IOException
         }
 
-        final var group = context.getGroupHelper().getOrMigrateGroup(groupMasterKey, 0, null);
-        if (group.isBlocked() != groupV2Record.isBlocked()) {
-            group.setBlocked(groupV2Record.isBlocked());
-            account.getGroupStore().updateGroup(group);
+        if (conflict.isPresent()) { // a returned manifest is treated as a write conflict
+            logger.debug("Hit a conflict when trying to resolve the conflict! Retrying.");
+            throw new RetryLaterException(); // NOTE(review): the retry itself must happen in the caller - confirm there
         }
-    }
 
-    private void readAccountRecord(final SignalStorageRecord record) throws IOException {
-        if (record == null) {
-            logger.warn("Could not find account record, even though we had an ID, ignoring.");
-            return;
-        }
+        logger.debug("Saved new manifest. Now at version: " + remoteWriteOperation.manifest().getVersion());
+        storeManifestLocally(remoteWriteOperation.manifest()); // the account now holds the manifest that was just written
 
-        SignalAccountRecord accountRecord = record.getAccount().orElse(null);
-        if (accountRecord == null) {
-            logger.warn("The storage record didn't actually have an account, ignoring.");
-            return;
-        }
+        return true;
+    }
 
-        if (!accountRecord.getE164().equals(account.getNumber())) {
-            context.getJobExecutor().enqueueJob(new CheckWhoAmIJob());
-        }
+    private void forcePushToStorage( // rebuilds every local record under a fresh storage id and pushes the full set to remote storage
+            final StorageKey storageServiceKey
+    ) throws IOException, RetryLaterException {
+        logger.debug("Force pushing local state to remote storage");
+
+        final var currentVersion = dependencies.getAccountManager().getStorageManifestVersion();
+        final var newVersion = currentVersion + 1;
+        final var newStorageRecords = new ArrayList<SignalStorageRecord>();
+        final Map<RecipientId, StorageId> newContactStorageIds;
+        final Map<GroupIdV1, StorageId> newGroupV1StorageIds;
+        final Map<GroupIdV2, StorageId> newGroupV2StorageIds;
+
+        try (final var connection = account.getAccountDatabase().getConnection()) {
+            connection.setAutoCommit(false);
+
+            final var recipientIds = account.getRecipientStore().getRecipientIds(connection);
+            newContactStorageIds = generateContactStorageIds(recipientIds); // one new storage id per recipient
+            for (final var recipientId : recipientIds) {
+                final var storageId = newContactStorageIds.get(recipientId);
+                if (storageId.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue()) { // generateContactStorageIds gives the self recipient an account-type id
+                    final var recipient = account.getRecipientStore().getRecipient(connection, recipientId);
+                    final var accountRecord = StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(),
+                            recipient,
+                            account.getUsernameLink(),
+                            storageId.getRaw());
+                    newStorageRecords.add(accountRecord);
+                } else {
+                    final var recipient = account.getRecipientStore().getRecipient(connection, recipientId);
+                    final var address = recipient.getAddress().getIdentifier();
+                    final var identity = account.getIdentityKeyStore().getIdentityInfo(connection, address);
+                    final var record = StorageSyncModels.localToRemoteRecord(recipient, identity, storageId.getRaw());
+                    newStorageRecords.add(record);
+                }
+            }
 
-        account.getConfigurationStore().setReadReceipts(accountRecord.isReadReceiptsEnabled());
-        account.getConfigurationStore().setTypingIndicators(accountRecord.isTypingIndicatorsEnabled());
-        account.getConfigurationStore()
-                .setUnidentifiedDeliveryIndicators(accountRecord.isSealedSenderIndicatorsEnabled());
-        account.getConfigurationStore().setLinkPreviews(accountRecord.isLinkPreviewsEnabled());
-        account.getConfigurationStore().setPhoneNumberSharingMode(switch (accountRecord.getPhoneNumberSharingMode()) {
-            case EVERYBODY -> PhoneNumberSharingMode.EVERYBODY;
-            case NOBODY, UNKNOWN -> PhoneNumberSharingMode.NOBODY;
-        });
-        account.getConfigurationStore().setPhoneNumberUnlisted(accountRecord.isPhoneNumberUnlisted());
-        account.setUsername(accountRecord.getUsername());
-
-        if (accountRecord.getProfileKey().isPresent()) {
-            ProfileKey profileKey;
-            try {
-                profileKey = new ProfileKey(accountRecord.getProfileKey().get());
-            } catch (InvalidInputException e) {
-                logger.warn("Received invalid profile key from storage");
-                profileKey = null;
+            final var groupV1Ids = account.getGroupStore().getGroupV1Ids(connection);
+            newGroupV1StorageIds = generateGroupV1StorageIds(groupV1Ids); // one new storage id per v1 group
+            for (final var groupId : groupV1Ids) {
+                final var storageId = newGroupV1StorageIds.get(groupId);
+                final var group = account.getGroupStore().getGroup(connection, groupId);
+                final var record = StorageSyncModels.localToRemoteRecord(group, storageId.getRaw());
+                newStorageRecords.add(record);
             }
-            if (profileKey != null) {
-                account.setProfileKey(profileKey);
-                final var avatarPath = accountRecord.getAvatarUrlPath().orElse(null);
-                context.getJobExecutor().enqueueJob(new DownloadProfileAvatarJob(avatarPath));
+
+            final var groupV2Ids = account.getGroupStore().getGroupV2Ids(connection);
+            newGroupV2StorageIds = generateGroupV2StorageIds(groupV2Ids); // one new storage id per v2 group
+            for (final var groupId : groupV2Ids) {
+                final var storageId = newGroupV2StorageIds.get(groupId);
+                final var group = account.getGroupStore().getGroup(connection, groupId);
+                final var record = StorageSyncModels.localToRemoteRecord(group, storageId.getRaw());
+                newStorageRecords.add(record);
             }
+
+            connection.commit(); // the new ids are not stored locally yet; that happens after the push below
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to sync remote storage", e);
         }
+        final var newStorageIds = newStorageRecords.stream().map(SignalStorageRecord::getId).toList(); // manifest ids come from the records built above
 
-        context.getProfileHelper()
-                .setProfile(false,
-                        false,
-                        accountRecord.getGivenName().orElse(null),
-                        accountRecord.getFamilyName().orElse(null),
-                        null,
-                        null,
-                        null,
-                        null);
-    }
+        final var manifest = new SignalStorageManifest(newVersion, account.getDeviceId(), newStorageIds);
 
-    private SignalStorageRecord getSignalStorageRecord(final StorageId accountId) throws IOException {
-        List<SignalStorageRecord> records;
+        StorageSyncValidations.validateForcePush(manifest, newStorageRecords, account.getSelfRecipientAddress());
+
+        final Optional<SignalStorageManifest> conflict;
         try {
-            records = dependencies.getAccountManager()
-                    .readStorageRecords(account.getStorageKey(), Collections.singletonList(accountId));
+            if (newVersion > 1) { // the fetched manifest version was at least 1, so the reset call is used
+                logger.trace("Force-pushing data. Inserting {} IDs.", newStorageRecords.size());
+                conflict = dependencies.getAccountManager()
+                        .resetStorageRecords(storageServiceKey, manifest, newStorageRecords);
+            } else {
+                logger.trace("First version, normal push. Inserting {} IDs.", newStorageRecords.size());
+                conflict = dependencies.getAccountManager()
+                        .writeStorageRecords(storageServiceKey, manifest, newStorageRecords, Collections.emptyList()); // plain write with no deletes
+            }
         } catch (InvalidKeyException e) {
-            logger.warn("Failed to read storage records, ignoring.");
-            return null;
+            logger.debug("Hit an invalid key exception, which likely indicates a conflict.", e);
+            throw new RetryLaterException(); // handled the same way as a reported conflict
+        }
+
+        if (conflict.isPresent()) {
+            logger.debug("Hit a conflict. Trying again.");
+            throw new RetryLaterException(); // NOTE(review): the retry itself must happen in the caller - confirm there
+        }
+
+        logger.debug("Force push succeeded. Updating local manifest version to: " + manifest.getVersion());
+        storeManifestLocally(manifest);
+
+        try (final var connection = account.getAccountDatabase().getConnection()) {
+            connection.setAutoCommit(false);
+            account.getRecipientStore().updateStorageIds(connection, newContactStorageIds); // local stores get the new ids only after the push succeeded
+            account.getGroupStore().updateStorageIds(connection, newGroupV1StorageIds, newGroupV2StorageIds);
+
+            // delete all unknown storage ids
+            account.getUnknownStorageIdStore().deleteAllUnknownStorageIds(connection);
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to sync remote storage", e);
         }
-        return !records.isEmpty() ? records.getFirst() : null;
     }
 
-    private List<SignalStorageRecord> getSignalStorageRecords(final Collection<StorageId> storageIds) throws IOException {
+    private Map<RecipientId, StorageId> generateContactStorageIds(List<RecipientId> recipientIds) {
+        // The self recipient gets an account-type storage id, every other recipient a contact-type one.
+        final var self = account.getSelfRecipientId();
+        return recipientIds.stream()
+                .collect(Collectors.toMap(id -> id,
+                        id -> id.equals(self)
+                                // each storage id wraps a newly created raw id
+                                ? StorageId.forAccount(KeyUtils.createRawStorageId())
+                                : StorageId.forContact(KeyUtils.createRawStorageId())));
+    }
+
+    private Map<GroupIdV1, StorageId> generateGroupV1StorageIds(List<GroupIdV1> groupIds) {
+        // every v1 group is mapped to a storage id built from a newly created raw id
+        return groupIds.stream().collect(Collectors.toMap(groupId -> groupId,
+                groupId -> StorageId.forGroupV1(KeyUtils.createRawStorageId())));
+    }
+
+    private Map<GroupIdV2, StorageId> generateGroupV2StorageIds(List<GroupIdV2> groupIds) {
+        // every v2 group is mapped to a storage id built from a newly created raw id
+        return groupIds.stream().collect(Collectors.toMap(groupId -> groupId,
+                groupId -> StorageId.forGroupV2(KeyUtils.createRawStorageId())));
+    }
+
+    private void storeManifestLocally(final SignalStorageManifest remoteManifest) {
+        // The account keeps both the manifest version and the manifest itself.
+        final var version = remoteManifest.getVersion();
+        account.setStorageManifestVersion(version);
+        account.setStorageManifest(remoteManifest);
+    }
+
+    private List<SignalStorageRecord> getSignalStorageRecords( // reads the records for the given storage ids from the remote storage service
+            final StorageKey storageKey, final List<StorageId> storageIds
+    ) throws IOException { // an InvalidKeyException is not propagated: it is logged and an empty list is returned
         List<SignalStorageRecord> records;
         try {
-            records = dependencies.getAccountManager()
-                    .readStorageRecords(account.getStorageKey(), new ArrayList<>(storageIds));
+            records = dependencies.getAccountManager().readStorageRecords(storageKey, storageIds); // the key is supplied by the caller, not read from the account
         } catch (InvalidKeyException e) {
             logger.warn("Failed to read storage records, ignoring.");
             return List.of();
         }
         return records;
     }
+
+    private List<StorageId> getAllLocalStorageIds(final Connection connection) throws SQLException {
+        // collected in this order: unknown-type ids, group ids, recipient ids, then the self id
+        final var allIds = new ArrayList<StorageId>(account.getUnknownStorageIdStore().getUnknownStorageIds(connection));
+        allIds.addAll(account.getGroupStore().getStorageIds(connection));
+        allIds.addAll(account.getRecipientStore().getStorageIds(connection));
+        allIds.add(account.getRecipientStore().getSelfStorageId(connection));
+        return allIds;
+    }
+
+    private List<SignalStorageRecord> buildLocalStorageRecords(
+            final Connection connection, final List<StorageId> storageIds
+    ) throws SQLException {
+        // Builds the remote record form for each given storage id, keeping the input order.
+        final var built = new ArrayList<SignalStorageRecord>();
+        for (final var id : storageIds) {
+            // a null result is skipped rather than added to the list
+            Optional.ofNullable(buildLocalStorageRecord(connection, id)).ifPresent(built::add);
+        }
+        return built;
+    }
+
+
+    private SignalStorageRecord buildLocalStorageRecord(
+            Connection connection, StorageId storageId
+    ) throws SQLException {
+        // Loads the local entity behind the storage id and converts it to its remote record form.
+        final var type = ManifestRecord.Identifier.Type.fromValue(storageId.getType());
+        return switch (type) {
+            case CONTACT -> {
+                final var contact = account.getRecipientStore().getRecipient(connection, storageId);
+                final var identifier = contact.getAddress().getIdentifier();
+                final var identityInfo = account.getIdentityKeyStore().getIdentityInfo(connection, identifier);
+                yield StorageSyncModels.localToRemoteRecord(contact, identityInfo, storageId.getRaw());
+            }
+            case GROUPV1 -> {
+                final var group = account.getGroupStore().getGroupV1(connection, storageId);
+                yield StorageSyncModels.localToRemoteRecord(group, storageId.getRaw());
+            }
+            case GROUPV2 -> {
+                final var group = account.getGroupStore().getGroupV2(connection, storageId);
+                yield StorageSyncModels.localToRemoteRecord(group, storageId.getRaw());
+            }
+            case ACCOUNT -> {
+                // the account record combines the self recipient with the local configuration
+                final var self = account.getRecipientStore().getRecipient(connection, account.getSelfRecipientId());
+                yield StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(),
+                        self, account.getUsernameLink(), storageId.getRaw());
+            }
+            case null, default -> throw new AssertionError("Got unknown local storage record type: " + storageId);
+        };
+    }
+
+    /**
+     * Compares the remote and the local storage ids, keyed by the Base64 form of their raw bytes,
+     * and returns which ids are exclusively remote and which are exclusively local.
+     * A raw id that repeats within one side, or that has a different type on each side, sets
+     * the type mismatch flag of the result.
+     *
+     * @param remoteIds All remote keys available.
+     * @param localIds  All local keys available.
+     * @return An object describing which keys are exclusive to the remote data set, which are
+     * exclusive to the local data set, and whether a type mismatch was found.
+     */
+    private static IdDifferenceResult findIdDifference(
+            Collection<StorageId> remoteIds, Collection<StorageId> localIds
+    ) {
+        final var base64Encoder = Base64.getEncoder();
+        // Keep the first id when a raw id repeats: without a merge function toMap() throws
+        // IllegalStateException, so the size comparison below could never flag the duplicate.
+        final var remoteByRawId = remoteIds.stream()
+                .collect(Collectors.toMap(id -> base64Encoder.encodeToString(id.getRaw()),
+                        id -> id, (first, duplicate) -> first));
+        final var localByRawId = localIds.stream()
+                .collect(Collectors.toMap(id -> base64Encoder.encodeToString(id.getRaw()),
+                        id -> id, (first, duplicate) -> first));
+
+        // Fewer map entries than ids means a raw id occurred more than once on that side.
+        boolean hasTypeMismatch = remoteByRawId.size() != remoteIds.size() || localByRawId.size() != localIds.size();
+
+        final var remoteOnlyRawIds = SetUtil.difference(remoteByRawId.keySet(), localByRawId.keySet());
+        final var localOnlyRawIds = SetUtil.difference(localByRawId.keySet(), remoteByRawId.keySet());
+        final var sharedRawIds = SetUtil.intersection(localByRawId.keySet(), remoteByRawId.keySet());
+
+        for (String rawId : sharedRawIds) {
+            final var remote = remoteByRawId.get(rawId);
+            final var local = localByRawId.get(rawId);
+            if (remote.getType() != local.getType()) {
+                remoteOnlyRawIds.remove(rawId);
+                localOnlyRawIds.remove(rawId);
+                hasTypeMismatch = true;
+                logger.debug("Remote type {} did not match local type {} for {}!", remote.getType(), local.getType(), rawId);
+            }
+        }
+
+        final var remoteOnlyKeys = remoteOnlyRawIds.stream().map(remoteByRawId::get).toList();
+        final var localOnlyKeys = localOnlyRawIds.stream().map(localByRawId::get).toList();
+        return new IdDifferenceResult(remoteOnlyKeys, localOnlyKeys, hasTypeMismatch);
+    }
+
+    private List<StorageId> processKnownRecords(
+            final Connection connection, List<SignalStorageRecord> records
+    ) throws SQLException {
+        // Hands every record of a handled type to its processor; ids of all other records are returned.
+        final var accountProcessor = new AccountRecordProcessor(account, connection, context.getJobExecutor());
+        final var contactProcessor = new ContactRecordProcessor(account, connection, context.getJobExecutor());
+        final var groupV1Processor = new GroupV1RecordProcessor(account, connection);
+        final var groupV2Processor = new GroupV2RecordProcessor(account, connection);
+
+        final var unknownIds = new ArrayList<StorageId>();
+        for (final var remoteRecord : records) {
+            logger.debug("Reading record of type {}", remoteRecord.getType());
+            final var type = ManifestRecord.Identifier.Type.fromValue(remoteRecord.getType());
+            switch (type) {
+                case ACCOUNT -> accountProcessor.process(remoteRecord.getAccount().get());
+                case GROUPV1 -> groupV1Processor.process(remoteRecord.getGroupV1().get());
+                case GROUPV2 -> groupV2Processor.process(remoteRecord.getGroupV2().get());
+                case CONTACT -> contactProcessor.process(remoteRecord.getContact().get());
+                case null, default -> unknownIds.add(remoteRecord.getId());
+            }
+        }
+        return unknownIds;
+    }
+
+    /**
+     * Ids found on only one side; hasTypeMismatches is true if some ids share a raw id but differ in type.
+     */
+    private record IdDifferenceResult(
+            List<StorageId> remoteOnlyIds, List<StorageId> localOnlyIds, boolean hasTypeMismatches
+    ) {
+        public boolean isEmpty() {
+            final var nothingRemoteOnly = remoteOnlyIds.isEmpty();
+            return nothingRemoteOnly && localOnlyIds.isEmpty();
+        }
+    }
+
+    private static class RetryLaterException extends Throwable {} // thrown on a storage write conflict; NOTE(review): extends Throwable, so catch (Exception) blocks will not catch it - confirm this is intended
 }
index f6392ea9e28cb233ab0fd9162f0e05c41f5f045a..ac61104624aa7314d758cc89c953eea674dfed2e 100644 (file)
@@ -79,6 +79,11 @@ public class SyncHelper {
                 .sendSyncMessage(SignalServiceSyncMessage.forFetchLatest(SignalServiceSyncMessage.FetchType.LOCAL_PROFILE));
     }
 
+    public void sendSyncFetchStorageMessage() { // sends a fetch-latest sync message of type STORAGE_MANIFEST
+        final var fetchStorage = SignalServiceSyncMessage.forFetchLatest(SignalServiceSyncMessage.FetchType.STORAGE_MANIFEST);
+        context.getSendHelper().sendSyncMessage(fetchStorage);
+    }
+
     public void sendGroups() throws IOException {
         var groupsFile = IOUtils.createTempFile();
 
@@ -222,7 +227,7 @@ public class SyncHelper {
     }
 
     public SendMessageResult sendKeysMessage() {
-        var keysMessage = new KeysMessage(Optional.ofNullable(account.getStorageKey()),
+        var keysMessage = new KeysMessage(Optional.ofNullable(account.getOrCreateStorageKey()), // NOTE(review): presumably creates the storage key when none exists yet - confirm in SignalAccount
                 Optional.ofNullable(account.getOrCreatePinMasterKey()));
         return context.getSendHelper().sendSyncMessage(SignalServiceSyncMessage.forKeys(keysMessage));
     }
index ee4c222244e0a4ed226f3740fb6648d861cfc8eb..a76924d85bf1168fc42d15ced59534756aac3942 100644 (file)
@@ -70,11 +70,13 @@ public class JobExecutor implements AutoCloseable {
 
     @Override
     public void close() {
+        final boolean queueEmpty;
         synchronized (queue) {
-            if (queue.isEmpty()) {
-                executorService.close();
-                return;
-            }
+            queueEmpty = queue.isEmpty();
+        }
+        if (queueEmpty) {
+            executorService.close();
+            return;
         }
         synchronized (this) {
             try {
index 29f3d2abd79765fb2eff663b8f05d3cb32da6be4..a2575e90dc0a60ca71fb3eff863ed87a94a1b2ac 100644 (file)
@@ -66,6 +66,7 @@ import org.asamk.signal.manager.config.ServiceEnvironmentConfig;
 import org.asamk.signal.manager.helper.AccountFileUpdater;
 import org.asamk.signal.manager.helper.Context;
 import org.asamk.signal.manager.helper.RecipientHelper.RegisteredUser;
+import org.asamk.signal.manager.jobs.SyncStorageJob;
 import org.asamk.signal.manager.storage.AttachmentStore;
 import org.asamk.signal.manager.storage.AvatarStore;
 import org.asamk.signal.manager.storage.SignalAccount;
@@ -125,6 +126,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import io.reactivex.rxjava3.disposables.CompositeDisposable;
+import io.reactivex.rxjava3.schedulers.Schedulers;
 
 public class ManagerImpl implements Manager {
 
@@ -193,22 +195,25 @@ public class ManagerImpl implements Manager {
                 this.notifyAll();
             }
         });
-        disposable.add(account.getIdentityKeyStore().getIdentityChanges().subscribe(serviceId -> {
-            logger.trace("Archiving old sessions for {}", serviceId);
-            account.getAccountData(ServiceIdType.ACI).getSessionStore().archiveSessions(serviceId);
-            account.getAccountData(ServiceIdType.PNI).getSessionStore().archiveSessions(serviceId);
-            account.getSenderKeyStore().deleteSharedWith(serviceId);
-            final var recipientId = account.getRecipientResolver().resolveRecipient(serviceId);
-            final var profile = account.getProfileStore().getProfile(recipientId);
-            if (profile != null) {
-                account.getProfileStore()
-                        .storeProfile(recipientId,
-                                Profile.newBuilder(profile)
-                                        .withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN)
-                                        .withLastUpdateTimestamp(0)
-                                        .build());
-            }
-        }));
+        disposable.add(account.getIdentityKeyStore()
+                .getIdentityChanges()
+                .observeOn(Schedulers.from(executor))
+                .subscribe(serviceId -> {
+                    logger.trace("Archiving old sessions for {}", serviceId);
+                    account.getAccountData(ServiceIdType.ACI).getSessionStore().archiveSessions(serviceId);
+                    account.getAccountData(ServiceIdType.PNI).getSessionStore().archiveSessions(serviceId);
+                    account.getSenderKeyStore().deleteSharedWith(serviceId);
+                    final var recipientId = account.getRecipientResolver().resolveRecipient(serviceId);
+                    final var profile = account.getProfileStore().getProfile(recipientId);
+                    if (profile != null) {
+                        account.getProfileStore()
+                                .storeProfile(recipientId,
+                                        Profile.newBuilder(profile)
+                                                .withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN)
+                                                .withLastUpdateTimestamp(0)
+                                                .build());
+                    }
+                }));
     }
 
     @Override
@@ -295,13 +300,7 @@ public class ManagerImpl implements Manager {
     }
 
     @Override
-    public void updateConfiguration(
-            Configuration configuration
-    ) throws NotPrimaryDeviceException {
-        if (!account.isPrimaryDevice()) {
-            throw new NotPrimaryDeviceException();
-        }
-
+    public void updateConfiguration(Configuration configuration) {
         final var configurationStore = account.getConfigurationStore();
         if (configuration.readReceipts().isPresent()) {
             configurationStore.setReadReceipts(configuration.readReceipts().get());
@@ -316,6 +315,7 @@ public class ManagerImpl implements Manager {
             configurationStore.setLinkPreviews(configuration.linkPreviews().get());
         }
         context.getSyncHelper().sendConfigurationMessage();
+        syncRemoteStorage();
     }
 
     @Override
@@ -870,6 +870,7 @@ public class ManagerImpl implements Manager {
         if (recipientIdOptional.isPresent()) {
             context.getContactHelper().setContactHidden(recipientIdOptional.get(), true);
             account.removeRecipient(recipientIdOptional.get());
+            syncRemoteStorage();
         }
     }
 
@@ -878,6 +879,7 @@ public class ManagerImpl implements Manager {
         final var recipientIdOptional = context.getRecipientHelper().resolveRecipientOptional(recipient);
         if (recipientIdOptional.isPresent()) {
             account.removeRecipient(recipientIdOptional.get());
+            syncRemoteStorage();
         }
     }
 
@@ -886,6 +888,7 @@ public class ManagerImpl implements Manager {
         final var recipientIdOptional = context.getRecipientHelper().resolveRecipientOptional(recipient);
         if (recipientIdOptional.isPresent()) {
             account.getContactStore().deleteContact(recipientIdOptional.get());
+            syncRemoteStorage();
         }
     }
 
@@ -898,15 +901,13 @@ public class ManagerImpl implements Manager {
         }
         context.getContactHelper()
                 .setContactName(context.getRecipientHelper().resolveRecipient(recipient), givenName, familyName);
+        syncRemoteStorage();
     }
 
     @Override
     public void setContactsBlocked(
             Collection<RecipientIdentifier.Single> recipients, boolean blocked
-    ) throws NotPrimaryDeviceException, IOException, UnregisteredRecipientException {
-        if (!account.isPrimaryDevice()) {
-            throw new NotPrimaryDeviceException();
-        }
+    ) throws IOException, UnregisteredRecipientException {
         if (recipients.isEmpty()) {
             return;
         }
@@ -930,15 +931,13 @@ public class ManagerImpl implements Manager {
             context.getProfileHelper().rotateProfileKey();
         }
         context.getSyncHelper().sendBlockedList();
+        syncRemoteStorage();
     }
 
     @Override
     public void setGroupsBlocked(
             final Collection<GroupId> groupIds, final boolean blocked
-    ) throws GroupNotFoundException, NotPrimaryDeviceException, IOException {
-        if (!account.isPrimaryDevice()) {
-            throw new NotPrimaryDeviceException();
-        }
+    ) throws GroupNotFoundException, IOException {
         if (groupIds.isEmpty()) {
             return;
         }
@@ -954,6 +953,7 @@ public class ManagerImpl implements Manager {
             context.getProfileHelper().rotateProfileKey();
         }
         context.getSyncHelper().sendBlockedList();
+        syncRemoteStorage();
     }
 
     @Override
@@ -968,6 +968,7 @@ public class ManagerImpl implements Manager {
         } catch (NotAGroupMemberException | GroupNotFoundException | GroupSendingNotAllowedException e) {
             throw new AssertionError(e);
         }
+        syncRemoteStorage();
     }
 
     @Override
@@ -1025,13 +1026,13 @@ public class ManagerImpl implements Manager {
     }
 
     @Override
-    public void requestAllSyncData() throws IOException {
+    public void requestAllSyncData() {
         context.getSyncHelper().requestAllSyncData();
-        retrieveRemoteStorage();
+        syncRemoteStorage();
     }
 
-    void retrieveRemoteStorage() throws IOException {
-        context.getStorageHelper().readDataFromStorage();
+    /**
+     * Hands a new {@link SyncStorageJob} to the context's job executor.
+     */
+    void syncRemoteStorage() {
+        final var jobExecutor = context.getJobExecutor();
+        jobExecutor.enqueueJob(new SyncStorageJob());
     }
 
     @Override
index 6a2f3633916a3326a00c008187ef5d57d4c05131..839bdd485015172be22a27f2e6e566cbfe9a3ec7 100644 (file)
@@ -169,7 +169,7 @@ public class RegistrationManagerImpl implements RegistrationManager {
 
             m.refreshPreKeys();
             if (response.isStorageCapable()) {
-                m.retrieveRemoteStorage();
+                m.syncRemoteStorage();
             }
             // Set an initial empty profile so user can be added to groups
             try {
diff --git a/lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java b/lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java
new file mode 100644 (file)
index 0000000..2279686
--- /dev/null
@@ -0,0 +1,24 @@
+package org.asamk.signal.manager.jobs;
+
+import org.asamk.signal.manager.helper.Context;
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job that refreshes the profile of a single recipient, identified by its address.
+ */
+public class DownloadProfileJob implements Job {
+
+    private static final Logger logger = LoggerFactory.getLogger(DownloadProfileJob.class);
+    private final RecipientAddress address;
+
+    /**
+     * @param address address of the recipient whose profile is refreshed when the job runs
+     */
+    public DownloadProfileJob(RecipientAddress address) {
+        this.address = address;
+    }
+
+    /**
+     * Resolves the stored address through the account's recipient store and passes the resulting
+     * recipient id to the profile helper's refreshRecipientProfile.
+     */
+    @Override
+    public void run(Context context) {
+        logger.trace("Refreshing profile for {}", address);
+        final var resolvedId = context.getAccount().getRecipientStore().resolveRecipient(address);
+        context.getProfileHelper().refreshRecipientProfile(resolvedId);
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java b/lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java
new file mode 100644 (file)
index 0000000..85cfbe2
--- /dev/null
@@ -0,0 +1,22 @@
+package org.asamk.signal.manager.jobs;
+
+import org.asamk.signal.manager.helper.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Job that runs a storage sync through the storage helper.
+ */
+public class SyncStorageJob implements Job {
+
+    private static final Logger logger = LoggerFactory.getLogger(SyncStorageJob.class);
+
+    /**
+     * Calls syncDataWithStorage on the context's storage helper. An {@link IOException} is logged
+     * as a warning and not rethrown.
+     */
+    @Override
+    public void run(Context context) {
+        logger.trace("Running storage sync job");
+        final var storageHelper = context.getStorageHelper();
+        try {
+            storageHelper.syncDataWithStorage();
+        } catch (IOException e) {
+            logger.warn("Failed to sync storage data", e);
+        }
+    }
+}
index 43a146a7a700f4cb87c6d8bc864f29c6a0b4b03b..c49e56ab59b45598551d6b6e5c6dde1043b3e1df 100644 (file)
@@ -32,7 +32,7 @@ import java.util.UUID;
 public class AccountDatabase extends Database {
 
     private static final Logger logger = LoggerFactory.getLogger(AccountDatabase.class);
-    private static final long DATABASE_VERSION = 19;
+    private static final long DATABASE_VERSION = 20;
 
     private AccountDatabase(final HikariDataSource dataSource) {
         super(logger, DATABASE_VERSION, dataSource);
@@ -57,6 +57,7 @@ public class AccountDatabase extends Database {
         SenderKeySharedStore.createSql(connection);
         KeyValueStore.createSql(connection);
         CdsiStore.createSql(connection);
+        UnknownStorageIdStore.createSql(connection);
     }
 
     @Override
@@ -539,5 +540,23 @@ public class AccountDatabase extends Database {
                                         """);
             }
         }
+        if (oldVersion < 20) {
+            logger.debug("Updating database: Creating storage id tables and columns");
+            // The storage_id table must match UnknownStorageIdStore.createSql: the UNIQUE constraint on
+            // storage_id is what lets "INSERT OR REPLACE" overwrite an existing row instead of adding a
+            // duplicate on upgraded databases.
+            // The added columns stay unconstrained because SQLite's ADD COLUMN does not accept UNIQUE.
+            try (final var statement = connection.createStatement()) {
+                statement.executeUpdate("""
+                                        CREATE TABLE storage_id (
+                                          _id INTEGER PRIMARY KEY,
+                                          type INTEGER NOT NULL,
+                                          storage_id BLOB UNIQUE NOT NULL
+                                        ) STRICT;
+                                        ALTER TABLE group_v1 ADD COLUMN storage_id BLOB;
+                                        ALTER TABLE group_v1 ADD COLUMN storage_record BLOB;
+                                        ALTER TABLE group_v2 ADD COLUMN storage_id BLOB;
+                                        ALTER TABLE group_v2 ADD COLUMN storage_record BLOB;
+                                        ALTER TABLE recipient ADD COLUMN storage_id BLOB;
+                                        ALTER TABLE recipient ADD COLUMN storage_record BLOB;
+                                        """);
+            }
+        }
     }
 }
index 6860e239cd4573a40bd858772aba5c6685c06bf4..41c05b98d866cb5c617071a6002ec030e0de1572 100644 (file)
@@ -168,6 +168,7 @@ public class SignalAccount implements Closeable {
     private GroupStore groupStore;
     private RecipientStore recipientStore;
     private StickerStore stickerStore;
+    private UnknownStorageIdStore unknownStorageIdStore;
     private ConfigurationStore configurationStore;
     private KeyValueStore keyValueStore;
     private CdsiStore cdsiStore;
@@ -176,6 +177,7 @@ public class SignalAccount implements Closeable {
     private MessageSendLogStore messageSendLogStore;
 
     private AccountDatabase accountDatabase;
+    private RecipientId selfRecipientId;
 
     private SignalAccount(final FileChannel fileChannel, final FileLock lock) {
         this.fileChannel = fileChannel;
@@ -194,6 +196,7 @@ public class SignalAccount implements Closeable {
             signalAccount.load(dataPath, accountPath, settings);
             logger.trace("Migrating legacy parts of account file");
             signalAccount.migrateLegacyConfigs();
+            signalAccount.init();
 
             return signalAccount;
         } catch (Throwable e) {
@@ -240,7 +243,7 @@ public class SignalAccount implements Closeable {
         signalAccount.registered = false;
 
         signalAccount.previousStorageVersion = CURRENT_STORAGE_VERSION;
-        signalAccount.migrateLegacyConfigs();
+        signalAccount.init();
         signalAccount.save();
 
         return signalAccount;
@@ -286,6 +289,7 @@ public class SignalAccount implements Closeable {
         this.number = number;
         this.aciAccountData.setServiceId(aci);
         this.pniAccountData.setServiceId(pni);
+        this.init();
         getRecipientTrustedResolver().resolveSelfRecipientTrusted(getSelfRecipientAddress());
         this.password = password;
         this.profileKey = profileKey;
@@ -337,6 +341,7 @@ public class SignalAccount implements Closeable {
         this.registered = true;
         this.aciAccountData.setServiceId(aci);
         this.pniAccountData.setServiceId(pni);
+        init();
         this.registrationLockPin = pin;
         getKeyValueStore().storeEntry(lastReceiveTimestamp, 0L);
         save();
@@ -356,6 +361,10 @@ public class SignalAccount implements Closeable {
         getAccountDatabase();
     }
 
+    /**
+     * Resolves the recipient id for the account's own address and stores it in {@code selfRecipientId}.
+     */
+    private void init() {
+        final var resolver = getRecipientResolver();
+        this.selfRecipientId = resolver.resolveRecipient(getSelfRecipientAddress());
+    }
+
     private void migrateLegacyConfigs() {
         if (isPrimaryDevice() && getPniIdentityKeyPair() == null) {
             setPniIdentityKeyPair(KeyUtils.generateIdentityKeyPair());
@@ -1158,7 +1167,9 @@ public class SignalAccount implements Closeable {
 
     public IdentityKeyStore getIdentityKeyStore() {
         return getOrCreate(() -> identityKeyStore,
-                () -> identityKeyStore = new IdentityKeyStore(getAccountDatabase(), settings.trustNewIdentity()));
+                () -> identityKeyStore = new IdentityKeyStore(getAccountDatabase(),
+                        settings.trustNewIdentity(),
+                        getRecipientStore()));
     }
 
     public GroupStore getGroupStore() {
@@ -1216,9 +1227,13 @@ public class SignalAccount implements Closeable {
         return getOrCreate(() -> keyValueStore, () -> keyValueStore = new KeyValueStore(getAccountDatabase()));
     }
 
+    public UnknownStorageIdStore getUnknownStorageIdStore() {
+        return getOrCreate(() -> unknownStorageIdStore, () -> unknownStorageIdStore = new UnknownStorageIdStore());
+    }
+
     public ConfigurationStore getConfigurationStore() {
         return getOrCreate(() -> configurationStore,
-                () -> configurationStore = new ConfigurationStore(getKeyValueStore()));
+                () -> configurationStore = new ConfigurationStore(getKeyValueStore(), getRecipientStore()));
     }
 
     public MessageCache getMessageCache() {
@@ -1387,7 +1402,7 @@ public class SignalAccount implements Closeable {
     }
 
     public RecipientId getSelfRecipientId() {
-        return getRecipientResolver().resolveRecipient(getSelfRecipientAddress());
+        return selfRecipientId;
     }
 
     public String getSessionId(final String forNumber) {
@@ -1472,22 +1487,29 @@ public class SignalAccount implements Closeable {
         return pinMasterKey;
     }
 
-    public StorageKey getStorageKey() {
-        if (pinMasterKey != null) {
-            return pinMasterKey.deriveStorageServiceKey();
+    public void setMasterKey(MasterKey masterKey) {
+        if (isPrimaryDevice()) {
+            return;
         }
-        return storageKey;
+        this.pinMasterKey = masterKey;
+        save();
     }
 
     public StorageKey getOrCreateStorageKey() {
-        if (isPrimaryDevice()) {
-            return getOrCreatePinMasterKey().deriveStorageServiceKey();
+        if (pinMasterKey != null) {
+            return pinMasterKey.deriveStorageServiceKey();
+        } else if (storageKey != null) {
+            return storageKey;
+        } else if (!isPrimaryDevice() || !isMultiDevice()) {
+            // Only upload storage, if a pin master key already exists or linked devices exist
+            return null;
         }
-        return storageKey;
+
+        return getOrCreatePinMasterKey().deriveStorageServiceKey();
     }
 
     public void setStorageKey(final StorageKey storageKey) {
-        if (storageKey.equals(this.storageKey)) {
+        if (isPrimaryDevice() || storageKey.equals(this.storageKey)) {
             return;
         }
         this.storageKey = storageKey;
diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java
new file mode 100644 (file)
index 0000000..6e00ad7
--- /dev/null
@@ -0,0 +1,104 @@
+package org.asamk.signal.manager.storage;
+
+import org.whispersystems.signalservice.api.storage.StorageId;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Access to the {@code storage_id} table, which holds (type, storage_id) pairs.
+ * All methods work on a connection supplied by the caller.
+ */
+public class UnknownStorageIdStore {
+
+    private static final String TABLE_STORAGE_ID = "storage_id";
+
+    /**
+     * Creates the {@code storage_id} table on the given connection.
+     */
+    public static void createSql(Connection connection) throws SQLException {
+        // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
+        try (final var statement = connection.createStatement()) {
+            statement.executeUpdate("""
+                                    CREATE TABLE storage_id (
+                                      _id INTEGER PRIMARY KEY,
+                                      type INTEGER NOT NULL,
+                                      storage_id BLOB UNIQUE NOT NULL
+                                    ) STRICT;
+                                    """);
+        }
+    }
+
+    /**
+     * Returns every storage id stored in the table.
+     */
+    public Collection<StorageId> getUnknownStorageIds(Connection connection) throws SQLException {
+        final var query = """
+                          SELECT s.type, s.storage_id
+                          FROM %s s
+                          """.formatted(TABLE_STORAGE_ID);
+        return queryStorageIds(connection, query);
+    }
+
+    /**
+     * Returns the stored storage ids whose type is one of {@code types}.
+     * The types are rendered as a comma separated list inside the IN clause.
+     */
+    public List<StorageId> getUnknownStorageIds(
+            Connection connection, Collection<Integer> types
+    ) throws SQLException {
+        final var typeList = types.stream().map(String::valueOf).collect(Collectors.joining(","));
+        final var query = """
+                          SELECT s.type, s.storage_id
+                          FROM %s s
+                          WHERE s.type IN (%s)
+                          """.formatted(TABLE_STORAGE_ID, typeList);
+        return queryStorageIds(connection, query);
+    }
+
+    /**
+     * Inserts the given storage ids one by one with INSERT OR REPLACE.
+     */
+    public void addUnknownStorageIds(Connection connection, Collection<StorageId> storageIds) throws SQLException {
+        final var insert = """
+                           INSERT OR REPLACE INTO %s (type, storage_id)
+                           VALUES (?, ?)
+                           """.formatted(TABLE_STORAGE_ID);
+        try (final var statement = connection.prepareStatement(insert)) {
+            for (final var id : storageIds) {
+                statement.setInt(1, id.getType());
+                statement.setBytes(2, id.getRaw());
+                statement.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * Deletes the rows whose storage_id equals the raw bytes of one of the given ids.
+     */
+    public void deleteUnknownStorageIds(Connection connection, Collection<StorageId> storageIds) throws SQLException {
+        final var delete = """
+                           DELETE FROM %s
+                           WHERE storage_id = ?
+                           """.formatted(TABLE_STORAGE_ID);
+        try (final var statement = connection.prepareStatement(delete)) {
+            for (final var id : storageIds) {
+                statement.setBytes(1, id.getRaw());
+                statement.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * Deletes every row of the table.
+     */
+    public void deleteAllUnknownStorageIds(Connection connection) throws SQLException {
+        final var deleteAll = "DELETE FROM %s".formatted(TABLE_STORAGE_ID);
+        try (final var statement = connection.prepareStatement(deleteAll)) {
+            statement.executeUpdate();
+        }
+    }
+
+    /**
+     * Runs a query that selects the type and storage_id columns and collects the mapped rows.
+     * Both the statement and the result stream are closed before returning.
+     */
+    private List<StorageId> queryStorageIds(Connection connection, String query) throws SQLException {
+        try (final var statement = connection.prepareStatement(query);
+             final var rows = Utils.executeQueryForStream(statement, this::getStorageIdFromResultSet)) {
+            return rows.toList();
+        }
+    }
+
+    /**
+     * Maps the current row to a {@link StorageId} built from its type and storage_id columns.
+     */
+    private StorageId getStorageIdFromResultSet(ResultSet resultSet) throws SQLException {
+        final var recordType = resultSet.getInt("type");
+        final var rawId = resultSet.getBytes("storage_id");
+        return StorageId.forType(rawId, recordType);
+    }
+}
index f85d3d8084f37a6ce2ef9ed528ae3bcb8860913d..558d4f86d5218f6c874f12a0121e2ca0de4cc875 100644 (file)
@@ -3,10 +3,15 @@ package org.asamk.signal.manager.storage.configuration;
 import org.asamk.signal.manager.api.PhoneNumberSharingMode;
 import org.asamk.signal.manager.storage.keyValue.KeyValueEntry;
 import org.asamk.signal.manager.storage.keyValue.KeyValueStore;
+import org.asamk.signal.manager.storage.recipients.RecipientStore;
+
+import java.sql.Connection;
+import java.sql.SQLException;
 
 public class ConfigurationStore {
 
     private final KeyValueStore keyValueStore;
+    private final RecipientStore recipientStore;
 
     private final KeyValueEntry<Boolean> readReceipts = new KeyValueEntry<>("config-read-receipts", Boolean.class);
     private final KeyValueEntry<Boolean> unidentifiedDeliveryIndicators = new KeyValueEntry<>(
@@ -20,9 +25,11 @@ public class ConfigurationStore {
     private final KeyValueEntry<PhoneNumberSharingMode> phoneNumberSharingMode = new KeyValueEntry<>(
             "config-phone-number-sharing-mode",
             PhoneNumberSharingMode.class);
+    private final KeyValueEntry<String> usernameLinkColor = new KeyValueEntry<>("username-link-color", String.class);
 
-    public ConfigurationStore(final KeyValueStore keyValueStore) {
+    public ConfigurationStore(final KeyValueStore keyValueStore, RecipientStore recipientStore) {
         this.keyValueStore = keyValueStore;
+        this.recipientStore = recipientStore;
     }
 
     public Boolean getReadReceipts() {
@@ -30,7 +37,15 @@ public class ConfigurationStore {
     }
 
     public void setReadReceipts(final boolean value) {
-        keyValueStore.storeEntry(readReceipts, value);
+        if (keyValueStore.storeEntry(readReceipts, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setReadReceipts(final Connection connection, final boolean value) throws SQLException {
+        if (keyValueStore.storeEntry(connection, readReceipts, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 
     public Boolean getUnidentifiedDeliveryIndicators() {
@@ -38,7 +53,17 @@ public class ConfigurationStore {
     }
 
     public void setUnidentifiedDeliveryIndicators(final boolean value) {
-        keyValueStore.storeEntry(unidentifiedDeliveryIndicators, value);
+        if (keyValueStore.storeEntry(unidentifiedDeliveryIndicators, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setUnidentifiedDeliveryIndicators(
+            final Connection connection, final boolean value
+    ) throws SQLException {
+        if (keyValueStore.storeEntry(connection, unidentifiedDeliveryIndicators, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 
     public Boolean getTypingIndicators() {
@@ -46,7 +71,15 @@ public class ConfigurationStore {
     }
 
     public void setTypingIndicators(final boolean value) {
-        keyValueStore.storeEntry(typingIndicators, value);
+        if (keyValueStore.storeEntry(typingIndicators, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setTypingIndicators(final Connection connection, final boolean value) throws SQLException {
+        if (keyValueStore.storeEntry(connection, typingIndicators, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 
     public Boolean getLinkPreviews() {
@@ -54,7 +87,15 @@ public class ConfigurationStore {
     }
 
     public void setLinkPreviews(final boolean value) {
-        keyValueStore.storeEntry(linkPreviews, value);
+        if (keyValueStore.storeEntry(linkPreviews, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setLinkPreviews(final Connection connection, final boolean value) throws SQLException {
+        if (keyValueStore.storeEntry(connection, linkPreviews, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 
     public Boolean getPhoneNumberUnlisted() {
@@ -62,7 +103,15 @@ public class ConfigurationStore {
     }
 
     public void setPhoneNumberUnlisted(final boolean value) {
-        keyValueStore.storeEntry(phoneNumberUnlisted, value);
+        if (keyValueStore.storeEntry(phoneNumberUnlisted, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setPhoneNumberUnlisted(final Connection connection, final boolean value) throws SQLException {
+        if (keyValueStore.storeEntry(connection, phoneNumberUnlisted, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 
     public PhoneNumberSharingMode getPhoneNumberSharingMode() {
@@ -70,6 +119,32 @@ public class ConfigurationStore {
     }
 
     public void setPhoneNumberSharingMode(final PhoneNumberSharingMode value) {
-        keyValueStore.storeEntry(phoneNumberSharingMode, value);
+        if (keyValueStore.storeEntry(phoneNumberSharingMode, value)) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    public void setPhoneNumberSharingMode(
+            final Connection connection, final PhoneNumberSharingMode value
+    ) throws SQLException {
+        if (keyValueStore.storeEntry(connection, phoneNumberSharingMode, value)) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
+    }
+
+    /**
+     * Reads the username link color entry from the key value store.
+     */
+    public String getUsernameLinkColor() {
+        return keyValueStore.getEntry(usernameLinkColor);
+    }
+
+    /**
+     * Stores the username link color; the self storage id is rotated when storeEntry returns true.
+     */
+    public void setUsernameLinkColor(final String color) {
+        final var updated = keyValueStore.storeEntry(usernameLinkColor, color);
+        if (updated) {
+            recipientStore.rotateSelfStorageId();
+        }
+    }
+
+    /**
+     * Variant that stores the color and rotates the self storage id on {@code connection}.
+     */
+    public void setUsernameLinkColor(final Connection connection, final String color) throws SQLException {
+        final var updated = keyValueStore.storeEntry(connection, usernameLinkColor, color);
+        if (updated) {
+            recipientStore.rotateSelfStorageId(connection);
+        }
     }
 }
index ba09337cb7ddbedac331fd89010367d458e80adc..fd7bfe850235e6e40293f6087d8cef35fd615af2 100644 (file)
@@ -25,6 +25,7 @@ public final class GroupInfoV1 extends GroupInfo {
     public int messageExpirationTime;
     public boolean blocked;
     public boolean archived;
+    private byte[] storageRecord;
 
     public GroupInfoV1(GroupIdV1 groupId) {
         this.groupId = groupId;
@@ -38,7 +39,8 @@ public final class GroupInfoV1 extends GroupInfo {
             final String color,
             final int messageExpirationTime,
             final boolean blocked,
-            final boolean archived
+            final boolean archived,
+            final byte[] storageRecord
     ) {
         this.groupId = groupId;
         this.expectedV2Id = expectedV2Id;
@@ -48,6 +50,7 @@ public final class GroupInfoV1 extends GroupInfo {
         this.messageExpirationTime = messageExpirationTime;
         this.blocked = blocked;
         this.archived = archived;
+        this.storageRecord = storageRecord;
     }
 
     @Override
@@ -123,4 +126,8 @@ public final class GroupInfoV1 extends GroupInfo {
     public void removeMember(RecipientId recipientId) {
         this.members.removeIf(member -> member.equals(recipientId));
     }
+
+    public byte[] getStorageRecord() {
+        return storageRecord;
+    }
 }
index c263aa9bcfff7d52d82ee768ae236c3f1a825d53..8ddec54ef308ee9b380a668bfae0ba54e6fdc8cc 100644 (file)
@@ -24,6 +24,7 @@ public final class GroupInfoV2 extends GroupInfo {
     private final DistributionId distributionId;
     private boolean blocked;
     private DecryptedGroup group;
+    private byte[] storageRecord;
     private boolean permissionDenied;
 
     private final RecipientResolver recipientResolver;
@@ -44,6 +45,7 @@ public final class GroupInfoV2 extends GroupInfo {
             final DistributionId distributionId,
             final boolean blocked,
             final boolean permissionDenied,
+            final byte[] storageRecord,
             final RecipientResolver recipientResolver
     ) {
         this.groupId = groupId;
@@ -52,6 +54,7 @@ public final class GroupInfoV2 extends GroupInfo {
         this.distributionId = distributionId;
         this.blocked = blocked;
         this.permissionDenied = permissionDenied;
+        this.storageRecord = storageRecord;
         this.recipientResolver = recipientResolver;
     }
 
@@ -64,6 +67,10 @@ public final class GroupInfoV2 extends GroupInfo {
         return masterKey;
     }
 
+    public byte[] getStorageRecord() {
+        return storageRecord;
+    }
+
     public DistributionId getDistributionId() {
         return distributionId;
     }
index 33bb25320da2e91cdf6f5ce65054faf1c4091f56..e7cf57528657addada03faea06c515d9f761c997 100644 (file)
@@ -9,6 +9,7 @@ import org.asamk.signal.manager.storage.Utils;
 import org.asamk.signal.manager.storage.recipients.RecipientId;
 import org.asamk.signal.manager.storage.recipients.RecipientIdCreator;
 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
+import org.asamk.signal.manager.util.KeyUtils;
 import org.signal.libsignal.zkgroup.InvalidInputException;
 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
 import org.signal.libsignal.zkgroup.groups.GroupSecretParams;
@@ -16,6 +17,7 @@ import org.signal.storageservice.protos.groups.local.DecryptedGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.whispersystems.signalservice.api.push.DistributionId;
+import org.whispersystems.signalservice.api.storage.StorageId;
 import org.whispersystems.signalservice.api.util.UuidUtil;
 
 import java.io.IOException;
@@ -23,9 +25,11 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -48,6 +52,8 @@ public class GroupStore {
             statement.executeUpdate("""
                                     CREATE TABLE group_v2 (
                                       _id INTEGER PRIMARY KEY,
+                                      storage_id BLOB UNIQUE,
+                                      storage_record BLOB,
                                       group_id BLOB UNIQUE NOT NULL,
                                       master_key BLOB NOT NULL,
                                       group_data BLOB,
@@ -57,6 +63,8 @@ public class GroupStore {
                                     ) STRICT;
                                     CREATE TABLE group_v1 (
                                       _id INTEGER PRIMARY KEY,
+                                      storage_id BLOB UNIQUE,
+                                      storage_record BLOB,
                                       group_id BLOB UNIQUE NOT NULL,
                                       group_id_v2 BLOB UNIQUE,
                                       name TEXT,
@@ -111,6 +119,28 @@ public class GroupStore {
         insertOrReplaceGroup(connection, internalId, group);
     }
 
+    /**
+     * Writes the storage id and the raw storage record of the group row matching {@code groupId}.
+     * The table is chosen by the runtime type of {@code groupId}; a null {@code storageRecord} is
+     * written as SQL NULL.
+     */
+    public void storeStorageRecord(
+            final Connection connection, final GroupId groupId, final StorageId storageId, final byte[] storageRecord
+    ) throws SQLException {
+        final var table = groupId instanceof GroupIdV1 ? TABLE_GROUP_V1 : TABLE_GROUP_V2;
+        final var update = """
+                           UPDATE %s
+                           SET storage_id = ?, storage_record = ?
+                           WHERE group_id = ?
+                           """.formatted(table);
+        try (final var statement = connection.prepareStatement(update)) {
+            statement.setBytes(1, storageId.getRaw());
+            if (storageRecord != null) {
+                statement.setBytes(2, storageRecord);
+            } else {
+                statement.setNull(2, Types.BLOB);
+            }
+            statement.setBytes(3, groupId.serialize());
+            statement.executeUpdate();
+        }
+    }
+
     public void deleteGroup(GroupId groupId) {
         if (groupId instanceof GroupIdV1 groupIdV1) {
             deleteGroup(groupIdV1);
@@ -249,6 +279,34 @@ public class GroupStore {
         return Stream.concat(getGroupsV2().stream(), getGroupsV1().stream()).toList();
     }
 
+    /**
+     * Returns the group ids of all rows in the v1 group table.
+     * Rows for which the result set mapper yields null are dropped.
+     */
+    public List<GroupIdV1> getGroupV1Ids(Connection connection) throws SQLException {
+        final var sql = (
+                """
+                SELECT g.group_id
+                FROM %s g
+                """
+        ).formatted(TABLE_GROUP_V1);
+        // Close the result stream explicitly, as UnknownStorageIdStore does for its queries.
+        try (final var statement = connection.prepareStatement(sql);
+             final var rows = Utils.executeQueryForStream(statement, this::getGroupIdV1FromResultSet)) {
+            return rows.filter(Objects::nonNull).toList();
+        }
+    }
+
+    /**
+     * Returns the group ids of all rows in the v2 group table.
+     * Rows for which the result set mapper yields null are dropped.
+     */
+    public List<GroupIdV2> getGroupV2Ids(Connection connection) throws SQLException {
+        final var sql = (
+                """
+                SELECT g.group_id
+                FROM %s g
+                """
+        ).formatted(TABLE_GROUP_V2);
+        // Close the result stream explicitly, as UnknownStorageIdStore does for its queries.
+        try (final var statement = connection.prepareStatement(sql);
+             final var rows = Utils.executeQueryForStream(statement, this::getGroupIdV2FromResultSet)) {
+            return rows.filter(Objects::nonNull).toList();
+        }
+    }
+
     public void mergeRecipients(
             final Connection connection, final RecipientId recipientId, final RecipientId toBeMergedRecipientId
     ) throws SQLException {
@@ -269,6 +327,106 @@ public class GroupStore {
         }
     }
 
+    /**
+     * Collects the storage ids of all v1 and v2 group rows that have one.
+     * v1 ids come first, followed by v2 ids.
+     *
+     * @param connection connection both queries are executed on
+     * @return a new mutable list with the storage ids of both tables
+     * @throws SQLException if preparing or running either query fails
+     */
+    public List<StorageId> getStorageIds(Connection connection) throws SQLException {
+        final var template = (
+                """
+                SELECT g.storage_id
+                FROM %s g WHERE g.storage_id IS NOT NULL
+                """
+        );
+        final var result = new ArrayList<StorageId>();
+        try (final var v1Statement = connection.prepareStatement(template.formatted(TABLE_GROUP_V1))) {
+            result.addAll(Utils.executeQueryForStream(v1Statement, this::getGroupV1StorageIdFromResultSet).toList());
+        }
+        try (final var v2Statement = connection.prepareStatement(template.formatted(TABLE_GROUP_V2))) {
+            result.addAll(Utils.executeQueryForStream(v2Statement, this::getGroupV2StorageIdFromResultSet).toList());
+        }
+        return result;
+    }
+
+    /**
+     * Writes the given storage ids to the matching group rows, first for the v1 table and
+     * then for the v2 table. One UPDATE is executed per map entry.
+     *
+     * @param connection     connection the updates are executed on
+     * @param storageIdV1Map storage id to store for each v1 group id
+     * @param storageIdV2Map storage id to store for each v2 group id
+     * @throws SQLException if preparing or running an update fails
+     */
+    public void updateStorageIds(
+            Connection connection, Map<GroupIdV1, StorageId> storageIdV1Map, Map<GroupIdV2, StorageId> storageIdV2Map
+    ) throws SQLException {
+        final var template = """
+                             UPDATE %s
+                             SET storage_id = ?
+                             WHERE group_id = ?
+                             """;
+        try (final var v1Update = connection.prepareStatement(template.formatted(TABLE_GROUP_V1))) {
+            for (final var mapping : storageIdV1Map.entrySet()) {
+                final var groupId = mapping.getKey();
+                final var storageId = mapping.getValue();
+                v1Update.setBytes(1, storageId.getRaw());
+                v1Update.setBytes(2, groupId.serialize());
+                v1Update.executeUpdate();
+            }
+        }
+        try (final var v2Update = connection.prepareStatement(template.formatted(TABLE_GROUP_V2))) {
+            for (final var mapping : storageIdV2Map.entrySet()) {
+                final var groupId = mapping.getKey();
+                final var storageId = mapping.getValue();
+                v2Update.setBytes(1, storageId.getRaw());
+                v2Update.setBytes(2, groupId.serialize());
+                v2Update.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * Stores the given storage id on the row of the given group. The v1 table is used when
+     * {@code groupId} is a {@code GroupIdV1}, the v2 table otherwise.
+     *
+     * @param connection connection the update is executed on
+     * @param groupId    group whose row is updated
+     * @param storageId  storage id whose raw bytes are written
+     * @throws SQLException if preparing or running the update fails
+     */
+    public void updateStorageId(
+            Connection connection, GroupId groupId, StorageId storageId
+    ) throws SQLException {
+        final var table = groupId instanceof GroupIdV1 ? TABLE_GROUP_V1 : TABLE_GROUP_V2;
+        final var sql = """
+                        UPDATE %s
+                        SET storage_id = ?
+                        WHERE group_id = ?
+                        """.formatted(table);
+        try (final var update = connection.prepareStatement(sql)) {
+            update.setBytes(1, storageId.getRaw());
+            update.setBytes(2, groupId.serialize());
+            update.executeUpdate();
+        }
+    }
+
+    /**
+     * Generates a new storage id for every v1 and v2 group row whose {@code storage_id}
+     * column is still {@code NULL}.
+     * <p>
+     * Both tables are handled on a single connection with auto-commit switched off and
+     * one commit after the last update.
+     *
+     * @throws RuntimeException wrapping the {@link SQLException} if any statement fails
+     */
+    public void setMissingStorageIds() {
+        final var selectSql = (
+                """
+                SELECT g.group_id
+                FROM %s g
+                WHERE g.storage_id IS NULL
+                """
+        );
+        final var updateSql = (
+                """
+                UPDATE %s
+                SET storage_id = ?
+                WHERE group_id = ?
+                """
+        );
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            try (final var selectStmt = connection.prepareStatement(selectSql.formatted(TABLE_GROUP_V1))) {
+                final var groupIds = Utils.executeQueryForStream(selectStmt, this::getGroupIdV1FromResultSet).toList();
+                try (final var updateStmt = connection.prepareStatement(updateSql.formatted(TABLE_GROUP_V1))) {
+                    for (final var groupId : groupIds) {
+                        updateStmt.setBytes(1, KeyUtils.createRawStorageId());
+                        updateStmt.setBytes(2, groupId.serialize());
+                        // Without this call the bound parameters are never sent and the
+                        // v1 rows would keep their NULL storage_id.
+                        updateStmt.executeUpdate();
+                    }
+                }
+            }
+            try (final var selectStmt = connection.prepareStatement(selectSql.formatted(TABLE_GROUP_V2))) {
+                final var groupIds = Utils.executeQueryForStream(selectStmt, this::getGroupIdV2FromResultSet).toList();
+                try (final var updateStmt = connection.prepareStatement(updateSql.formatted(TABLE_GROUP_V2))) {
+                    for (final var groupId : groupIds) {
+                        updateStmt.setBytes(1, KeyUtils.createRawStorageId());
+                        updateStmt.setBytes(2, groupId.serialize());
+                        updateStmt.executeUpdate();
+                    }
+                }
+            }
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed update group store", e);
+        }
+    }
+
     void addLegacyGroups(final Collection<GroupInfo> groups) {
         logger.debug("Migrating legacy groups to database");
         long start = System.nanoTime();
@@ -296,8 +454,8 @@ public class GroupStore {
                 }
             }
             final var sql = """
-                            INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived)
-                            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                            INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived, storage_id)
+                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                             RETURNING _id
                             """.formatted(TABLE_GROUP_V1);
             try (final var statement = connection.prepareStatement(sql)) {
@@ -313,6 +471,7 @@ public class GroupStore {
                 statement.setLong(6, groupV1.getMessageExpirationTimer());
                 statement.setBoolean(7, groupV1.isBlocked());
                 statement.setBoolean(8, groupV1.archived);
+                statement.setBytes(9, KeyUtils.createRawStorageId());
                 final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper);
 
                 if (internalId == null) {
@@ -337,8 +496,8 @@ public class GroupStore {
         } else if (group instanceof GroupInfoV2 groupV2) {
             final var sql = (
                     """
-                    INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id)
-                    VALUES (?, ?, ?, ?, ?, ?, ?)
+                    INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id, storage_id)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                     """
             ).formatted(TABLE_GROUP_V2);
             try (final var statement = connection.prepareStatement(sql)) {
@@ -357,6 +516,7 @@ public class GroupStore {
                 statement.setBytes(5, UuidUtil.toByteArray(groupV2.getDistributionId().asUuid()));
                 statement.setBoolean(6, groupV2.isBlocked());
                 statement.setBoolean(7, groupV2.isPermissionDenied());
+                statement.setBytes(8, KeyUtils.createRawStorageId());
                 statement.executeUpdate();
             }
         } else {
@@ -367,7 +527,7 @@ public class GroupStore {
     private List<GroupInfoV2> getGroupsV2() {
         final var sql = (
                 """
-                SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
+                SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record
                 FROM %s g
                 """
         ).formatted(TABLE_GROUP_V2);
@@ -382,20 +542,59 @@ public class GroupStore {
         }
     }
 
-    private GroupInfoV2 getGroup(Connection connection, GroupIdV2 groupIdV2) throws SQLException {
+    /**
+     * Loads the v2 group row whose {@code group_id} equals the serialized form of the given id.
+     *
+     * @param connection connection the query is executed on
+     * @param groupIdV2  id of the group to load
+     * @return the mapped group, or {@code null} when the lookup yields an empty result
+     * @throws SQLException if preparing or running the query fails
+     */
+    public GroupInfoV2 getGroup(Connection connection, GroupIdV2 groupIdV2) throws SQLException {
+        final var sql = (
+                """
+                SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record
+                FROM %s g
+                WHERE g.group_id = ?
+                """
+        ).formatted(TABLE_GROUP_V2);
+        try (final var statement = connection.prepareStatement(sql)) {
+            statement.setBytes(1, groupIdV2.serialize());
+            return Utils.executeQueryForOptional(statement, this::getGroupInfoV2FromResultSet).orElse(null);
+        }
+    }
+
+    /**
+     * Returns the storage id stored for the given v2 group. If none is stored, a new one is
+     * generated, written through {@code updateStorageId} and returned.
+     *
+     * @param connection connection the query and the possible update are executed on
+     * @param groupIdV2  id of the group whose storage id is requested
+     * @return the stored storage id, or the newly generated one
+     * @throws SQLException if preparing or running a statement fails
+     */
+    public StorageId getGroupStorageId(Connection connection, GroupIdV2 groupIdV2) throws SQLException {
         final var sql = (
                 """
-                SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied
+                SELECT g.storage_id
                 FROM %s g
                 WHERE g.group_id = ?
                 """
         ).formatted(TABLE_GROUP_V2);
         try (final var statement = connection.prepareStatement(sql)) {
             statement.setBytes(1, groupIdV2.serialize());
+            try (final var resultSet = statement.executeQuery()) {
+                if (resultSet.next()) {
+                    // Read the column directly: getGroupV2StorageIdFromResultSet replaces a
+                    // NULL column with a random id, which would be returned without being stored.
+                    final var rawStorageId = resultSet.getBytes("storage_id");
+                    if (rawStorageId != null) {
+                        return StorageId.forGroupV2(rawStorageId);
+                    }
+                }
+            }
+        }
+        final var newStorageId = StorageId.forGroupV2(KeyUtils.createRawStorageId());
+        updateStorageId(connection, groupIdV2, newStorageId);
+        return newStorageId;
+    }
+
+    /**
+     * Looks up the v2 group whose {@code storage_id} column equals the raw bytes of the given storage id.
+     *
+     * @param connection connection the query is executed on
+     * @param storageId  storage id to search for; its raw bytes are bound as the query parameter
+     * @return the mapped group, or {@code null} when the lookup yields an empty result
+     * @throws SQLException if preparing or running the query fails
+     */
+    public GroupInfoV2 getGroupV2(Connection connection, StorageId storageId) throws SQLException {
+        final var sql = (
+                """
+                SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record
+                FROM %s g
+                WHERE g.storage_id = ?
+                """
+        ).formatted(TABLE_GROUP_V2);
+        try (final var statement = connection.prepareStatement(sql)) {
+            statement.setBytes(1, storageId.getRaw());
             return Utils.executeQueryForOptional(statement, this::getGroupInfoV2FromResultSet).orElse(null);
         }
     }
 
+    /** Row mapper: wraps the {@code group_id} column of the current row in a v2 group id. */
+    private GroupIdV2 getGroupIdV2FromResultSet(ResultSet resultSet) throws SQLException {
+        return GroupId.v2(resultSet.getBytes("group_id"));
+    }
+
     private GroupInfoV2 getGroupInfoV2FromResultSet(ResultSet resultSet) throws SQLException {
         try {
             final var groupId = resultSet.getBytes("group_id");
@@ -404,22 +603,38 @@ public class GroupStore {
             final var distributionId = resultSet.getBytes("distribution_id");
             final var blocked = resultSet.getBoolean("blocked");
             final var permissionDenied = resultSet.getBoolean("permission_denied");
+            final var storageRecord = resultSet.getBytes("storage_record");
             return new GroupInfoV2(GroupId.v2(groupId),
                     new GroupMasterKey(masterKey),
                     groupData == null ? null : DecryptedGroup.ADAPTER.decode(groupData),
                     DistributionId.from(UuidUtil.parseOrThrow(distributionId)),
                     blocked,
                     permissionDenied,
+                    storageRecord,
                     recipientResolver);
         } catch (InvalidInputException | IOException e) {
             return null;
         }
     }
 
+    /**
+     * Row mapper: builds a v1 group storage id from the {@code storage_id} column of the
+     * current row. A NULL column yields a freshly generated id, which is not written back here.
+     */
+    private StorageId getGroupV1StorageIdFromResultSet(ResultSet resultSet) throws SQLException {
+        final var raw = resultSet.getBytes("storage_id");
+        if (raw != null) {
+            return StorageId.forGroupV1(raw);
+        }
+        return StorageId.forGroupV1(KeyUtils.createRawStorageId());
+    }
+
+    /**
+     * Row mapper: builds a v2 group storage id from the {@code storage_id} column of the
+     * current row. A NULL column yields a freshly generated id, which is not written back here.
+     */
+    private StorageId getGroupV2StorageIdFromResultSet(ResultSet resultSet) throws SQLException {
+        final var raw = resultSet.getBytes("storage_id");
+        if (raw != null) {
+            return StorageId.forGroupV2(raw);
+        }
+        return StorageId.forGroupV2(KeyUtils.createRawStorageId());
+    }
+
     private List<GroupInfoV1> getGroupsV1() {
         final var sql = (
                 """
-                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
+                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record
                 FROM %s g
                 """
         ).formatted(TABLE_GROUP_V1_MEMBER, TABLE_GROUP_V1);
@@ -434,10 +649,10 @@ public class GroupStore {
         }
     }
 
-    private GroupInfoV1 getGroup(Connection connection, GroupIdV1 groupIdV1) throws SQLException {
+    public GroupInfoV1 getGroup(Connection connection, GroupIdV1 groupIdV1) throws SQLException {
         final var sql = (
                 """
-                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
+                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record
                 FROM %s g
                 WHERE g.group_id = ?
                 """
@@ -448,6 +663,45 @@ public class GroupStore {
         }
     }
 
+    /**
+     * Returns the storage id stored for the given v1 group. If none is stored, a new one is
+     * generated, written through {@code updateStorageId} and returned.
+     *
+     * @param connection connection the query and the possible update are executed on
+     * @param groupIdV1  id of the group whose storage id is requested
+     * @return the stored storage id, or the newly generated one
+     * @throws SQLException if preparing or running a statement fails
+     */
+    public StorageId getGroupStorageId(Connection connection, GroupIdV1 groupIdV1) throws SQLException {
+        final var sql = (
+                """
+                SELECT g.storage_id
+                FROM %s g
+                WHERE g.group_id = ?
+                """
+        ).formatted(TABLE_GROUP_V1);
+        try (final var statement = connection.prepareStatement(sql)) {
+            statement.setBytes(1, groupIdV1.serialize());
+            try (final var resultSet = statement.executeQuery()) {
+                if (resultSet.next()) {
+                    // Read the column directly: getGroupV1StorageIdFromResultSet replaces a
+                    // NULL column with a random id, which would be returned without being stored.
+                    final var rawStorageId = resultSet.getBytes("storage_id");
+                    if (rawStorageId != null) {
+                        return StorageId.forGroupV1(rawStorageId);
+                    }
+                }
+            }
+        }
+        final var newStorageId = StorageId.forGroupV1(KeyUtils.createRawStorageId());
+        updateStorageId(connection, groupIdV1, newStorageId);
+        return newStorageId;
+    }
+
+    /**
+     * Looks up the v1 group whose {@code storage_id} column equals the raw bytes of the given storage id.
+     *
+     * @param connection connection the query is executed on
+     * @param storageId  storage id to search for
+     * @return the mapped group, or {@code null} when the lookup yields an empty result
+     * @throws SQLException if preparing or running the query fails
+     */
+    public GroupInfoV1 getGroupV1(Connection connection, StorageId storageId) throws SQLException {
+        final var query = """
+                          SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record
+                          FROM %s g
+                          WHERE g.storage_id = ?
+                          """.formatted(TABLE_GROUP_V1_MEMBER, TABLE_GROUP_V1);
+        try (final var stmt = connection.prepareStatement(query)) {
+            stmt.setBytes(1, storageId.getRaw());
+            final var match = Utils.executeQueryForOptional(stmt, this::getGroupInfoV1FromResultSet);
+            return match.orElse(null);
+        }
+    }
+
+    /** Row mapper: wraps the {@code group_id} column of the current row in a v1 group id. */
+    private GroupIdV1 getGroupIdV1FromResultSet(ResultSet resultSet) throws SQLException {
+        return GroupId.v1(resultSet.getBytes("group_id"));
+    }
+
     private GroupInfoV1 getGroupInfoV1FromResultSet(ResultSet resultSet) throws SQLException {
         final var groupId = resultSet.getBytes("group_id");
         final var groupIdV2 = resultSet.getBytes("group_id_v2");
@@ -463,6 +717,7 @@ public class GroupStore {
         final var expirationTime = resultSet.getInt("expiration_time");
         final var blocked = resultSet.getBoolean("blocked");
         final var archived = resultSet.getBoolean("archived");
+        final var storagRecord = resultSet.getBytes("storage_record");
         return new GroupInfoV1(GroupId.v1(groupId),
                 groupIdV2 == null ? null : GroupId.v2(groupIdV2),
                 name,
@@ -470,7 +725,8 @@ public class GroupStore {
                 color,
                 expirationTime,
                 blocked,
-                archived);
+                archived,
+                storagRecord);
     }
 
     private GroupInfoV2 getGroupV2ByV1Id(final Connection connection, final GroupIdV1 groupId) throws SQLException {
@@ -480,7 +736,7 @@ public class GroupStore {
     private GroupInfoV1 getGroupV1ByV2Id(Connection connection, GroupIdV2 groupIdV2) throws SQLException {
         final var sql = (
                 """
-                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived
+                SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record
                 FROM %s g
                 WHERE g.group_id_v2 = ?
                 """
index a87759b37271b221c6639c57060111bc0047f4d1..a35d4513a92fe8cbae492f6c8e9c65b104dd0336 100644 (file)
@@ -59,7 +59,8 @@ public class LegacyGroupStore {
                         g1.color,
                         g1.messageExpirationTime,
                         g1.blocked,
-                        g1.archived);
+                        g1.archived,
+                        null);
             }
 
             final var g2 = (Storage.GroupV2) g;
@@ -77,6 +78,7 @@ public class LegacyGroupStore {
                     g2.distributionId == null ? DistributionId.create() : DistributionId.from(g2.distributionId),
                     g2.blocked,
                     g2.permissionDenied,
+                    null,
                     recipientResolver);
         }).toList();
 
index bcb40669cbc4e235caaa8400ef4d7fac7b08795f..a4b355ea66c2c28bd57fae8a19271f2ad68699d7 100644 (file)
@@ -4,6 +4,7 @@ import org.asamk.signal.manager.api.TrustLevel;
 import org.asamk.signal.manager.api.TrustNewIdentity;
 import org.asamk.signal.manager.storage.Database;
 import org.asamk.signal.manager.storage.Utils;
+import org.asamk.signal.manager.storage.recipients.RecipientStore;
 import org.signal.libsignal.protocol.IdentityKey;
 import org.signal.libsignal.protocol.InvalidKeyException;
 import org.signal.libsignal.protocol.state.IdentityKeyStore.Direction;
@@ -27,6 +28,7 @@ public class IdentityKeyStore {
     private static final String TABLE_IDENTITY = "identity";
     private final Database database;
     private final TrustNewIdentity trustNewIdentity;
+    private final RecipientStore recipientStore;
     private final PublishSubject<ServiceId> identityChanges = PublishSubject.create();
 
     private boolean isRetryingDecryption = false;
@@ -46,9 +48,12 @@ public class IdentityKeyStore {
         }
     }
 
-    public IdentityKeyStore(final Database database, final TrustNewIdentity trustNewIdentity) {
+    public IdentityKeyStore(
+            final Database database, final TrustNewIdentity trustNewIdentity, RecipientStore recipientStore
+    ) {
         this.database = database;
         this.trustNewIdentity = trustNewIdentity;
+        this.recipientStore = recipientStore;
     }
 
     public Observable<ServiceId> getIdentityChanges() {
@@ -59,58 +64,79 @@ public class IdentityKeyStore {
         return saveIdentity(serviceId.toString(), identityKey);
     }
 
+    /**
+     * Saves the identity key for the given service id on the supplied connection by delegating
+     * to the address-based overload with {@code serviceId.toString()} as the address.
+     *
+     * @param connection  connection the identity is loaded and stored on
+     * @param serviceId   service id the identity key belongs to
+     * @param identityKey identity key to save
+     * @return the result of the address-based overload
+     * @throws SQLException if a database operation fails
+     */
+    public boolean saveIdentity(
+            final Connection connection, final ServiceId serviceId, final IdentityKey identityKey
+    ) throws SQLException {
+        return saveIdentity(connection, serviceId.toString(), identityKey);
+    }
+
     boolean saveIdentity(final String address, final IdentityKey identityKey) {
         if (isRetryingDecryption) {
             return false;
         }
         try (final var connection = database.getConnection()) {
-            final var identityInfo = loadIdentity(connection, address);
-            if (identityInfo != null && identityInfo.getIdentityKey().equals(identityKey)) {
-                // Identity already exists, not updating the trust level
-                logger.trace("Not storing new identity for recipient {}, identity already stored", address);
-                return false;
-            }
-
-            saveNewIdentity(connection, address, identityKey, identityInfo == null);
-            return true;
+            return saveIdentity(connection, address, identityKey);
         } catch (SQLException e) {
             throw new RuntimeException("Failed update identity store", e);
         }
     }
 
+    /**
+     * Stores {@code identityKey} for {@code address} unless the same key is already stored.
+     *
+     * @param connection  connection the identity is loaded and stored on
+     * @param address     address the identity is stored under
+     * @param identityKey identity key to save
+     * @return {@code false} if an identity with an equal key already exists, {@code true}
+     *         after a new identity has been saved
+     * @throws SQLException if a database operation fails
+     */
+    private boolean saveIdentity(
+            final Connection connection, final String address, final IdentityKey identityKey
+    ) throws SQLException {
+        final var existing = loadIdentity(connection, address);
+        final var isFirstIdentity = existing == null;
+        if (!isFirstIdentity && existing.getIdentityKey().equals(identityKey)) {
+            // Same key is already on record, so the trust level is left untouched
+            logger.trace("Not storing new identity for recipient {}, identity already stored", address);
+            return false;
+        }
+
+        saveNewIdentity(connection, address, identityKey, isFirstIdentity);
+        return true;
+    }
+
     public void setRetryingDecryption(final boolean retryingDecryption) {
         isRetryingDecryption = retryingDecryption;
     }
 
     public boolean setIdentityTrustLevel(ServiceId serviceId, IdentityKey identityKey, TrustLevel trustLevel) {
         try (final var connection = database.getConnection()) {
-            final var address = serviceId.toString();
-            final var identityInfo = loadIdentity(connection, address);
-            if (identityInfo == null) {
-                logger.debug("Not updating trust level for recipient {}, identity not found", serviceId);
-                return false;
-            }
-            if (!identityInfo.getIdentityKey().equals(identityKey)) {
-                logger.debug("Not updating trust level for recipient {}, different identity found", serviceId);
-                return false;
-            }
-            if (identityInfo.getTrustLevel() == trustLevel) {
-                logger.trace("Not updating trust level for recipient {}, trust level already matches", serviceId);
-                return false;
-            }
-
-            logger.debug("Updating trust level for recipient {} with trust {}", serviceId, trustLevel);
-            final var newIdentityInfo = new IdentityInfo(address,
-                    identityKey,
-                    trustLevel,
-                    identityInfo.getDateAddedTimestamp());
-            storeIdentity(connection, newIdentityInfo);
-            return true;
+            return setIdentityTrustLevel(connection, serviceId, identityKey, trustLevel);
         } catch (SQLException e) {
             throw new RuntimeException("Failed update identity store", e);
         }
     }
 
+    /**
+     * Changes the trust level of the stored identity of {@code serviceId}, provided that
+     * identity exists, has the given key and does not already have the requested level.
+     *
+     * @param connection  connection the identity is loaded and stored on
+     * @param serviceId   service id whose identity is updated
+     * @param identityKey key the stored identity must match
+     * @param trustLevel  trust level to set
+     * @return {@code true} if the identity was rewritten with the new trust level,
+     *         {@code false} if nothing was changed
+     * @throws SQLException if a database operation fails
+     */
+    public boolean setIdentityTrustLevel(
+            final Connection connection,
+            final ServiceId serviceId,
+            final IdentityKey identityKey,
+            final TrustLevel trustLevel
+    ) throws SQLException {
+        final var address = serviceId.toString();
+        final var stored = loadIdentity(connection, address);
+        if (stored == null) {
+            logger.debug("Not updating trust level for recipient {}, identity not found", serviceId);
+            return false;
+        }
+
+        final var sameKey = stored.getIdentityKey().equals(identityKey);
+        if (!sameKey) {
+            logger.debug("Not updating trust level for recipient {}, different identity found", serviceId);
+            return false;
+        }
+
+        final var sameTrustLevel = stored.getTrustLevel() == trustLevel;
+        if (sameTrustLevel) {
+            logger.trace("Not updating trust level for recipient {}, trust level already matches", serviceId);
+            return false;
+        }
+
+        logger.debug("Updating trust level for recipient {} with trust {}", serviceId, trustLevel);
+        // The original date-added timestamp is carried over to the rewritten entry
+        storeIdentity(connection,
+                new IdentityInfo(address, identityKey, trustLevel, stored.getDateAddedTimestamp()));
+        return true;
+    }
+
     public boolean isTrustedIdentity(ServiceId serviceId, IdentityKey identityKey, Direction direction) {
         return isTrustedIdentity(serviceId.toString(), identityKey, direction);
     }
@@ -159,6 +185,10 @@ public class IdentityKeyStore {
         }
     }
 
+    /**
+     * Returns whatever {@code loadIdentity} yields for the given address on the supplied connection.
+     *
+     * @param connection connection the lookup is executed on
+     * @param address    address of the identity to load
+     * @return the loaded identity info; may be {@code null} (other callers of
+     *         {@code loadIdentity} in this class null-check its result)
+     * @throws SQLException if the lookup fails
+     */
+    public IdentityInfo getIdentityInfo(Connection connection, String address) throws SQLException {
+        return loadIdentity(connection, address);
+    }
+
     public List<IdentityInfo> getIdentities() {
         try (final var connection = database.getConnection()) {
             final var sql = (
@@ -252,6 +282,7 @@ public class IdentityKeyStore {
             statement.setInt(4, identityInfo.getTrustLevel().ordinal());
             statement.executeUpdate();
         }
+        recipientStore.rotateStorageId(connection, identityInfo.getServiceId());
     }
 
     private void deleteIdentity(final Connection connection, final String address) throws SQLException {
index 9addcc7d9628f797e4c3927b1747e34284d743ac..c51f32f1e863f7a4c439ee52ec4abc1995b2ea14 100644 (file)
@@ -10,6 +10,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
+import java.util.Objects;
 
 public class KeyValueStore {
 
@@ -43,9 +44,9 @@ public class KeyValueStore {
         }
     }
 
-    public <T> void storeEntry(KeyValueEntry<T> key, T value) {
+    public <T> boolean storeEntry(KeyValueEntry<T> key, T value) {
         try (final var connection = database.getConnection()) {
-            storeEntry(connection, key, value);
+            return storeEntry(connection, key, value);
         } catch (SQLException e) {
             throw new RuntimeException("Failed update key_value store", e);
         }
@@ -72,9 +73,14 @@ public class KeyValueStore {
         }
     }
 
-    private <T> void storeEntry(
+    public <T> boolean storeEntry(
             final Connection connection, final KeyValueEntry<T> key, final T value
     ) throws SQLException {
+        final var entry = getEntry(key);
+        if (Objects.equals(entry, value)) {
+            return false;
+        }
+
         final var sql = (
                 """
                 INSERT INTO %s (key, value)
@@ -87,6 +93,7 @@ public class KeyValueStore {
             setParameterValue(statement, 2, key.clazz(), value);
             statement.executeUpdate();
         }
+        return true;
     }
 
     @SuppressWarnings("unchecked")
index 55231cb7eb474dfcc568007f45ccc6c740b19bd8..02061a66f4e95702b70f2639ca6b620710c0fe3f 100644 (file)
@@ -83,7 +83,13 @@ public class LegacyRecipientStore2 {
                                     .collect(Collectors.toSet()));
                 }
 
-                return new Recipient(recipientId, address, contact, profileKey, expiringProfileKeyCredential, profile);
+                return new Recipient(recipientId,
+                        address,
+                        contact,
+                        profileKey,
+                        expiringProfileKeyCredential,
+                        profile,
+                        null);
             }).collect(Collectors.toMap(Recipient::getRecipientId, r -> r));
 
             recipientStore.addLegacyRecipients(recipients);
index 3790ecde98e8b4285c84e2ec5788b79c5f73d87d..1d5fb9c81094bfcca0d74f74cc15a116a0410b19 100644 (file)
@@ -21,13 +21,16 @@ public class Recipient {
 
     private final Profile profile;
 
+    private final byte[] storageRecord;
+
     public Recipient(
             final RecipientId recipientId,
             final RecipientAddress address,
             final Contact contact,
             final ProfileKey profileKey,
             final ExpiringProfileKeyCredential expiringProfileKeyCredential,
-            final Profile profile
+            final Profile profile,
+            final byte[] storageRecord
     ) {
         this.recipientId = recipientId;
         this.address = address;
@@ -35,6 +38,7 @@ public class Recipient {
         this.profileKey = profileKey;
         this.expiringProfileKeyCredential = expiringProfileKeyCredential;
         this.profile = profile;
+        this.storageRecord = storageRecord;
     }
 
     private Recipient(final Builder builder) {
@@ -42,8 +46,9 @@ public class Recipient {
         address = builder.address;
         contact = builder.contact;
         profileKey = builder.profileKey;
-        expiringProfileKeyCredential = builder.expiringProfileKeyCredential1;
+        expiringProfileKeyCredential = builder.expiringProfileKeyCredential;
         profile = builder.profile;
+        storageRecord = builder.storageRecord;
     }
 
     public static Builder newBuilder() {
@@ -56,8 +61,9 @@ public class Recipient {
         builder.address = copy.getAddress();
         builder.contact = copy.getContact();
         builder.profileKey = copy.getProfileKey();
-        builder.expiringProfileKeyCredential1 = copy.getExpiringProfileKeyCredential();
+        builder.expiringProfileKeyCredential = copy.getExpiringProfileKeyCredential();
         builder.profile = copy.getProfile();
+        builder.storageRecord = copy.getStorageRecord();
         return builder;
     }
 
@@ -85,6 +91,10 @@ public class Recipient {
         return profile;
     }
 
+    /**
+     * Returns the storage record bytes held by this recipient.
+     * The internal array is returned as is, not a copy; it may be {@code null}.
+     */
+    public byte[] getStorageRecord() {
+        return storageRecord;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
@@ -109,8 +119,9 @@ public class Recipient {
         private RecipientAddress address;
         private Contact contact;
         private ProfileKey profileKey;
-        private ExpiringProfileKeyCredential expiringProfileKeyCredential1;
+        private ExpiringProfileKeyCredential expiringProfileKeyCredential;
         private Profile profile;
+        private byte[] storageRecord;
 
         private Builder() {
         }
@@ -136,7 +147,7 @@ public class Recipient {
         }
 
         public Builder withExpiringProfileKeyCredential(final ExpiringProfileKeyCredential val) {
-            expiringProfileKeyCredential1 = val;
+            expiringProfileKeyCredential = val;
             return this;
         }
 
@@ -145,6 +156,11 @@ public class Recipient {
             return this;
         }
 
+        /**
+         * Sets the storage record bytes for the recipient being built.
+         *
+         * @param val storage record bytes; the array is stored as is, not copied
+         * @return this builder
+         */
+        public Builder withStorageRecord(final byte[] val) {
+            storageRecord = val;
+            return this;
+        }
+
         public Recipient build() {
             return new Recipient(this);
         }
index 86ada86ab38d6f8071075926b95e0eff0072377a..e4c24c99eeeb81d613504d86d3055a1032c3128d 100644 (file)
@@ -89,6 +89,10 @@ public record RecipientAddress(
                 address.username.equals(this.username) ? Optional.empty() : this.username);
     }
 
+    /**
+     * Returns the service id of this address if it is an ACI.
+     *
+     * @return the ACI, or an empty optional if no service id is present or it is not an ACI
+     */
+    public Optional<ACI> aci() {
+        return serviceId.filter(id -> id instanceof ServiceId.ACI).map(id -> (ServiceId.ACI) id);
+    }
+
     public String getIdentifier() {
         if (serviceId.isPresent()) {
             return serviceId.get().toString();
index 96a8f22489e8fcc31b9b762bdb94b759a42c2fd2..926f09bd26959964f2f39c6a8d80ca094e6d9ad8 100644 (file)
@@ -8,6 +8,7 @@ import org.asamk.signal.manager.storage.Database;
 import org.asamk.signal.manager.storage.Utils;
 import org.asamk.signal.manager.storage.contacts.ContactsStore;
 import org.asamk.signal.manager.storage.profiles.ProfileStore;
+import org.asamk.signal.manager.util.KeyUtils;
 import org.signal.libsignal.zkgroup.InvalidInputException;
 import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential;
 import org.signal.libsignal.zkgroup.profiles.ProfileKey;
@@ -17,11 +18,13 @@ import org.whispersystems.signalservice.api.push.ServiceId;
 import org.whispersystems.signalservice.api.push.ServiceId.ACI;
 import org.whispersystems.signalservice.api.push.ServiceId.PNI;
 import org.whispersystems.signalservice.api.push.SignalServiceAddress;
+import org.whispersystems.signalservice.api.storage.StorageId;
 import org.whispersystems.signalservice.api.util.UuidUtil;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -56,6 +59,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             statement.executeUpdate("""
                                     CREATE TABLE recipient (
                                       _id INTEGER PRIMARY KEY AUTOINCREMENT,
+                                      storage_id BLOB UNIQUE,
+                                      storage_record BLOB,
                                       number TEXT UNIQUE,
                                       username TEXT UNIQUE,
                                       uuid BLOB UNIQUE,
@@ -273,6 +278,10 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
+    /**
+     * Resolves the recipient id for the given address on a caller-supplied
+     * connection by delegating to {@code resolveRecipientLocked}.
+     *
+     * @param connection connection to run the lookup on
+     * @param address    address to resolve
+     * @return the id of the resolved recipient
+     * @throws SQLException if a database operation fails
+     */
+    public RecipientId resolveRecipient(Connection connection, RecipientAddress address) throws SQLException {
+        final var recipientId = resolveRecipientLocked(connection, address);
+        return recipientId;
+    }
+
     @Override
     public RecipientId resolveSelfRecipientTrusted(RecipientAddress address) {
         return resolveRecipientTrusted(address, true);
@@ -283,6 +292,14 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         return resolveRecipientTrusted(address, false);
     }
 
+    /**
+     * Resolves a trusted (non-self) address on a caller-supplied connection and
+     * merges any recipients that were found to be duplicates of the result.
+     * Transaction handling is left to the caller; nothing is committed here.
+     * NOTE(review): unlike the private resolveRecipientTrusted(address, isSelf),
+     * this overload does not synchronize on recipientsLock itself — confirm
+     * that callers provide the required exclusion.
+     *
+     * @param connection connection to run all statements on
+     * @param address    address to resolve
+     * @return the id of the resolved recipient
+     * @throws SQLException if a database operation fails
+     */
+    public RecipientId resolveRecipientTrusted(Connection connection, RecipientAddress address) throws SQLException {
+        final var resolved = resolveRecipientTrustedLocked(connection, address, false);
+        final var toBeMerged = resolved.second();
+        if (toBeMerged.isEmpty()) {
+            return resolved.first();
+        }
+        mergeRecipients(connection, resolved.first(), toBeMerged);
+        return resolved.first();
+    }
+
     @Override
     public RecipientId resolveRecipientTrusted(SignalServiceAddress address) {
         return resolveRecipientTrusted(new RecipientAddress(address));
@@ -341,6 +358,44 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
+    /**
+     * Loads the recipient row with the given id, including its stored
+     * storage record, using the caller-supplied connection.
+     *
+     * @param connection  connection to run the query on
+     * @param recipientId value of the {@code _id} column to look up
+     * @return the recipient as mapped by {@code getRecipientFromResultSet}
+     * @throws SQLException if the query fails
+     */
+    public Recipient getRecipient(Connection connection, RecipientId recipientId) throws SQLException {
+        final var query = (
+                """
+                SELECT r._id,
+                       r.number, r.uuid, r.pni, r.username,
+                       r.profile_key, r.profile_key_credential,
+                       r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden,
+                       r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities,
+                       r.storage_record
+                FROM %s r
+                WHERE r._id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(query)) {
+            stmt.setLong(1, recipientId.id());
+            return Utils.executeQuerySingleRow(stmt, this::getRecipientFromResultSet);
+        }
+    }
+
+    /**
+     * Loads the recipient row whose {@code storage_id} column equals the raw
+     * bytes of the given storage id, using the caller-supplied connection.
+     *
+     * @param connection connection to run the query on
+     * @param storageId  storage id whose raw bytes are matched
+     * @return the recipient as mapped by {@code getRecipientFromResultSet}
+     * @throws SQLException if the query fails
+     */
+    public Recipient getRecipient(Connection connection, StorageId storageId) throws SQLException {
+        final var query = (
+                """
+                SELECT r._id,
+                       r.number, r.uuid, r.pni, r.username,
+                       r.profile_key, r.profile_key_credential,
+                       r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden,
+                       r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities,
+                       r.storage_record
+                FROM %s r
+                WHERE r.storage_id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(query)) {
+            final var rawStorageId = storageId.getRaw();
+            stmt.setBytes(1, rawStorageId);
+            return Utils.executeQuerySingleRow(stmt, this::getRecipientFromResultSet);
+        }
+    }
+
     public List<Recipient> getRecipients(
             boolean onlyContacts, Optional<Boolean> blocked, Set<RecipientId> recipientIds, Optional<String> name
     ) {
@@ -364,7 +419,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
                        r.number, r.uuid, r.pni, r.username,
                        r.profile_key, r.profile_key_credential,
                        r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden,
-                       r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities
+                       r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities,
+                       r.storage_record
                 FROM %s r
                 WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s
                 """
@@ -447,6 +503,53 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
+    /**
+     * Lists the ids of all recipient rows that have a number or a uuid set.
+     *
+     * @param connection connection to run the query on
+     * @return the matching recipient ids
+     * @throws SQLException if the query fails
+     */
+    public List<RecipientId> getRecipientIds(Connection connection) throws SQLException {
+        final var query = (
+                """
+                SELECT r._id
+                FROM %s r
+                WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL)
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(query)) {
+            final var ids = Utils.executeQueryForStream(stmt, this::getRecipientIdFromResultSet);
+            return ids.toList();
+        }
+    }
+
+    /**
+     * Assigns a newly generated raw storage id to every recipient row whose
+     * {@code storage_id} is still NULL. The rows are collected first and all
+     * updates are committed together in one transaction.
+     *
+     * @throws RuntimeException wrapping the SQLException if any database step fails
+     */
+    public void setMissingStorageIds() {
+        final var findMissingSql = (
+                """
+                SELECT r._id
+                FROM %s r
+                WHERE r.storage_id IS NULL
+                """
+        ).formatted(TABLE_RECIPIENT);
+        final var assignSql = (
+                """
+                UPDATE %s
+                SET storage_id = ?
+                WHERE _id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            final List<RecipientId> idsWithoutStorageId;
+            try (final var findStmt = connection.prepareStatement(findMissingSql)) {
+                idsWithoutStorageId = Utils.executeQueryForStream(findStmt, this::getRecipientIdFromResultSet)
+                        .toList();
+            }
+            try (final var assignStmt = connection.prepareStatement(assignSql)) {
+                for (final var id : idsWithoutStorageId) {
+                    assignStmt.setBytes(1, KeyUtils.createRawStorageId());
+                    assignStmt.setLong(2, id.id());
+                    assignStmt.executeUpdate();
+                }
+            }
+            connection.commit();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed update recipient store", e);
+        }
+    }
+
     @Override
     public void deleteContact(RecipientId recipientId) {
         storeContact(recipientId, null);
@@ -509,12 +612,18 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
     @Override
     public void storeProfileKey(RecipientId recipientId, final ProfileKey profileKey) {
         try (final var connection = database.getConnection()) {
-            storeProfileKey(connection, recipientId, profileKey, true);
+            storeProfileKey(connection, recipientId, profileKey);
         } catch (SQLException e) {
             throw new RuntimeException("Failed update recipient store", e);
         }
     }
 
+    /**
+     * Stores the profile key for a recipient on a caller-supplied connection.
+     * Delegates to the four-argument overload with its boolean flag set to
+     * {@code true}.
+     *
+     * @param connection  connection to run the update on
+     * @param recipientId recipient whose profile key is stored
+     * @param profileKey  profile key to store
+     * @throws SQLException if the update fails
+     */
+    public void storeProfileKey(
+            final Connection connection, final RecipientId recipientId, final ProfileKey profileKey
+    ) throws SQLException {
+        this.storeProfileKey(connection, recipientId, profileKey, true);
+    }
+
     @Override
     public void storeExpiringProfileKeyCredential(
             RecipientId recipientId, final ExpiringProfileKeyCredential profileKeyCredential
@@ -526,6 +635,121 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
+    /**
+     * Replaces the storage id of the own recipient with a newly generated one,
+     * using a connection obtained from the database for this call.
+     *
+     * @throws RuntimeException wrapping the SQLException if the update fails
+     */
+    public void rotateSelfStorageId() {
+        try (final var dbConnection = database.getConnection()) {
+            this.rotateSelfStorageId(dbConnection);
+        } catch (SQLException ex) {
+            throw new RuntimeException("Failed update recipient store", ex);
+        }
+    }
+
+    /**
+     * Replaces the storage id of the own recipient with a newly generated one
+     * on a caller-supplied connection. The own recipient is looked up from the
+     * address reported by {@code selfAddressProvider}.
+     *
+     * @param connection connection to run the statements on
+     * @throws SQLException if a database operation fails
+     */
+    public void rotateSelfStorageId(final Connection connection) throws SQLException {
+        final var selfAddress = selfAddressProvider.getSelfAddress();
+        rotateStorageId(connection, resolveRecipient(connection, selfAddress));
+    }
+
+    /**
+     * Replaces the storage id of the recipient identified by the given service
+     * id with a newly generated one.
+     *
+     * @param connection connection to run the statements on
+     * @param serviceId  service id used to resolve the recipient
+     * @return the newly assigned storage id
+     * @throws SQLException if a database operation fails
+     */
+    public StorageId rotateStorageId(final Connection connection, final ServiceId serviceId) throws SQLException {
+        final var address = new RecipientAddress(serviceId);
+        final var recipientId = resolveRecipient(connection, address);
+        return rotateStorageId(connection, recipientId);
+    }
+
+    /**
+     * Lists the storage ids of all recipients other than the own recipient
+     * that have a storage id and a uuid or pni set. Each id is returned as a
+     * contact storage id.
+     *
+     * @param connection connection to run the statements on
+     * @return the storage ids of the matching rows
+     * @throws SQLException if a database operation fails
+     */
+    public List<StorageId> getStorageIds(Connection connection) throws SQLException {
+        final var query = """
+                        SELECT r.storage_id
+                        FROM %s r WHERE r.storage_id IS NOT NULL AND r._id != ? AND (r.uuid IS NOT NULL OR r.pni IS NOT NULL)
+                        """.formatted(TABLE_RECIPIENT);
+        final var selfAddress = selfAddressProvider.getSelfAddress();
+        final var selfId = resolveRecipient(connection, selfAddress);
+        try (final var stmt = connection.prepareStatement(query)) {
+            stmt.setLong(1, selfId.id());
+            final var storageIds = Utils.executeQueryForStream(stmt, this::getContactStorageIdFromResultSet);
+            return storageIds.toList();
+        }
+    }
+
+    /**
+     * Writes the raw bytes of the given storage id into the {@code storage_id}
+     * column of one recipient row.
+     *
+     * @param connection  connection to run the update on
+     * @param recipientId row to update
+     * @param storageId   storage id whose raw bytes are stored
+     * @throws SQLException if the update fails
+     */
+    public void updateStorageId(
+            Connection connection, RecipientId recipientId, StorageId storageId
+    ) throws SQLException {
+        final var updateSql = (
+                """
+                UPDATE %s
+                SET storage_id = ?
+                WHERE _id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(updateSql)) {
+            final var rawStorageId = storageId.getRaw();
+            stmt.setBytes(1, rawStorageId);
+            stmt.setLong(2, recipientId.id());
+            stmt.executeUpdate();
+        }
+    }
+
+    /**
+     * Writes a storage id for each recipient in the given map, reusing one
+     * prepared statement for all rows.
+     *
+     * @param connection   connection to run the updates on
+     * @param storageIdMap storage id to store, keyed by the recipient row to update
+     * @throws SQLException if an update fails
+     */
+    public void updateStorageIds(Connection connection, Map<RecipientId, StorageId> storageIdMap) throws SQLException {
+        final var updateSql = (
+                """
+                UPDATE %s
+                SET storage_id = ?
+                WHERE _id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(updateSql)) {
+            for (final var idPair : storageIdMap.entrySet()) {
+                final var recipientId = idPair.getKey();
+                final var storageId = idPair.getValue();
+                stmt.setBytes(1, storageId.getRaw());
+                stmt.setLong(2, recipientId.id());
+                stmt.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * Returns the storage id of the own recipient as an account storage id.
+     * The raw bytes come from {@code getStorageId}, which assigns a new id
+     * when the row has none.
+     *
+     * @param connection connection to run the statements on
+     * @return the account storage id of the own recipient
+     * @throws SQLException if a database operation fails
+     */
+    public StorageId getSelfStorageId(final Connection connection) throws SQLException {
+        final var selfAddress = selfAddressProvider.getSelfAddress();
+        final var selfId = resolveRecipient(connection, selfAddress);
+        final var rawStorageId = getStorageId(connection, selfId).getRaw();
+        return StorageId.forAccount(rawStorageId);
+    }
+
+    /**
+     * Returns the storage id stored for the given recipient. When the row has
+     * no storage id, a new one is generated, stored and returned instead.
+     *
+     * @param connection  connection to run the statements on
+     * @param recipientId recipient whose storage id is requested
+     * @return the existing storage id, or the newly assigned one
+     * @throws SQLException if a database operation fails
+     */
+    public StorageId getStorageId(final Connection connection, final RecipientId recipientId) throws SQLException {
+        final var query = """
+                        SELECT r.storage_id
+                        FROM %s r WHERE r._id = ? AND r.storage_id IS NOT NULL
+                        """.formatted(TABLE_RECIPIENT);
+        final Optional<StorageId> existing;
+        try (final var stmt = connection.prepareStatement(query)) {
+            stmt.setLong(1, recipientId.id());
+            existing = Utils.executeQueryForOptional(stmt, this::getContactStorageIdFromResultSet);
+        }
+        // Nothing stored yet: fall back to generating and persisting a new id.
+        return existing.isPresent() ? existing.get() : rotateStorageId(connection, recipientId);
+    }
+
+    /**
+     * Generates a new raw storage id, stores it for the given recipient and
+     * returns it.
+     * NOTE(review): the returned id is created with StorageId.forAccount for
+     * every recipient, whereas ids read back from the table are created with
+     * StorageId.forContact (getContactStorageIdFromResultSet) — confirm that
+     * callers do not depend on the type of the returned value.
+     *
+     * @param connection  connection to run the update on
+     * @param recipientId recipient that receives the new storage id
+     * @return the newly assigned storage id
+     * @throws SQLException if the update fails
+     */
+    private StorageId rotateStorageId(final Connection connection, final RecipientId recipientId) throws SQLException {
+        final var rawStorageId = KeyUtils.createRawStorageId();
+        final var rotated = StorageId.forAccount(rawStorageId);
+        updateStorageId(connection, recipientId, rotated);
+        return rotated;
+    }
+
+    /**
+     * Stores a storage id together with the serialized storage record for one
+     * recipient row.
+     *
+     * @param connection    connection to run the update on
+     * @param recipientId   row to update
+     * @param storageId     storage id whose raw bytes are stored
+     * @param storageRecord record bytes to store; {@code null} writes SQL NULL
+     * @throws SQLException if the update fails
+     */
+    public void storeStorageRecord(
+            final Connection connection,
+            final RecipientId recipientId,
+            final StorageId storageId,
+            final byte[] storageRecord
+    ) throws SQLException {
+        final var updateSql = (
+                """
+                UPDATE %s
+                SET storage_id = ?, storage_record = ?
+                WHERE _id = ?
+                """
+        ).formatted(TABLE_RECIPIENT);
+        try (final var stmt = connection.prepareStatement(updateSql)) {
+            stmt.setBytes(1, storageId.getRaw());
+            if (storageRecord != null) {
+                stmt.setBytes(2, storageRecord);
+            } else {
+                stmt.setNull(2, Types.BLOB);
+            }
+            stmt.setLong(3, recipientId.id());
+            stmt.executeUpdate();
+        }
+    }
+
     void addLegacyRecipients(final Map<RecipientId, Recipient> recipients) {
         logger.debug("Migrating legacy recipients to database");
         long start = System.nanoTime();
@@ -587,7 +811,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         return recipientId;
     }
 
-    private void storeContact(
+    public void storeContact(
             final Connection connection, final RecipientId recipientId, final Contact contact
     ) throws SQLException {
         final var sql = (
@@ -608,6 +832,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             statement.setLong(8, recipientId.id());
             statement.executeUpdate();
         }
+        rotateStorageId(connection, recipientId);
     }
 
     private void storeExpiringProfileKeyCredential(
@@ -629,7 +854,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
-    private void storeProfile(
+    public void storeProfile(
             final Connection connection, final RecipientId recipientId, final Profile profile
     ) throws SQLException {
         final var sql = (
@@ -655,6 +880,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             statement.setLong(10, recipientId.id());
             statement.executeUpdate();
         }
+        rotateStorageId(connection, recipientId);
     }
 
     private void storeProfileKey(
@@ -686,6 +912,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             statement.setLong(2, recipientId.id());
             statement.executeUpdate();
         }
+        rotateStorageId(connection, recipientId);
     }
 
     private RecipientId resolveRecipientTrusted(RecipientAddress address, boolean isSelf) {
@@ -693,17 +920,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         synchronized (recipientsLock) {
             try (final var connection = database.getConnection()) {
                 connection.setAutoCommit(false);
-                if (address.hasSingleIdentifier() || (
-                        !isSelf && selfAddressProvider.getSelfAddress().matches(address)
-                )) {
-                    pair = new Pair<>(resolveRecipientLocked(connection, address), List.of());
-                } else {
-                    pair = MergeRecipientHelper.resolveRecipientTrustedLocked(new HelperStore(connection), address);
-
-                    for (final var toBeMergedRecipientId : pair.second()) {
-                        mergeRecipientsLocked(connection, pair.first(), toBeMergedRecipientId);
-                    }
-                }
+                pair = resolveRecipientTrustedLocked(connection, address, isSelf);
                 connection.commit();
             } catch (SQLException e) {
                 throw new RuntimeException("Failed update recipient store", e);
@@ -712,13 +929,9 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
 
         if (!pair.second().isEmpty()) {
             try (final var connection = database.getConnection()) {
-                for (final var toBeMergedRecipientId : pair.second()) {
-                    recipientMergeHandler.mergeRecipients(connection, pair.first(), toBeMergedRecipientId);
-                    deleteRecipient(connection, toBeMergedRecipientId);
-                    synchronized (recipientsLock) {
-                        recipientAddressCache.entrySet().removeIf(e -> e.getValue().id().equals(toBeMergedRecipientId));
-                    }
-                }
+                connection.setAutoCommit(false);
+                mergeRecipients(connection, pair.first(), pair.second());
+                connection.commit();
             } catch (SQLException e) {
                 throw new RuntimeException("Failed update recipient store", e);
             }
@@ -726,15 +939,44 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         return pair.first();
     }
 
+    /**
+     * Resolves a trusted address on the given connection and reports which
+     * other recipients have to be merged into the result.
+     * NOTE(review): the private resolveRecipientTrusted(address, isSelf) calls
+     * this while holding recipientsLock; the public overload taking a
+     * connection calls it without the lock — confirm the intended contract.
+     *
+     * @param connection connection to run all statements on
+     * @param address    address to resolve
+     * @param isSelf     whether the address is being resolved as the own address
+     * @return the resolved recipient id and the ids still to be passed to
+     *         {@code mergeRecipients} (empty when nothing has to be merged)
+     * @throws SQLException if a database operation fails
+     */
+    private Pair<RecipientId, List<RecipientId>> resolveRecipientTrustedLocked(
+            final Connection connection, final RecipientAddress address, final boolean isSelf
+    ) throws SQLException {
+        // A single identifier, or a non-self lookup that matches the own
+        // address, is resolved directly with nothing to merge.
+        if (address.hasSingleIdentifier() || (
+                !isSelf && selfAddressProvider.getSelfAddress().matches(address)
+        )) {
+            return new Pair<>(resolveRecipientLocked(connection, address), List.of());
+        } else {
+            final var pair = MergeRecipientHelper.resolveRecipientTrustedLocked(new HelperStore(connection), address);
+
+            // Apply the recipient-table part of each merge here; the returned
+            // list is what callers hand on to mergeRecipients afterwards.
+            for (final var toBeMergedRecipientId : pair.second()) {
+                mergeRecipientsLocked(connection, pair.first(), toBeMergedRecipientId);
+            }
+            return pair;
+        }
+    }
+
+    /**
+     * Merges each listed recipient into {@code recipientId}: the merge handler
+     * is invoked first, then the merged recipient is deleted, and finally its
+     * entries are dropped from the address cache.
+     *
+     * @param connection             connection to run all statements on
+     * @param recipientId            recipient that the others are merged into
+     * @param toBeMergedRecipientIds recipients to merge and delete
+     * @throws SQLException if a database operation fails
+     */
+    private void mergeRecipients(
+            final Connection connection, final RecipientId recipientId, final List<RecipientId> toBeMergedRecipientIds
+    ) throws SQLException {
+        for (final var toBeMergedRecipientId : toBeMergedRecipientIds) {
+            recipientMergeHandler.mergeRecipients(connection, recipientId, toBeMergedRecipientId);
+            deleteRecipient(connection, toBeMergedRecipientId);
+            // The lock is only held for the cache eviction, not for the
+            // database work above.
+            synchronized (recipientsLock) {
+                recipientAddressCache.entrySet().removeIf(e -> e.getValue().id().equals(toBeMergedRecipientId));
+            }
+        }
+    }
+
     private RecipientId resolveRecipientLocked(
             Connection connection, RecipientAddress address
     ) throws SQLException {
-        final var byServiceId = address.serviceId().isEmpty()
+        final var aci = address.aci().isEmpty()
                 ? Optional.<RecipientWithAddress>empty()
-                : findByServiceId(connection, address.serviceId().get());
+                : findByServiceId(connection, address.aci().get());
 
-        if (byServiceId.isPresent()) {
-            return byServiceId.get().id();
+        if (aci.isPresent()) {
+            return aci.get().id();
         }
 
         final var byPni = address.pni().isEmpty()
@@ -796,8 +1038,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         ).formatted(TABLE_RECIPIENT);
         try (final var statement = connection.prepareStatement(sql)) {
             statement.setString(1, address.number().orElse(null));
-            statement.setBytes(2,
-                    address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
+            statement.setBytes(2, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
             statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
             statement.setString(4, address.username().orElse(null));
             final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper);
@@ -817,7 +1058,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             final var sql = (
                     """
                     UPDATE %s
-                    SET number = NULL, uuid = NULL, pni = NULL, username = NULL
+                    SET number = NULL, uuid = NULL, pni = NULL, username = NULL, storage_id = NULL
                     WHERE _id = ?
                     """
             ).formatted(TABLE_RECIPIENT);
@@ -842,13 +1083,13 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
             ).formatted(TABLE_RECIPIENT);
             try (final var statement = connection.prepareStatement(sql)) {
                 statement.setString(1, address.number().orElse(null));
-                statement.setBytes(2,
-                        address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
+                statement.setBytes(2, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
                 statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
                 statement.setString(4, address.username().orElse(null));
                 statement.setLong(5, recipientId.id());
                 statement.executeUpdate();
             }
+            rotateStorageId(connection, recipientId);
         }
     }
 
@@ -936,9 +1177,9 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         final var sql = """
                         SELECT r._id, r.number, r.uuid, r.pni, r.username
                         FROM %s r
-                        WHERE r.uuid = ?1 OR r.pni = ?1
+                        WHERE %s = ?1
                         LIMIT 1
-                        """.formatted(TABLE_RECIPIENT);
+                        """.formatted(TABLE_RECIPIENT, serviceId instanceof ACI ? "r.uuid" : "r.pni");
         try (final var statement = connection.prepareStatement(sql)) {
             statement.setBytes(1, UuidUtil.toByteArray(serviceId.getRawUuid()));
             recipientWithAddress = Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet);
@@ -953,14 +1194,13 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         final var sql = """
                         SELECT r._id, r.number, r.uuid, r.pni, r.username
                         FROM %s r
-                        WHERE r.uuid = ?1 OR r.pni = ?1 OR
-                              r.uuid = ?2 OR r.pni = ?2 OR
+                        WHERE r.uuid = ?1 OR
+                              r.pni = ?2 OR
                               r.number = ?3 OR
                               r.username = ?4
                         """.formatted(TABLE_RECIPIENT);
         try (final var statement = connection.prepareStatement(sql)) {
-            statement.setBytes(1,
-                    address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
+            statement.setBytes(1, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
             statement.setBytes(2, address.pni().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
             statement.setString(3, address.number().orElse(null));
             statement.setString(4, address.username().orElse(null));
@@ -1018,7 +1258,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
-    private Profile getProfile(final Connection connection, final RecipientId recipientId) throws SQLException {
+    public Profile getProfile(final Connection connection, final RecipientId recipientId) throws SQLException {
         final var sql = (
                 """
                 SELECT r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities
@@ -1057,7 +1297,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
                 getContactFromResultSet(resultSet),
                 getProfileKeyFromResultSet(resultSet),
                 getExpiringProfileKeyCredentialFromResultSet(resultSet),
-                getProfileFromResultSet(resultSet));
+                getProfileFromResultSet(resultSet),
+                getStorageRecordFromResultSet(resultSet));
     }
 
     private Contact getContactFromResultSet(ResultSet resultSet) throws SQLException {
@@ -1118,6 +1359,15 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
         }
     }
 
+    /**
+     * Reads the {@code storage_id} column of the current row and wraps it as a
+     * contact storage id.
+     *
+     * @param resultSet result set positioned on the row to read
+     * @return the contact storage id built from the column bytes
+     * @throws SQLException if the column cannot be read
+     */
+    private StorageId getContactStorageIdFromResultSet(ResultSet resultSet) throws SQLException {
+        return StorageId.forContact(resultSet.getBytes("storage_id"));
+    }
+
+    /**
+     * Reads the {@code storage_record} column of the current row.
+     *
+     * @param resultSet result set positioned on the row to read
+     * @return the column bytes as returned by {@link ResultSet#getBytes(String)}
+     * @throws SQLException if the column cannot be read
+     */
+    private byte[] getStorageRecordFromResultSet(ResultSet resultSet) throws SQLException {
+        final var storageRecord = resultSet.getBytes("storage_record");
+        return storageRecord;
+    }
+
     public interface RecipientMergeHandler {
 
         void mergeRecipients(
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java
new file mode 100644 (file)
index 0000000..9250505
--- /dev/null
@@ -0,0 +1,225 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.api.Profile;
+import org.asamk.signal.manager.internal.JobExecutor;
+import org.asamk.signal.manager.jobs.CheckWhoAmIJob;
+import org.asamk.signal.manager.jobs.DownloadProfileAvatarJob;
+import org.asamk.signal.manager.storage.SignalAccount;
+import org.asamk.signal.manager.util.KeyUtils;
+import org.signal.libsignal.zkgroup.InvalidInputException;
+import org.signal.libsignal.zkgroup.profiles.ProfileKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.push.UsernameLinkComponents;
+import org.whispersystems.signalservice.api.storage.SignalAccountRecord;
+import org.whispersystems.signalservice.api.util.OptionalUtil;
+import org.whispersystems.signalservice.api.util.UuidUtil;
+import org.whispersystems.signalservice.internal.storage.protos.OptionalBool;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * Processes {@link SignalAccountRecord}s.
+ */
+public class AccountRecordProcessor extends DefaultStorageRecordProcessor<SignalAccountRecord> {
+
+    private static final Logger logger = LoggerFactory.getLogger(AccountRecordProcessor.class);
+    private final SignalAccountRecord localAccountRecord;
+    private final SignalAccount account;
+    private final Connection connection;
+    private final JobExecutor jobExecutor;
+
+    /**
+     * Creates the processor and converts the current local account state (configuration,
+     * self recipient, username link and self storage id) into a {@link SignalAccountRecord}
+     * that {@link #getMatching(SignalAccountRecord)} hands out as the local record.
+     *
+     * @throws SQLException declared because the self recipient and its storage id are read
+     *                      through {@code connection}
+     */
+    public AccountRecordProcessor(
+            SignalAccount account, Connection connection, final JobExecutor jobExecutor
+    ) throws SQLException {
+        this.account = account;
+        this.connection = connection;
+        this.jobExecutor = jobExecutor;
+
+        final var selfId = account.getSelfRecipientId();
+        final var self = account.getRecipientStore().getRecipient(connection, selfId);
+        final var selfStorageId = account.getRecipientStore().getSelfStorageId(connection);
+        final var localRecord = StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(),
+                self,
+                account.getUsernameLink(),
+                selfStorageId.getRaw());
+        this.localAccountRecord = localRecord.getAccount().get();
+    }
+
+    /**
+     * Account records are never rejected by this processor: always returns {@code false}.
+     */
+    @Override
+    protected boolean isInvalid(SignalAccountRecord remote) {
+        return false;
+    }
+
+    /**
+     * Always returns the local account record built in the constructor, regardless of
+     * the given remote {@code record}.
+     */
+    @Override
+    protected Optional<SignalAccountRecord> getMatching(SignalAccountRecord record) {
+        return Optional.of(localAccountRecord);
+    }
+
+    /**
+     * Merges the remote account record with the local one.
+     * <p>
+     * Policy, field by field:
+     * <ul>
+     * <li>Names: taken from remote if it has a given or family name, otherwise from local.</li>
+     * <li>Payments, subscriber, story view receipts state, avatar path, profile key, default
+     * reactions, username and username link: remote if set, otherwise local.</li>
+     * <li>"Has viewed onboarding story" and "has seen group story education sheet": true if
+     * either side is true.</li>
+     * <li>E164: always taken from local.</li>
+     * <li>Everything else (including unknown fields): taken from remote.</li>
+     * </ul>
+     *
+     * @return {@code remote} if the merged proto is byte-identical to it, {@code local} if it is
+     * byte-identical to that one, otherwise the merged record with a newly created storage id
+     */
+    @Override
+    protected SignalAccountRecord merge(SignalAccountRecord remote, SignalAccountRecord local) {
+        // Names are taken as a pair so given/family name never come from different sides.
+        String givenName;
+        String familyName;
+        if (remote.getGivenName().isPresent() || remote.getFamilyName().isPresent()) {
+            givenName = remote.getGivenName().orElse("");
+            familyName = remote.getFamilyName().orElse("");
+        } else {
+            givenName = local.getGivenName().orElse("");
+            familyName = local.getFamilyName().orElse("");
+        }
+
+        final var payments = remote.getPayments().getEntropy().isPresent() ? remote.getPayments() : local.getPayments();
+        final var subscriber = remote.getSubscriber().getId().isPresent()
+                ? remote.getSubscriber()
+                : local.getSubscriber();
+        final var storyViewReceiptsState = remote.getStoryViewReceiptsState() == OptionalBool.UNSET
+                ? local.getStoryViewReceiptsState()
+                : remote.getStoryViewReceiptsState();
+        final var unknownFields = remote.serializeUnknownFields();
+        final var avatarUrlPath = OptionalUtil.or(remote.getAvatarUrlPath(), local.getAvatarUrlPath()).orElse("");
+        final var profileKey = OptionalUtil.or(remote.getProfileKey(), local.getProfileKey()).orElse(null);
+        final var noteToSelfArchived = remote.isNoteToSelfArchived();
+        final var noteToSelfForcedUnread = remote.isNoteToSelfForcedUnread();
+        final var readReceipts = remote.isReadReceiptsEnabled();
+        final var typingIndicators = remote.isTypingIndicatorsEnabled();
+        final var sealedSenderIndicators = remote.isSealedSenderIndicatorsEnabled();
+        final var linkPreviews = remote.isLinkPreviewsEnabled();
+        final var unlisted = remote.isPhoneNumberUnlisted();
+        final var pinnedConversations = remote.getPinnedConversations();
+        final var phoneNumberSharingMode = remote.getPhoneNumberSharingMode();
+        final var preferContactAvatars = remote.isPreferContactAvatars();
+        final var universalExpireTimer = remote.getUniversalExpireTimer();
+        // The phone number is the one field that is always kept from the local record.
+        final var e164 = local.getE164();
+        final var defaultReactions = !remote.getDefaultReactions().isEmpty()
+                ? remote.getDefaultReactions()
+                : local.getDefaultReactions();
+        final var displayBadgesOnProfile = remote.isDisplayBadgesOnProfile();
+        final var subscriptionManuallyCancelled = remote.isSubscriptionManuallyCancelled();
+        final var keepMutedChatsArchived = remote.isKeepMutedChatsArchived();
+        final var hasSetMyStoriesPrivacy = remote.hasSetMyStoriesPrivacy();
+        final var hasViewedOnboardingStory = remote.hasViewedOnboardingStory() || local.hasViewedOnboardingStory();
+        final var storiesDisabled = remote.isStoriesDisabled();
+        final var hasSeenGroupStoryEducation = remote.hasSeenGroupStoryEducationSheet()
+                || local.hasSeenGroupStoryEducationSheet();
+        // An empty username is treated like an absent one on both sides.
+        final var username = remote.getUsername() != null && !remote.getUsername().isEmpty()
+                ? remote.getUsername()
+                : local.getUsername() != null && !local.getUsername().isEmpty() ? local.getUsername() : null;
+        final var usernameLink = remote.getUsernameLink() != null ? remote.getUsernameLink() : local.getUsernameLink();
+
+        // The merged record initially reuses the remote storage id; it is only replaced below
+        // when the result differs from both inputs.
+        final var mergedBuilder = new SignalAccountRecord.Builder(remote.getId().getRaw(), unknownFields).setGivenName(
+                        givenName)
+                .setFamilyName(familyName)
+                .setAvatarUrlPath(avatarUrlPath)
+                .setProfileKey(profileKey)
+                .setNoteToSelfArchived(noteToSelfArchived)
+                .setNoteToSelfForcedUnread(noteToSelfForcedUnread)
+                .setReadReceiptsEnabled(readReceipts)
+                .setTypingIndicatorsEnabled(typingIndicators)
+                .setSealedSenderIndicatorsEnabled(sealedSenderIndicators)
+                .setLinkPreviewsEnabled(linkPreviews)
+                .setUnlistedPhoneNumber(unlisted)
+                .setPhoneNumberSharingMode(phoneNumberSharingMode)
+                .setPinnedConversations(pinnedConversations)
+                .setPreferContactAvatars(preferContactAvatars)
+                .setPayments(payments.isEnabled(), payments.getEntropy().orElse(null))
+                .setUniversalExpireTimer(universalExpireTimer)
+                .setDefaultReactions(defaultReactions)
+                .setSubscriber(subscriber)
+                .setDisplayBadgesOnProfile(displayBadgesOnProfile)
+                .setSubscriptionManuallyCancelled(subscriptionManuallyCancelled)
+                .setKeepMutedChatsArchived(keepMutedChatsArchived)
+                .setHasSetMyStoriesPrivacy(hasSetMyStoriesPrivacy)
+                .setHasViewedOnboardingStory(hasViewedOnboardingStory)
+                .setStoriesDisabled(storiesDisabled)
+                .setHasSeenGroupStoryEducationSheet(hasSeenGroupStoryEducation)
+                .setStoryViewReceiptsState(storyViewReceiptsState)
+                .setUsername(username)
+                .setUsernameLink(usernameLink)
+                .setE164(e164);
+        final var merged = mergedBuilder.build();
+
+        // Return the untouched input object when the merge changed nothing relative to it.
+        final var matchesRemote = doProtosMatch(merged, remote);
+        if (matchesRemote) {
+            return remote;
+        }
+
+        final var matchesLocal = doProtosMatch(merged, local);
+        if (matchesLocal) {
+            return local;
+        }
+
+        // The merge result is new content, so it gets a new storage id.
+        return mergedBuilder.setId(KeyUtils.createRawStorageId()).build();
+    }
+
+    /**
+     * Not supported for account records: {@link #getMatching(SignalAccountRecord)} always
+     * returns a local record, so there is never anything to insert.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    protected void insertLocal(SignalAccountRecord record) {
+        throw new UnsupportedOperationException(
+                "We should always have a local AccountRecord, so we should never been inserting a new one.");
+    }
+
+    /**
+     * Applies the new account record of {@code update} to the local account: configuration
+     * flags, username and username link, profile key, self profile names, and finally the
+     * storage id together with the encoded record.
+     *
+     * @param update the update whose {@code newRecord()} is written locally
+     * @throws SQLException declared because the stores are written through {@code connection}
+     */
+    @Override
+    protected void updateLocal(StorageRecordUpdate<SignalAccountRecord> update) throws SQLException {
+        final var accountRecord = update.newRecord();
+
+        // The record's number differs from the locally stored one: enqueue a CheckWhoAmIJob.
+        if (!accountRecord.getE164().equals(account.getNumber())) {
+            jobExecutor.enqueueJob(new CheckWhoAmIJob());
+        }
+
+        account.getConfigurationStore().setReadReceipts(connection, accountRecord.isReadReceiptsEnabled());
+        account.getConfigurationStore().setTypingIndicators(connection, accountRecord.isTypingIndicatorsEnabled());
+        account.getConfigurationStore()
+                .setUnidentifiedDeliveryIndicators(connection, accountRecord.isSealedSenderIndicatorsEnabled());
+        account.getConfigurationStore().setLinkPreviews(connection, accountRecord.isLinkPreviewsEnabled());
+        account.getConfigurationStore()
+                .setPhoneNumberSharingMode(connection,
+                        StorageSyncModels.remoteToLocal(accountRecord.getPhoneNumberSharingMode()));
+        account.getConfigurationStore().setPhoneNumberUnlisted(connection, accountRecord.isPhoneNumberUnlisted());
+
+        // A null or empty username in the record clears the local username.
+        account.setUsername(accountRecord.getUsername() != null && !accountRecord.getUsername().isEmpty()
+                ? accountRecord.getUsername()
+                : null);
+        // The username link is only overwritten when the record carries one.
+        if (accountRecord.getUsernameLink() != null) {
+            final var usernameLink = accountRecord.getUsernameLink();
+            account.setUsernameLink(new UsernameLinkComponents(usernameLink.entropy.toByteArray(),
+                    UuidUtil.parseOrThrow(usernameLink.serverId.toByteArray())));
+            account.getConfigurationStore().setUsernameLinkColor(connection, usernameLink.color.name());
+        }
+
+        if (accountRecord.getProfileKey().isPresent()) {
+            ProfileKey profileKey;
+            try {
+                profileKey = new ProfileKey(accountRecord.getProfileKey().get());
+            } catch (InvalidInputException e) {
+                // An unparsable key is logged and skipped; the local profile key stays as it is.
+                logger.debug("Received invalid profile key from storage");
+                profileKey = null;
+            }
+            if (profileKey != null) {
+                account.setProfileKey(profileKey);
+                // avatarPath is null when the record has no avatar URL path.
+                final var avatarPath = accountRecord.getAvatarUrlPath().orElse(null);
+                jobExecutor.enqueueJob(new DownloadProfileAvatarJob(avatarPath));
+            }
+        }
+
+        // Update only the names of the stored self profile; other fields of an existing profile are kept.
+        final var profile = account.getRecipientStore().getProfile(connection, account.getSelfRecipientId());
+        final var builder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile);
+        builder.withGivenName(accountRecord.getGivenName().orElse(null));
+        builder.withFamilyName(accountRecord.getFamilyName().orElse(null));
+        account.getRecipientStore().storeProfile(connection, account.getSelfRecipientId(), builder.build());
+        // Store the record's storage id and its encoded proto for the self recipient.
+        account.getRecipientStore()
+                .storeStorageRecord(connection,
+                        account.getSelfRecipientId(),
+                        accountRecord.getId(),
+                        accountRecord.toProto().encode());
+    }
+
+    /**
+     * Always returns 0, i.e. any two account records are considered to map to the same
+     * local entity.
+     */
+    @Override
+    public int compare(SignalAccountRecord lhs, SignalAccountRecord rhs) {
+        return 0;
+    }
+
+    /**
+     * Returns whether the two account records encode to identical proto bytes.
+     */
+    private static boolean doProtosMatch(SignalAccountRecord merged, SignalAccountRecord other) {
+        final var mergedBytes = merged.toProto().encode();
+        final var otherBytes = other.toProto().encode();
+        return Arrays.equals(mergedBytes, otherBytes);
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java
new file mode 100644 (file)
index 0000000..2e557e1
--- /dev/null
@@ -0,0 +1,336 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.api.Contact;
+import org.asamk.signal.manager.api.Profile;
+import org.asamk.signal.manager.internal.JobExecutor;
+import org.asamk.signal.manager.jobs.DownloadProfileJob;
+import org.asamk.signal.manager.storage.SignalAccount;
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
+import org.asamk.signal.manager.util.KeyUtils;
+import org.signal.libsignal.protocol.IdentityKey;
+import org.signal.libsignal.protocol.InvalidKeyException;
+import org.signal.libsignal.zkgroup.InvalidInputException;
+import org.signal.libsignal.zkgroup.profiles.ProfileKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.push.ServiceId.ACI;
+import org.whispersystems.signalservice.api.push.ServiceId.PNI;
+import org.whispersystems.signalservice.api.storage.SignalContactRecord;
+import org.whispersystems.signalservice.api.util.OptionalUtil;
+import org.whispersystems.signalservice.internal.storage.protos.ContactRecord.IdentityState;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+public class ContactRecordProcessor extends DefaultStorageRecordProcessor<SignalContactRecord> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ContactRecordProcessor.class);
+
+    private static final Pattern E164_PATTERN = Pattern.compile("^\\+[1-9]\\d{0,18}$");
+
+    private final ACI selfAci;
+    private final PNI selfPni;
+    private final String selfNumber;
+    private final SignalAccount account;
+    private final Connection connection;
+    private final JobExecutor jobExecutor;
+
+    /**
+     * Creates the processor and captures the account's own ACI, PNI and number, which
+     * {@link #isInvalid(SignalContactRecord)} uses to detect contact records for ourselves.
+     */
+    public ContactRecordProcessor(SignalAccount account, Connection connection, final JobExecutor jobExecutor) {
+        this.account = account;
+        this.connection = connection;
+        this.jobExecutor = jobExecutor;
+        this.selfAci = account.getAci();
+        this.selfPni = account.getPni();
+        this.selfNumber = account.getNumber();
+    }
+
+    /**
+     * Error cases:
+     * - A contact record that has neither a valid ACI nor a valid PNI.
+     * - A contact record for ourselves (same ACI, PNI or number). That should be an account record.
+     * - A contact record whose number is present but not a valid E164.
+     */
+    @Override
+    protected boolean isInvalid(SignalContactRecord remote) {
+        final var remoteAci = remote.getAci();
+        final var remotePni = remote.getPni();
+        final var remoteNumber = remote.getNumber();
+
+        final boolean aciUsable = remoteAci.isPresent() && remoteAci.get().isValid();
+        final boolean pniUsable = remotePni.isPresent() && remotePni.get().isValid();
+        if (!(aciUsable || pniUsable)) {
+            logger.debug("Found a ContactRecord with neither an ACI nor a PNI -- marking as invalid.");
+            return true;
+        }
+
+        final boolean isSelf = (selfAci != null && selfAci.equals(remoteAci.orElse(null)))
+                || (selfPni != null && selfPni.equals(remotePni.orElse(null)))
+                || (selfNumber != null && selfNumber.equals(remoteNumber.orElse(null)));
+        if (isSelf) {
+            logger.debug("Found a ContactRecord for ourselves -- marking as invalid.");
+            return true;
+        }
+
+        if (remoteNumber.isPresent() && !isValidE164(remoteNumber.get())) {
+            logger.debug("Found a record with an invalid E164. Marking as invalid.");
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Resolves the address of the remote record to a local recipient and converts that
+     * recipient, its identity info and its storage id into a contact record. As written,
+     * the returned {@link Optional} is never empty.
+     */
+    @Override
+    protected Optional<SignalContactRecord> getMatching(SignalContactRecord remote) throws SQLException {
+        final var remoteAddress = getRecipientAddress(remote);
+        final var localRecipientId = account.getRecipientStore().resolveRecipient(connection, remoteAddress);
+        final var localRecipient = account.getRecipientStore().getRecipient(connection, localRecipientId);
+
+        final var serviceIdentifier = localRecipient.getAddress().getIdentifier();
+        final var identityInfo = account.getIdentityKeyStore().getIdentityInfo(connection, serviceIdentifier);
+        final var localStorageId = account.getRecipientStore().getStorageId(connection, localRecipientId);
+
+        final var localRecord = StorageSyncModels.localToRemoteRecord(localRecipient,
+                identityInfo,
+                localStorageId.getRaw());
+        return Optional.of(localRecord.getContact().get());
+    }
+
+    /**
+     * Merges the remote contact record with the matching local one.
+     * <p>
+     * Profile names, profile key and username prefer remote and fall back to local. The
+     * identity key/state pair is taken from remote when remote has a key and either the
+     * states differ or local has no key; otherwise it is taken from local. When E164 and
+     * PNI disagree between both sides, the local pair is kept. The ACI prefers local.
+     * System given/family names come from local on a primary device and from remote
+     * otherwise. The remaining flags and unknown fields are taken from remote.
+     * <p>
+     * Side effect: enqueues a {@link DownloadProfileJob} when the chosen identity key
+     * differs from the remote identity key for a contact with a local ACI.
+     *
+     * @return {@code remote} if the merged proto is byte-identical to it, {@code local} if it is
+     * byte-identical to that one, otherwise the merged record with a newly created storage id
+     */
+    @Override
+    protected SignalContactRecord merge(
+            SignalContactRecord remote, SignalContactRecord local
+    ) {
+        // Names are taken as a pair so given/family name never come from different sides.
+        String profileGivenName;
+        String profileFamilyName;
+        if (remote.getProfileGivenName().isPresent() || remote.getProfileFamilyName().isPresent()) {
+            profileGivenName = remote.getProfileGivenName().orElse("");
+            profileFamilyName = remote.getProfileFamilyName().orElse("");
+        } else {
+            profileGivenName = local.getProfileGivenName().orElse("");
+            profileFamilyName = local.getProfileFamilyName().orElse("");
+        }
+
+        // Identity state and key are always taken together from the same side.
+        IdentityState identityState;
+        byte[] identityKey;
+        if ((remote.getIdentityState() != local.getIdentityState() && remote.getIdentityKey().isPresent())
+                || (remote.getIdentityKey().isPresent() && local.getIdentityKey().isEmpty())) {
+            identityState = remote.getIdentityState();
+            identityKey = remote.getIdentityKey().get();
+        } else {
+            identityState = local.getIdentityState();
+            identityKey = local.getIdentityKey().orElse(null);
+        }
+
+        // The chosen key (necessarily the local one here) disagrees with the remote key.
+        if (local.getAci().isPresent() && identityKey != null && remote.getIdentityKey().isPresent() && !Arrays.equals(
+                identityKey,
+                remote.getIdentityKey().get())) {
+            logger.debug("The local and remote identity keys do not match for {}. Enqueueing a profile fetch.",
+                    local.getAci().orElse(null));
+            final var address = getRecipientAddress(local);
+            jobExecutor.enqueueJob(new DownloadProfileJob(address));
+        }
+
+        final var e164sMatchButPnisDont = local.getNumber().isPresent()
+                && local.getNumber()
+                .get()
+                .equals(remote.getNumber().orElse(null))
+                && local.getPni().isPresent()
+                && remote.getPni().isPresent()
+                && !local.getPni().get().equals(remote.getPni().get());
+
+        final var pnisMatchButE164sDont = local.getPni().isPresent()
+                && local.getPni()
+                .get()
+                .equals(remote.getPni().orElse(null))
+                && local.getNumber().isPresent()
+                && remote.getNumber().isPresent()
+                && !local.getNumber().get().equals(remote.getNumber().get());
+
+        // On a PNI/E164 conflict keep the local pair; otherwise prefer remote, fall back to local.
+        PNI pni;
+        String e164;
+        if (e164sMatchButPnisDont) {
+            logger.debug("Matching E164s, but the PNIs differ! Trusting our local pair.");
+            // TODO [pnp] Schedule CDS fetch?
+            pni = local.getPni().get();
+            e164 = local.getNumber().get();
+        } else if (pnisMatchButE164sDont) {
+            logger.debug("Matching PNIs, but the E164s differ! Trusting our local pair.");
+            // TODO [pnp] Schedule CDS fetch?
+            pni = local.getPni().get();
+            e164 = local.getNumber().get();
+        } else {
+            pni = OptionalUtil.or(remote.getPni(), local.getPni()).orElse(null);
+            e164 = OptionalUtil.or(remote.getNumber(), local.getNumber()).orElse(null);
+        }
+
+        final var unknownFields = remote.serializeUnknownFields();
+        // Unlike most fields, the ACI prefers the local value.
+        final var aci = local.getAci().isEmpty() ? remote.getAci().orElse(null) : local.getAci().get();
+        final var profileKey = OptionalUtil.or(remote.getProfileKey(), local.getProfileKey()).orElse(null);
+        final var username = OptionalUtil.or(remote.getUsername(), local.getUsername()).orElse("");
+        final var blocked = remote.isBlocked();
+        final var profileSharing = remote.isProfileSharingEnabled();
+        final var archived = remote.isArchived();
+        final var forcedUnread = remote.isForcedUnread();
+        final var muteUntil = remote.getMuteUntil();
+        final var hideStory = remote.shouldHideStory();
+        final var unregisteredTimestamp = remote.getUnregisteredTimestamp();
+        final var hidden = remote.isHidden();
+        // System names: local values on a primary device, remote values otherwise.
+        final var systemGivenName = account.isPrimaryDevice()
+                ? local.getSystemGivenName().orElse("")
+                : remote.getSystemGivenName().orElse("");
+        final var systemFamilyName = account.isPrimaryDevice()
+                ? local.getSystemFamilyName().orElse("")
+                : remote.getSystemFamilyName().orElse("");
+        final var systemNickname = remote.getSystemNickname().orElse("");
+
+        // The merged record initially reuses the remote storage id; it is only replaced below
+        // when the result differs from both inputs.
+        final var mergedBuilder = new SignalContactRecord.Builder(remote.getId().getRaw(), aci, unknownFields).setE164(
+                        e164)
+                .setPni(pni)
+                .setProfileGivenName(profileGivenName)
+                .setProfileFamilyName(profileFamilyName)
+                .setSystemGivenName(systemGivenName)
+                .setSystemFamilyName(systemFamilyName)
+                .setSystemNickname(systemNickname)
+                .setProfileKey(profileKey)
+                .setUsername(username)
+                .setIdentityState(identityState)
+                .setIdentityKey(identityKey)
+                .setBlocked(blocked)
+                .setProfileSharingEnabled(profileSharing)
+                .setArchived(archived)
+                .setForcedUnread(forcedUnread)
+                .setMuteUntil(muteUntil)
+                .setHideStory(hideStory)
+                .setUnregisteredTimestamp(unregisteredTimestamp)
+                .setHidden(hidden);
+        final var merged = mergedBuilder.build();
+
+        // Return the untouched input object when the merge changed nothing relative to it.
+        final var matchesRemote = doProtosMatch(merged, remote);
+        if (matchesRemote) {
+            return remote;
+        }
+
+        final var matchesLocal = doProtosMatch(merged, local);
+        if (matchesLocal) {
+            return local;
+        }
+
+        // The merge result is new content, so it gets a new storage id.
+        return mergedBuilder.setId(KeyUtils.createRawStorageId()).build();
+    }
+
+    /**
+     * Inserts a record by delegating to {@link #updateLocal(StorageRecordUpdate)} with an
+     * update that has no previous record.
+     */
+    @Override
+    protected void insertLocal(SignalContactRecord record) throws SQLException {
+        updateLocal(new StorageRecordUpdate<>(null, record));
+    }
+
+    /**
+     * Writes the new contact record of {@code update} into the local stores: contact flags
+     * and system names, profile names, profile key, identity key with trust level, and
+     * finally the storage id together with the encoded record.
+     *
+     * @param update the update whose {@code newRecord()} is written locally
+     * @throws SQLException declared because the stores are accessed through {@code connection}
+     */
+    @Override
+    protected void updateLocal(StorageRecordUpdate<SignalContactRecord> update) throws SQLException {
+        final var contactRecord = update.newRecord();
+        final var address = getRecipientAddress(contactRecord);
+        final var recipientId = account.getRecipientStore().resolveRecipientTrusted(connection, address);
+        final var recipient = account.getRecipientStore().getRecipient(connection, recipientId);
+
+        // A missing local contact counts as all flags false and no names.
+        final var contact = recipient.getContact();
+        final var blocked = contact != null && contact.isBlocked();
+        final var profileShared = contact != null && contact.isProfileSharingEnabled();
+        final var archived = contact != null && contact.isArchived();
+        final var hidden = contact != null && contact.isHidden();
+        final var contactGivenName = contact == null ? null : contact.givenName();
+        final var contactFamilyName = contact == null ? null : contact.familyName();
+        // Only store the contact when a flag or a present system name actually differs.
+        if (blocked != contactRecord.isBlocked()
+                || profileShared != contactRecord.isProfileSharingEnabled()
+                || archived != contactRecord.isArchived()
+                || hidden != contactRecord.isHidden()
+                || (
+                contactRecord.getSystemGivenName().isPresent() && !contactRecord.getSystemGivenName()
+                        .get()
+                        .equals(contactGivenName)
+        )
+                || (
+                contactRecord.getSystemFamilyName().isPresent() && !contactRecord.getSystemFamilyName()
+                        .get()
+                        .equals(contactFamilyName)
+        )) {
+            logger.debug("Storing new or updated contact {}", recipientId);
+            final var contactBuilder = contact == null ? Contact.newBuilder() : Contact.newBuilder(contact);
+            final var newContact = contactBuilder.withIsBlocked(contactRecord.isBlocked())
+                    .withIsProfileSharingEnabled(contactRecord.isProfileSharingEnabled())
+                    .withIsArchived(contactRecord.isArchived())
+                    .withIsHidden(contactRecord.isHidden());
+            // Names are only overwritten when the record carries at least one system name.
+            if (contactRecord.getSystemGivenName().isPresent() || contactRecord.getSystemFamilyName().isPresent()) {
+                newContact.withGivenName(contactRecord.getSystemGivenName().orElse(null))
+                        .withFamilyName(contactRecord.getSystemFamilyName().orElse(null));
+            }
+            account.getRecipientStore().storeContact(connection, recipientId, newContact.build());
+        }
+
+        // Only store the profile when a present profile name differs from the local one.
+        final var profile = recipient.getProfile();
+        final var profileGivenName = profile == null ? null : profile.getGivenName();
+        final var profileFamilyName = profile == null ? null : profile.getFamilyName();
+        if ((
+                contactRecord.getProfileGivenName().isPresent() && !contactRecord.getProfileGivenName()
+                        .get()
+                        .equals(profileGivenName)
+        ) || (
+                contactRecord.getProfileFamilyName().isPresent() && !contactRecord.getProfileFamilyName()
+                        .get()
+                        .equals(profileFamilyName)
+        )) {
+            final var profileBuilder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile);
+            final var newProfile = profileBuilder.withGivenName(contactRecord.getProfileGivenName().orElse(null))
+                    .withFamilyName(contactRecord.getProfileFamilyName().orElse(null))
+                    .build();
+            account.getRecipientStore().storeProfile(connection, recipientId, newProfile);
+        }
+        if (contactRecord.getProfileKey().isPresent()) {
+            try {
+                logger.trace("Storing profile key {}", recipientId);
+                final var profileKey = new ProfileKey(contactRecord.getProfileKey().get());
+                account.getRecipientStore().storeProfileKey(connection, recipientId, profileKey);
+            } catch (InvalidInputException e) {
+                // An unparsable key is logged and skipped; the rest of the record is still applied.
+                logger.warn("Received invalid contact profile key from storage");
+            }
+        }
+        // Identity keys are only stored for records that have an ACI.
+        if (contactRecord.getIdentityKey().isPresent() && contactRecord.getAci().orElse(null) != null) {
+            try {
+                logger.trace("Storing identity key {}", recipientId);
+                final var identityKey = new IdentityKey(contactRecord.getIdentityKey().get());
+                account.getIdentityKeyStore()
+                        .saveIdentity(connection, contactRecord.getAci().orElse(null), identityKey);
+
+                // The trust level is only set when the identity state maps to a non-null local value.
+                final var trustLevel = StorageSyncModels.remoteToLocal(contactRecord.getIdentityState());
+                if (trustLevel != null) {
+                    account.getIdentityKeyStore()
+                            .setIdentityTrustLevel(connection,
+                                    contactRecord.getAci().orElse(null),
+                                    identityKey,
+                                    trustLevel);
+                }
+            } catch (InvalidKeyException e) {
+                logger.warn("Received invalid contact identity key from storage");
+            }
+        }
+        // Store the record's storage id and its encoded proto for this recipient.
+        account.getRecipientStore()
+                .storeStorageRecord(connection, recipientId, contactRecord.getId(), contactRecord.toProto().encode());
+    }
+
+    /**
+     * Builds a {@link RecipientAddress} from the ACI, PNI, number and username of the given
+     * contact record; absent values are passed as {@code null}.
+     */
+    private static RecipientAddress getRecipientAddress(final SignalContactRecord contactRecord) {
+        final var aci = contactRecord.getAci().orElse(null);
+        final var pni = contactRecord.getPni().orElse(null);
+        final var number = contactRecord.getNumber().orElse(null);
+        final var username = contactRecord.getUsername().orElse(null);
+        return new RecipientAddress(aci, pni, number, username);
+    }
+
+    /**
+     * Returns 0 when both records share a present ACI, number or PNI, and 1 otherwise.
+     * Only the 0 case carries meaning; this is not a real ordering.
+     */
+    @Override
+    public int compare(SignalContactRecord lhs, SignalContactRecord rhs) {
+        final boolean sameAci = lhs.getAci().isPresent() && Objects.equals(lhs.getAci(), rhs.getAci());
+        final boolean sameNumber = lhs.getNumber().isPresent() && Objects.equals(lhs.getNumber(), rhs.getNumber());
+        final boolean samePni = lhs.getPni().isPresent() && Objects.equals(lhs.getPni(), rhs.getPni());
+        return sameAci || sameNumber || samePni ? 0 : 1;
+    }
+
+    /**
+     * Returns whether the whole value matches {@code E164_PATTERN}.
+     */
+    private static boolean isValidE164(String value) {
+        final var matcher = E164_PATTERN.matcher(value);
+        return matcher.matches();
+    }
+
+    /**
+     * Returns whether the two contact records encode to identical proto bytes.
+     */
+    private static boolean doProtosMatch(SignalContactRecord merged, SignalContactRecord other) {
+        final var mergedBytes = merged.toProto().encode();
+        final var otherBytes = other.toProto().encode();
+        return Arrays.equals(mergedBytes, otherBytes);
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java
new file mode 100644 (file)
index 0000000..2b6334e
--- /dev/null
@@ -0,0 +1,96 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.storage.SignalRecord;
+import org.whispersystems.signalservice.api.storage.StorageId;
+
+import java.sql.SQLException;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * An implementation of {@link StorageRecordProcessor} that solidifies a pattern and reduces
+ * duplicate code in individual implementations.
+ * <p>
+ * Concerning the implementation of {@link #compare(Object, Object)}, its purpose is to detect if
+ * two items would map to the same logical entity (i.e. they would correspond to the same record in
+ * our local store). We use it for a {@link TreeSet}, so mainly it's just important that the '0'
+ * case is correct. Other cases are whatever, just make it something stable.
+ */
+abstract class DefaultStorageRecordProcessor<E extends SignalRecord> implements StorageRecordProcessor<E>, Comparator<E> {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultStorageRecordProcessor.class);
+    private final Set<E> matchedRecords = new TreeSet<>(this);
+
+    /**
+     * Runs one remote record through validation, matching, de-duplication and merging.
+     * <p>
+     * One kind of invalid remote data handled here is two records mapping to the same local
+     * data. Such duplicates have to be trimmed, because otherwise we would upload an ID set that
+     * contains only one of the IDs without properly deleting the dupes, which would then fail our
+     * validation checks.
+     * <p>
+     * Detecting this is a bit tricky: IDs are written back to the local store while records are
+     * processed, so "multiple records map to the same local storage ID" cannot be checked
+     * directly. SignalRecords are also expected to implement an equals() that includes the
+     * StorageId, which rules out a regular set. A {@link TreeSet} with a custom comparator is used
+     * instead, and the subclass decides whether two items are the same based on their actual data
+     * (i.e. two contacts having the same UUID, or two groups having the same MasterKey).
+     */
+    @Override
+    public void process(E remote) throws SQLException {
+        if (isInvalid(remote)) {
+            debug(remote.getId(), remote, "Found invalid key! Ignoring it.");
+            return;
+        }
+
+        final var matching = getMatching(remote);
+        if (matching.isEmpty()) {
+            debug(remote.getId(), remote, "No matching local record. Inserting.");
+            insertLocal(remote);
+            return;
+        }
+
+        final E localRecord = matching.get();
+        if (matchedRecords.contains(localRecord)) {
+            debug(remote.getId(), remote, "Multiple remote records map to the same local record! Ignoring this one.");
+            return;
+        }
+        matchedRecords.add(localRecord);
+
+        final E mergedRecord = merge(remote, localRecord);
+        if (!mergedRecord.equals(remote)) {
+            debug(remote.getId(), remote, "[Remote Update] " + mergedRecord.describeDiff(remote));
+        }
+
+        // Nothing to write locally when the merge result equals the local record.
+        if (mergedRecord.equals(localRecord)) {
+            return;
+        }
+        final var localUpdate = new StorageRecordUpdate<>(localRecord, mergedRecord);
+        debug(remote.getId(), remote, "[Local Update] " + localUpdate);
+        updateLocal(localUpdate);
+    }
+
+    /**
+     * Logs a debug message prefixed with the storage id and the simple class name of the record.
+     *
+     * @param i       storage id shown in the first bracket
+     * @param record  record whose simple class name is shown in the second bracket
+     * @param message text appended after the prefix
+     */
+    private void debug(StorageId i, E record, String message) {
+        // Parameterized logging: the line is only formatted when debug logging is enabled.
+        logger.debug("[{}][{}] {}", i, record.getClass().getSimpleName(), message);
+    }
+
+    /**
+     * Validity check applied to every remote record before anything else;
+     * {@link #process(SignalRecord)} ignores records for which this returns true.
+     *
+     * @return True if the record is invalid and should be removed from storage service, otherwise false.
+     */
+    protected abstract boolean isInvalid(E remote) throws SQLException;
+
+    /**
+     * Finds the local record that corresponds to the given remote record.
+     * <p>
+     * Only records that pass the validity check (i.e. return false from {@link #isInvalid(SignalRecord)})
+     * make it to here, so you can assume all records are valid.
+     *
+     * @return the matching local record, or empty if there is none, in which case
+     * {@link #process(SignalRecord)} calls {@link #insertLocal(SignalRecord)} with the remote record
+     */
+    protected abstract Optional<E> getMatching(E remote) throws SQLException;
+
+    /**
+     * Combines a remote record with its matching local record. {@link #process(SignalRecord)}
+     * compares the result with both inputs via {@code equals} and only triggers a local update
+     * when it differs from {@code local}.
+     */
+    protected abstract E merge(E remote, E local);
+
+    /**
+     * Called for a valid remote record for which {@link #getMatching(SignalRecord)} returned empty.
+     */
+    protected abstract void insertLocal(E record) throws SQLException;
+
+    /**
+     * Called when the merged record differs from the local one; {@code update} is created from
+     * the previous local record and the merged record.
+     */
+    protected abstract void updateLocal(StorageRecordUpdate<E> update) throws SQLException;
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java
new file mode 100644 (file)
index 0000000..be95591
--- /dev/null
@@ -0,0 +1,137 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.api.GroupId;
+import org.asamk.signal.manager.api.GroupIdV1;
+import org.asamk.signal.manager.storage.SignalAccount;
+import org.asamk.signal.manager.storage.groups.GroupInfoV2;
+import org.asamk.signal.manager.util.KeyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.storage.SignalGroupV1Record;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * Handles merging remote storage updates into local group v1 state.
+ */
+public final class GroupV1RecordProcessor extends DefaultStorageRecordProcessor<SignalGroupV1Record> {
+
+    private static final Logger logger = LoggerFactory.getLogger(GroupV1RecordProcessor.class);
+    private final SignalAccount account;
+    private final Connection connection;
+
+    public GroupV1RecordProcessor(SignalAccount account, Connection connection) {
+        this.account = account;
+        this.connection = connection;
+    }
+
+    /**
+     * We want to catch:
+     * - Invalid group IDs
+     * - GV1 IDs that map to GV2 IDs, meaning we've already migrated them.
+     */
+    @Override
+    protected boolean isInvalid(SignalGroupV1Record remote) throws SQLException {
+        try {
+            final var id = GroupId.unknownVersion(remote.getGroupId());
+            if (!(id instanceof GroupIdV1)) {
+                return true;
+            }
+            final var group = account.getGroupStore().getGroup(connection, id);
+
+            if (group instanceof GroupInfoV2) {
+                logger.debug("We already have an upgraded V2 group for this V1 group -- marking as invalid.");
+                return true;
+            } else {
+                return false;
+            }
+        } catch (AssertionError e) {
+            logger.debug("Bad Group ID -- marking as invalid.");
+            return true;
+        }
+    }
+
+    @Override
+    protected Optional<SignalGroupV1Record> getMatching(SignalGroupV1Record remote) throws SQLException {
+        final var id = GroupId.v1(remote.getGroupId());
+        final var group = account.getGroupStore().getGroup(connection, id);
+
+        if (group == null) {
+            return Optional.empty();
+        }
+
+        final var storageId = account.getGroupStore().getGroupStorageId(connection, id);
+        return Optional.of(StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()).getGroupV1().get());
+    }
+
+    @Override
+    protected SignalGroupV1Record merge(SignalGroupV1Record remote, SignalGroupV1Record local) {
+        final var unknownFields = remote.serializeUnknownFields();
+        final var blocked = remote.isBlocked();
+        final var profileSharing = remote.isProfileSharingEnabled();
+        final var archived = remote.isArchived();
+        final var forcedUnread = remote.isForcedUnread();
+        final var muteUntil = remote.getMuteUntil();
+
+        final var mergedBuilder = new SignalGroupV1Record.Builder(remote.getId().getRaw(),
+                remote.getGroupId(),
+                unknownFields).setBlocked(blocked)
+                .setProfileSharingEnabled(profileSharing)
+                .setForcedUnread(forcedUnread)
+                .setMuteUntil(muteUntil)
+                .setArchived(archived);
+
+        final var merged = mergedBuilder.build();
+
+        final var matchesRemote = doProtosMatch(merged, remote);
+        if (matchesRemote) {
+            return remote;
+        }
+
+        final var matchesLocal = doProtosMatch(merged, local);
+        if (matchesLocal) {
+            return local;
+        }
+
+        return mergedBuilder.setId(KeyUtils.createRawStorageId()).build();
+    }
+
+    @Override
+    protected void insertLocal(SignalGroupV1Record record) throws SQLException {
+        // TODO send group info request (after server message queue is empty)
+        // context.getGroupHelper().sendGroupInfoRequest(groupIdV1, account.getSelfRecipientId());
+        StorageRecordUpdate<SignalGroupV1Record> update = new StorageRecordUpdate<>(null, record);
+        updateLocal(update);
+    }
+
+    @Override
+    protected void updateLocal(StorageRecordUpdate<SignalGroupV1Record> update) throws SQLException {
+        final var groupV1Record = update.newRecord();
+        final var groupIdV1 = GroupId.v1(groupV1Record.getGroupId());
+
+        final var group = account.getGroupStore().getGroup(connection, groupIdV1);
+        group.setBlocked(groupV1Record.isBlocked());
+        account.getGroupStore().updateGroup(connection, group);
+        account.getGroupStore()
+                .storeStorageRecord(connection,
+                        group.getGroupId(),
+                        groupV1Record.getId(),
+                        groupV1Record.toProto().encode());
+    }
+
+    @Override
+    public int compare(SignalGroupV1Record lhs, SignalGroupV1Record rhs) {
+        if (Arrays.equals(lhs.getGroupId(), rhs.getGroupId())) {
+            return 0;
+        } else {
+            return 1;
+        }
+    }
+
+    private static boolean doProtosMatch(SignalGroupV1Record merged, SignalGroupV1Record other) {
+        return Arrays.equals(merged.toProto().encode(), other.toProto().encode());
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java
new file mode 100644 (file)
index 0000000..4d41901
--- /dev/null
@@ -0,0 +1,115 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.groups.GroupUtils;
+import org.asamk.signal.manager.storage.SignalAccount;
+import org.asamk.signal.manager.util.KeyUtils;
+import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.storage.SignalGroupV2Record;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Optional;
+
+public final class GroupV2RecordProcessor extends DefaultStorageRecordProcessor<SignalGroupV2Record> {
+
+    private static final Logger logger = LoggerFactory.getLogger(GroupV2RecordProcessor.class);
+    private final SignalAccount account;
+    private final Connection connection;
+
+    public GroupV2RecordProcessor(SignalAccount account, Connection connection) {
+        this.account = account;
+        this.connection = connection;
+    }
+
+    @Override
+    protected boolean isInvalid(SignalGroupV2Record remote) {
+        return remote.getMasterKeyBytes().length != GroupMasterKey.SIZE;
+    }
+
+    @Override
+    protected Optional<SignalGroupV2Record> getMatching(SignalGroupV2Record remote) throws SQLException {
+        final var id = GroupUtils.getGroupIdV2(remote.getMasterKeyOrThrow());
+        final var group = account.getGroupStore().getGroup(connection, id);
+
+        if (group == null) {
+            return Optional.empty();
+        }
+
+        final var storageId = account.getGroupStore().getGroupStorageId(connection, id);
+        return Optional.of(StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()).getGroupV2().get());
+    }
+
+    @Override
+    protected SignalGroupV2Record merge(SignalGroupV2Record remote, SignalGroupV2Record local) {
+        final var unknownFields = remote.serializeUnknownFields();
+        final var blocked = remote.isBlocked();
+        final var profileSharing = remote.isProfileSharingEnabled();
+        final var archived = remote.isArchived();
+        final var forcedUnread = remote.isForcedUnread();
+        final var muteUntil = remote.getMuteUntil();
+        final var notifyForMentionsWhenMuted = remote.notifyForMentionsWhenMuted();
+        final var hideStory = remote.shouldHideStory();
+        final var storySendMode = remote.getStorySendMode();
+
+        final var mergedBuilder = new SignalGroupV2Record.Builder(remote.getId().getRaw(),
+                remote.getMasterKeyBytes(),
+                unknownFields).setBlocked(blocked)
+                .setProfileSharingEnabled(profileSharing)
+                .setArchived(archived)
+                .setForcedUnread(forcedUnread)
+                .setMuteUntil(muteUntil)
+                .setNotifyForMentionsWhenMuted(notifyForMentionsWhenMuted)
+                .setHideStory(hideStory)
+                .setStorySendMode(storySendMode);
+        final var merged = mergedBuilder.build();
+
+        final var matchesRemote = doProtosMatch(merged, remote);
+        if (matchesRemote) {
+            return remote;
+        }
+
+        final var matchesLocal = doProtosMatch(merged, local);
+        if (matchesLocal) {
+            return local;
+        }
+
+        return mergedBuilder.setId(KeyUtils.createRawStorageId()).build();
+    }
+
+    @Override
+    protected void insertLocal(SignalGroupV2Record record) throws SQLException {
+        StorageRecordUpdate<SignalGroupV2Record> update = new StorageRecordUpdate<>(null, record);
+        updateLocal(update);
+    }
+
+    @Override
+    protected void updateLocal(StorageRecordUpdate<SignalGroupV2Record> update) throws SQLException {
+        final var groupV2Record = update.newRecord();
+        final var groupMasterKey = groupV2Record.getMasterKeyOrThrow();
+
+        final var group = account.getGroupStore().getGroupOrPartialMigrate(connection, groupMasterKey);
+        group.setBlocked(groupV2Record.isBlocked());
+        account.getGroupStore().updateGroup(connection, group);
+        account.getGroupStore()
+                .storeStorageRecord(connection,
+                        group.getGroupId(),
+                        groupV2Record.getId(),
+                        groupV2Record.toProto().encode());
+    }
+
+    @Override
+    public int compare(SignalGroupV2Record lhs, SignalGroupV2Record rhs) {
+        if (Arrays.equals(lhs.getMasterKeyBytes(), rhs.getMasterKeyBytes())) {
+            return 0;
+        } else {
+            return 1;
+        }
+    }
+
+    private static boolean doProtosMatch(SignalGroupV2Record merged, SignalGroupV2Record other) {
+        return Arrays.equals(merged.toProto().encode(), other.toProto().encode());
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java
new file mode 100644 (file)
index 0000000..45a9956
--- /dev/null
@@ -0,0 +1,14 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.whispersystems.signalservice.api.storage.SignalRecord;
+
+import java.sql.SQLException;
+
+/**
+ * Handles processing a remote record, which involves applying any local changes that need to be
+ * made based on the remote records.
+ *
+ * @param <E> the type of storage record this processor handles
+ */
+interface StorageRecordProcessor<E extends SignalRecord> {
+
+    /**
+     * Processes a single remote record.
+     *
+     * @param remoteRecord the remote record to process
+     * @throws SQLException declared so that implementations can propagate database errors
+     */
+    void process(E remoteRecord) throws SQLException;
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java
new file mode 100644 (file)
index 0000000..8dcb7b3
--- /dev/null
@@ -0,0 +1,14 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.whispersystems.signalservice.api.storage.SignalRecord;
+
+/**
+ * Represents a pair of records: one old, and one new. The new record should replace the old.
+ *
+ * @param oldRecord the record being replaced; the group record processors in this package pass
+ *                  {@code null} here when inserting a record that has no local counterpart
+ * @param newRecord the record that replaces {@code oldRecord}
+ */
+record StorageRecordUpdate<E extends SignalRecord>(E oldRecord, E newRecord) {
+
+    /**
+     * Returns the result of {@code newRecord.describeDiff(oldRecord)}.
+     */
+    @Override
+    public String toString() {
+        return newRecord.describeDiff(oldRecord);
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java
new file mode 100644 (file)
index 0000000..c18434c
--- /dev/null
@@ -0,0 +1,145 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.api.PhoneNumberSharingMode;
+import org.asamk.signal.manager.api.TrustLevel;
+import org.asamk.signal.manager.storage.configuration.ConfigurationStore;
+import org.asamk.signal.manager.storage.groups.GroupInfoV1;
+import org.asamk.signal.manager.storage.groups.GroupInfoV2;
+import org.asamk.signal.manager.storage.identities.IdentityInfo;
+import org.asamk.signal.manager.storage.recipients.Recipient;
+import org.whispersystems.signalservice.api.push.UsernameLinkComponents;
+import org.whispersystems.signalservice.api.storage.SignalAccountRecord;
+import org.whispersystems.signalservice.api.storage.SignalContactRecord;
+import org.whispersystems.signalservice.api.storage.SignalGroupV1Record;
+import org.whispersystems.signalservice.api.storage.SignalGroupV2Record;
+import org.whispersystems.signalservice.api.storage.SignalStorageRecord;
+import org.whispersystems.signalservice.api.util.UuidUtil;
+import org.whispersystems.signalservice.internal.storage.protos.AccountRecord;
+import org.whispersystems.signalservice.internal.storage.protos.AccountRecord.UsernameLink;
+import org.whispersystems.signalservice.internal.storage.protos.ContactRecord;
+import org.whispersystems.signalservice.internal.storage.protos.ContactRecord.IdentityState;
+
+import java.util.Optional;
+
+import okio.ByteString;
+
+public final class StorageSyncModels {
+
+    private StorageSyncModels() {
+    }
+
+    public static AccountRecord.PhoneNumberSharingMode localToRemote(PhoneNumberSharingMode phoneNumberPhoneNumberSharingMode) {
+        return switch (phoneNumberPhoneNumberSharingMode) {
+            case EVERYBODY -> AccountRecord.PhoneNumberSharingMode.EVERYBODY;
+            case CONTACTS, NOBODY -> AccountRecord.PhoneNumberSharingMode.NOBODY;
+        };
+    }
+
+    public static PhoneNumberSharingMode remoteToLocal(AccountRecord.PhoneNumberSharingMode phoneNumberPhoneNumberSharingMode) {
+        return switch (phoneNumberPhoneNumberSharingMode) {
+            case EVERYBODY -> PhoneNumberSharingMode.EVERYBODY;
+            case UNKNOWN, NOBODY -> PhoneNumberSharingMode.NOBODY;
+        };
+    }
+
+    public static SignalStorageRecord localToRemoteRecord(
+            ConfigurationStore configStore,
+            Recipient self,
+            UsernameLinkComponents usernameLinkComponents,
+            byte[] rawStorageId
+    ) {
+        final var builder = new SignalAccountRecord.Builder(rawStorageId, self.getStorageRecord());
+        if (self.getProfileKey() != null) {
+            builder.setProfileKey(self.getProfileKey().serialize());
+        }
+        if (self.getProfile() != null) {
+            builder.setGivenName(self.getProfile().getGivenName())
+                    .setFamilyName(self.getProfile().getFamilyName())
+                    .setAvatarUrlPath(self.getProfile().getAvatarUrlPath());
+        }
+        builder.setTypingIndicatorsEnabled(Optional.ofNullable(configStore.getTypingIndicators()).orElse(true))
+                .setReadReceiptsEnabled(Optional.ofNullable(configStore.getReadReceipts()).orElse(true))
+                .setSealedSenderIndicatorsEnabled(Optional.ofNullable(configStore.getUnidentifiedDeliveryIndicators())
+                        .orElse(true))
+                .setLinkPreviewsEnabled(Optional.ofNullable(configStore.getLinkPreviews()).orElse(true))
+                .setUnlistedPhoneNumber(Optional.ofNullable(configStore.getPhoneNumberUnlisted()).orElse(true))
+                .setPhoneNumberSharingMode(localToRemote(Optional.ofNullable(configStore.getPhoneNumberSharingMode())
+                        .orElse(PhoneNumberSharingMode.EVERYBODY)))
+                .setE164(self.getAddress().number().orElse(""))
+                .setUsername(self.getAddress().username().orElse(null));
+        if (usernameLinkComponents != null) {
+            final var linkColor = configStore.getUsernameLinkColor();
+            builder.setUsernameLink(new UsernameLink.Builder().entropy(ByteString.of(usernameLinkComponents.getEntropy()))
+                    .serverId(UuidUtil.toByteString(usernameLinkComponents.getServerId()))
+                    .color(linkColor == null ? UsernameLink.Color.UNKNOWN : UsernameLink.Color.valueOf(linkColor))
+                    .build());
+        }
+
+        return SignalStorageRecord.forAccount(builder.build());
+    }
+
+    public static SignalStorageRecord localToRemoteRecord(
+            Recipient recipient, IdentityInfo identity, byte[] rawStorageId
+    ) {
+        final var address = recipient.getAddress();
+        final var builder = new SignalContactRecord.Builder(rawStorageId,
+                address.aci().orElse(null),
+                recipient.getStorageRecord()).setE164(address.number().orElse(null))
+                .setPni(address.pni().orElse(null))
+                .setUsername(address.username().orElse(null))
+                .setProfileKey(recipient.getProfileKey() == null ? null : recipient.getProfileKey().serialize());
+        if (recipient.getProfile() != null) {
+            builder.setProfileGivenName(recipient.getProfile().getGivenName())
+                    .setProfileFamilyName(recipient.getProfile().getFamilyName());
+        }
+        if (recipient.getContact() != null) {
+            builder.setSystemGivenName(recipient.getContact().givenName())
+                    .setSystemFamilyName((recipient.getContact().familyName()))
+                    .setBlocked(recipient.getContact().isBlocked())
+                    .setProfileSharingEnabled(recipient.getContact().isProfileSharingEnabled())
+                    .setArchived(recipient.getContact().isArchived())
+                    .setHidden(recipient.getContact().isHidden());
+        }
+        if (identity != null) {
+            builder.setIdentityKey(identity.getIdentityKey().serialize())
+                    .setIdentityState(localToRemote(identity.getTrustLevel()));
+        }
+        return SignalStorageRecord.forContact(builder.build());
+    }
+
+    public static SignalStorageRecord localToRemoteRecord(
+            GroupInfoV1 group, byte[] rawStorageId
+    ) {
+        final var builder = new SignalGroupV1Record.Builder(rawStorageId,
+                group.getGroupId().serialize(),
+                group.getStorageRecord());
+        builder.setBlocked(group.isBlocked()).setArchived(group.archived);
+        return SignalStorageRecord.forGroupV1(builder.build());
+    }
+
+    public static SignalStorageRecord localToRemoteRecord(
+            GroupInfoV2 group, byte[] rawStorageId
+    ) {
+        final var builder = new SignalGroupV2Record.Builder(rawStorageId,
+                group.getMasterKey(),
+                group.getStorageRecord());
+        builder.setBlocked(group.isBlocked());
+        return SignalStorageRecord.forGroupV2(builder.build());
+    }
+
+    public static TrustLevel remoteToLocal(ContactRecord.IdentityState identityState) {
+        return switch (identityState) {
+            case DEFAULT -> TrustLevel.TRUSTED_UNVERIFIED;
+            case UNVERIFIED -> TrustLevel.UNTRUSTED;
+            case VERIFIED -> TrustLevel.TRUSTED_VERIFIED;
+        };
+    }
+
+    private static IdentityState localToRemote(TrustLevel local) {
+        return switch (local) {
+            case TRUSTED_VERIFIED -> IdentityState.VERIFIED;
+            case UNTRUSTED -> IdentityState.UNVERIFIED;
+            default -> IdentityState.DEFAULT;
+        };
+    }
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java
new file mode 100644 (file)
index 0000000..2168a2e
--- /dev/null
@@ -0,0 +1,238 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.asamk.signal.manager.storage.recipients.RecipientAddress;
+import org.signal.core.util.Base64;
+import org.signal.core.util.SetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.push.ServiceId;
+import org.whispersystems.signalservice.api.storage.SignalStorageManifest;
+import org.whispersystems.signalservice.api.storage.SignalStorageRecord;
+import org.whispersystems.signalservice.api.storage.StorageId;
+import org.whispersystems.signalservice.internal.storage.protos.ManifestRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public final class StorageSyncValidations {
+
+    private static final Logger logger = LoggerFactory.getLogger(StorageSyncValidations.class);
+
+    private StorageSyncValidations() {
+    }
+
+    public static void validate(
+            WriteOperationResult result,
+            SignalStorageManifest previousManifest,
+            boolean forcePushPending,
+            RecipientAddress self
+    ) {
+        validateManifestAndInserts(result.manifest(), result.inserts(), self);
+
+        if (!result.deletes().isEmpty()) {
+            Set<String> allSetEncoded = result.manifest()
+                    .getStorageIds()
+                    .stream()
+                    .map(StorageId::getRaw)
+                    .map(Base64::encodeWithPadding)
+                    .collect(Collectors.toSet());
+
+            for (byte[] delete : result.deletes()) {
+                String encoded = Base64.encodeWithPadding(delete);
+                if (allSetEncoded.contains(encoded)) {
+                    throw new DeletePresentInFullIdSetError();
+                }
+            }
+        }
+
+        if (previousManifest.getVersion() == 0) {
+            logger.debug(
+                    "Previous manifest is empty, not bothering with additional validations around the diffs between the two manifests.");
+            return;
+        }
+
+        if (result.manifest().getVersion() != previousManifest.getVersion() + 1) {
+            throw new IncorrectManifestVersionError();
+        }
+
+        if (forcePushPending) {
+            logger.debug(
+                    "Force push pending, not bothering with additional validations around the diffs between the two manifests.");
+            return;
+        }
+
+        Set<ByteBuffer> previousIds = previousManifest.getStorageIds()
+                .stream()
+                .map(id -> ByteBuffer.wrap(id.getRaw()))
+                .collect(Collectors.toSet());
+        Set<ByteBuffer> newIds = result.manifest()
+                .getStorageIds()
+                .stream()
+                .map(id -> ByteBuffer.wrap(id.getRaw()))
+                .collect(Collectors.toSet());
+
+        Set<ByteBuffer> manifestInserts = SetUtil.difference(newIds, previousIds);
+        Set<ByteBuffer> manifestDeletes = SetUtil.difference(previousIds, newIds);
+
+        Set<ByteBuffer> declaredInserts = result.inserts()
+                .stream()
+                .map(r -> ByteBuffer.wrap(r.getId().getRaw()))
+                .collect(Collectors.toSet());
+        Set<ByteBuffer> declaredDeletes = result.deletes().stream().map(ByteBuffer::wrap).collect(Collectors.toSet());
+
+        if (declaredInserts.size() > manifestInserts.size()) {
+            logger.debug("DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size());
+            throw new MoreInsertsThanExpectedError();
+        }
+
+        if (declaredInserts.size() < manifestInserts.size()) {
+            logger.debug("DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size());
+            throw new LessInsertsThanExpectedError();
+        }
+
+        if (!declaredInserts.containsAll(manifestInserts)) {
+            throw new InsertMismatchError();
+        }
+
+        if (declaredDeletes.size() > manifestDeletes.size()) {
+            logger.debug("DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size());
+            throw new MoreDeletesThanExpectedError();
+        }
+
+        if (declaredDeletes.size() < manifestDeletes.size()) {
+            logger.debug("DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size());
+            throw new LessDeletesThanExpectedError();
+        }
+
+        if (!declaredDeletes.containsAll(manifestDeletes)) {
+            throw new DeleteMismatchError();
+        }
+    }
+
+    public static void validateForcePush(
+            SignalStorageManifest manifest, List<SignalStorageRecord> inserts, RecipientAddress self
+    ) {
+        validateManifestAndInserts(manifest, inserts, self);
+    }
+
+    private static void validateManifestAndInserts(
+            SignalStorageManifest manifest, List<SignalStorageRecord> inserts, RecipientAddress self
+    ) {
+        int accountCount = 0;
+        for (StorageId id : manifest.getStorageIds()) {
+            accountCount += id.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue() ? 1 : 0;
+        }
+
+        if (accountCount > 1) {
+            throw new MultipleAccountError();
+        }
+
+        if (accountCount == 0) {
+            throw new MissingAccountError();
+        }
+
+        Set<StorageId> allSet = new HashSet<>(manifest.getStorageIds());
+        Set<StorageId> insertSet = inserts.stream().map(SignalStorageRecord::getId).collect(Collectors.toSet());
+        Set<ByteBuffer> rawIdSet = allSet.stream().map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet());
+
+        if (allSet.size() != manifest.getStorageIds().size()) {
+            throw new DuplicateStorageIdError();
+        }
+
+        if (rawIdSet.size() != allSet.size()) {
+            List<StorageId> ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.CONTACT.getValue());
+            if (ids.size() != new HashSet<>(ids).size()) {
+                throw new DuplicateContactIdError();
+            }
+
+            ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.GROUPV1.getValue());
+            if (ids.size() != new HashSet<>(ids).size()) {
+                throw new DuplicateGroupV1IdError();
+            }
+
+            ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.GROUPV2.getValue());
+            if (ids.size() != new HashSet<>(ids).size()) {
+                throw new DuplicateGroupV2IdError();
+            }
+
+            ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.STORY_DISTRIBUTION_LIST.getValue());
+            if (ids.size() != new HashSet<>(ids).size()) {
+                throw new DuplicateDistributionListIdError();
+            }
+
+            throw new DuplicateRawIdAcrossTypesError();
+        }
+
+        if (inserts.size() > insertSet.size()) {
+            throw new DuplicateInsertInWriteError();
+        }
+
+        for (SignalStorageRecord insert : inserts) {
+            if (!allSet.contains(insert.getId())) {
+                throw new InsertNotPresentInFullIdSetError();
+            }
+
+            if (insert.isUnknown()) {
+                throw new UnknownInsertError();
+            }
+
+            if (insert.getContact().isPresent()) {
+                final var contact = insert.getContact().get();
+                final var serviceId = contact.getServiceId().map(ServiceId.class::cast);
+                final var pni = contact.getPni();
+                final var number = contact.getNumber();
+                final var username = contact.getUsername();
+                final var address = new RecipientAddress(serviceId, pni, number, username);
+                if (self.matches(address)) {
+                    throw new SelfAddedAsContactError();
+                }
+            }
+            if (insert.getAccount().isPresent() && insert.getAccount().get().getProfileKey().isEmpty()) {
+                logger.debug("Uploading a null profile key in our AccountRecord!");
+            }
+        }
+    }
+
+    private static final class DuplicateStorageIdError extends Error {}
+
+    private static final class DuplicateRawIdAcrossTypesError extends Error {}
+
+    private static final class DuplicateContactIdError extends Error {}
+
+    private static final class DuplicateGroupV1IdError extends Error {}
+
+    private static final class DuplicateGroupV2IdError extends Error {}
+
+    private static final class DuplicateDistributionListIdError extends Error {}
+
+    private static final class DuplicateInsertInWriteError extends Error {}
+
+    private static final class InsertNotPresentInFullIdSetError extends Error {}
+
+    private static final class DeletePresentInFullIdSetError extends Error {}
+
+    private static final class UnknownInsertError extends Error {}
+
+    private static final class MultipleAccountError extends Error {}
+
+    private static final class MissingAccountError extends Error {}
+
+    private static final class SelfAddedAsContactError extends Error {}
+
+    private static final class IncorrectManifestVersionError extends Error {}
+
+    private static final class MoreInsertsThanExpectedError extends Error {}
+
+    private static final class LessInsertsThanExpectedError extends Error {}
+
+    private static final class InsertMismatchError extends Error {}
+
+    private static final class MoreDeletesThanExpectedError extends Error {}
+
+    private static final class LessDeletesThanExpectedError extends Error {}
+
+    private static final class DeleteMismatchError extends Error {}
+}
diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java
new file mode 100644 (file)
index 0000000..97e3579
--- /dev/null
@@ -0,0 +1,30 @@
+package org.asamk.signal.manager.syncStorage;
+
+import org.whispersystems.signalservice.api.storage.SignalStorageManifest;
+import org.whispersystems.signalservice.api.storage.SignalStorageRecord;
+
+import java.util.List;
+import java.util.Locale;
+
+public record WriteOperationResult(
+        SignalStorageManifest manifest, List<SignalStorageRecord> inserts, List<byte[]> deletes
+) {
+
+    public boolean isEmpty() {
+        return inserts.isEmpty() && deletes.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        if (isEmpty()) {
+            return "Empty";
+        } else {
+            return String.format(Locale.ROOT,
+                    "ManifestVersion: %d, Total Keys: %d, Inserts: %d, Deletes: %d",
+                    manifest.getVersion(),
+                    manifest.getStorageIds().size(),
+                    inserts.size(),
+                    deletes.size());
+        }
+    }
+}
index 486e3655bf0cf3f6ac97a5c4e354fef2aafa7e40..bfcb750c680f5e31d335738c2b11c4a9ef462055 100644 (file)
@@ -113,6 +113,10 @@ public class KeyUtils {
         return MasterKey.createNew(secureRandom);
     }
 
+    public static byte[] createRawStorageId() {
+        return getSecretBytes(16);
+    }
+
     private static String getSecret(int size) {
         var secret = getSecretBytes(size);
         return Base64.getEncoder().encodeToString(secret);