From 6a5dcd00b2bc60a0f756c95659a65876cdaeb3c8 Mon Sep 17 00:00:00 2001 From: AsamK Date: Mon, 27 Jun 2022 15:39:22 +0200 Subject: [PATCH] Implement remote storage sync Closes #604 --- .../actions/RetrieveStorageDataAction.java | 20 - .../actions/SyncStorageDataAction.java | 21 + .../asamk/signal/manager/api/TrustLevel.java | 9 - .../signal/manager/config/ServiceConfig.java | 2 +- .../signal/manager/helper/AccountHelper.java | 4 +- .../asamk/signal/manager/helper/Context.java | 2 +- .../signal/manager/helper/GroupHelper.java | 16 +- .../helper/IncomingMessageHandler.java | 12 +- .../signal/manager/helper/ProfileHelper.java | 4 +- .../signal/manager/helper/StorageHelper.java | 667 ++++++++++++------ .../signal/manager/helper/SyncHelper.java | 7 +- .../signal/manager/internal/JobExecutor.java | 10 +- .../signal/manager/internal/ManagerImpl.java | 71 +- .../internal/RegistrationManagerImpl.java | 2 +- .../manager/jobs/DownloadProfileJob.java | 24 + .../signal/manager/jobs/SyncStorageJob.java | 22 + .../manager/storage/AccountDatabase.java | 21 +- .../signal/manager/storage/SignalAccount.java | 46 +- .../storage/UnknownStorageIdStore.java | 104 +++ .../configuration/ConfigurationStore.java | 89 ++- .../manager/storage/groups/GroupInfoV1.java | 9 +- .../manager/storage/groups/GroupInfoV2.java | 7 + .../manager/storage/groups/GroupStore.java | 280 +++++++- .../storage/groups/LegacyGroupStore.java | 4 +- .../storage/identities/IdentityKeyStore.java | 95 ++- .../storage/keyValue/KeyValueStore.java | 13 +- .../recipients/LegacyRecipientStore2.java | 8 +- .../manager/storage/recipients/Recipient.java | 26 +- .../storage/recipients/RecipientAddress.java | 4 + .../storage/recipients/RecipientStore.java | 328 ++++++++- .../syncStorage/AccountRecordProcessor.java | 225 ++++++ .../syncStorage/ContactRecordProcessor.java | 336 +++++++++ .../DefaultStorageRecordProcessor.java | 96 +++ .../syncStorage/GroupV1RecordProcessor.java | 137 ++++ .../syncStorage/GroupV2RecordProcessor.java | 115 +++ .../syncStorage/StorageRecordProcessor.java | 14 + .../syncStorage/StorageRecordUpdate.java | 14 + .../syncStorage/StorageSyncModels.java | 145 ++++ .../syncStorage/StorageSyncValidations.java | 238 +++++++ .../syncStorage/WriteOperationResult.java | 30 + .../asamk/signal/manager/util/KeyUtils.java | 4 + 41 files changed, 2867 insertions(+), 414 deletions(-) delete mode 100644 lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java deleted file mode 100644 index 8b296006..00000000 --- a/lib/src/main/java/org/asamk/signal/manager/actions/RetrieveStorageDataAction.java +++ /dev/null @@ -1,20 +0,0 @@ -package org.asamk.signal.manager.actions; - -import org.asamk.signal.manager.helper.Context; - -public class RetrieveStorageDataAction implements HandleAction { - - private static final RetrieveStorageDataAction INSTANCE = new RetrieveStorageDataAction(); - - private RetrieveStorageDataAction() { - } - - public static RetrieveStorageDataAction create() { - return INSTANCE; - } - - @Override - public void execute(Context context) throws Throwable { - context.getStorageHelper().readDataFromStorage(); - } -} diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java new file mode 100644 index 00000000..7101b3d1 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/actions/SyncStorageDataAction.java @@ -0,0 +1,21 @@ +package org.asamk.signal.manager.actions; + +import org.asamk.signal.manager.helper.Context; +import org.asamk.signal.manager.jobs.SyncStorageJob; + +public class SyncStorageDataAction implements HandleAction { + + private static final SyncStorageDataAction INSTANCE = new SyncStorageDataAction(); + + private SyncStorageDataAction() { + } + + public static SyncStorageDataAction create() { + return INSTANCE; + } + + @Override + public void execute(Context context) throws Throwable { + context.getJobExecutor().enqueueJob(new SyncStorageJob()); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/api/TrustLevel.java b/lib/src/main/java/org/asamk/signal/manager/api/TrustLevel.java index cbfa0bd5..ac93e34a 100644 --- a/lib/src/main/java/org/asamk/signal/manager/api/TrustLevel.java +++ b/lib/src/main/java/org/asamk/signal/manager/api/TrustLevel.java @@ -1,7 +1,6 @@ package org.asamk.signal.manager.api; import org.whispersystems.signalservice.api.messages.multidevice.VerifiedMessage; -import org.whispersystems.signalservice.internal.storage.protos.ContactRecord; public enum TrustLevel { UNTRUSTED, @@ -17,14 +16,6 @@ public enum TrustLevel { return TrustLevel.cachedValues[i]; } - public static TrustLevel fromIdentityState(ContactRecord.IdentityState identityState) { - return switch (identityState) { - case DEFAULT -> TRUSTED_UNVERIFIED; - case UNVERIFIED -> UNTRUSTED; - case VERIFIED -> TRUSTED_VERIFIED; - }; - } - public static TrustLevel fromVerifiedState(VerifiedMessage.VerifiedState verifiedState) { return switch (verifiedState) { case DEFAULT -> TRUSTED_UNVERIFIED; diff --git a/lib/src/main/java/org/asamk/signal/manager/config/ServiceConfig.java b/lib/src/main/java/org/asamk/signal/manager/config/ServiceConfig.java index cd103f80..d46b6988 100644 --- a/lib/src/main/java/org/asamk/signal/manager/config/ServiceConfig.java +++ b/lib/src/main/java/org/asamk/signal/manager/config/ServiceConfig.java @@ -29,7 +29,7 @@ public class ServiceConfig { final var giftBadges = !isPrimaryDevice; final var pni = !isPrimaryDevice; final var paymentActivation = !isPrimaryDevice; - return new AccountAttributes.Capabilities(false, true, true, true, true, giftBadges, pni, paymentActivation); + return new AccountAttributes.Capabilities(true, true, true, true, true, giftBadges, pni, paymentActivation); } public static ServiceEnvironmentConfig getServiceEnvironmentConfig( diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/AccountHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/AccountHelper.java index c83dde25..2563494d 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/AccountHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/AccountHelper.java @@ -8,6 +8,7 @@ import org.asamk.signal.manager.api.NonNormalizedPhoneNumberException; import org.asamk.signal.manager.api.PinLockedException; import org.asamk.signal.manager.api.RateLimitException; import org.asamk.signal.manager.internal.SignalDependencies; +import org.asamk.signal.manager.jobs.SyncStorageJob; import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.util.KeyUtils; import org.asamk.signal.manager.util.NumberVerificationUtils; @@ -137,11 +138,11 @@ public class AccountHelper { account.setPniIdentityKeyPair(KeyUtils.generateIdentityKeyPair()); } account.getRecipientTrustedResolver().resolveSelfRecipientTrusted(account.getSelfRecipientAddress()); - // TODO check and update remote storage context.getUnidentifiedAccessHelper().rotateSenderCertificates(); dependencies.resetAfterAddressChange(); context.getGroupV2Helper().clearAuthCredentialCache(); context.getAccountFileUpdater().updateAccountIdentifiers(account.getNumber(), account.getAci()); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } public void setPni( @@ -450,6 +451,7 @@ public class AccountHelper { throw new InvalidDeviceLinkException("Invalid device link", e); } account.setMultiDevice(true); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } public void removeLinkedDevices(int deviceId) throws IOException { diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/Context.java b/lib/src/main/java/org/asamk/signal/manager/helper/Context.java index ba2e5fb0..848a57c1 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/Context.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/Context.java @@ -80,7 +80,7 @@ public class Context implements AutoCloseable { return attachmentStore; } - JobExecutor getJobExecutor() { + public JobExecutor getJobExecutor() { return jobExecutor; } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/GroupHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/GroupHelper.java index 40558a89..dec9185a 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/GroupHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/GroupHelper.java @@ -19,6 +19,7 @@ import org.asamk.signal.manager.api.SendMessageResult; import org.asamk.signal.manager.config.ServiceConfig; import org.asamk.signal.manager.groups.GroupUtils; import org.asamk.signal.manager.internal.SignalDependencies; +import org.asamk.signal.manager.jobs.SyncStorageJob; import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfoV1; @@ -143,6 +144,7 @@ public class GroupHelper { } groupInfoV2.setGroup(group); account.getGroupStore().updateGroup(groupInfoV2); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } return groupInfoV2; @@ -185,6 +187,7 @@ public class GroupHelper { final var result = sendGroupMessage(messageBuilder, gv2.getMembersIncludingPendingWithout(selfRecipientId), gv2.getDistributionId()); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); return new Pair<>(gv2.getGroupId(), result); } @@ -209,10 +212,11 @@ public class GroupHelper { var group = getGroupForUpdating(groupId); final var avatarBytes = readAvatarBytes(avatarFile); + SendGroupMessageResults results; switch (group) { case GroupInfoV2 gv2 -> { try { - return updateGroupV2(gv2, + results = updateGroupV2(gv2, name, description, members, @@ -231,7 +235,7 @@ public class GroupHelper { } catch (ConflictException e) { // Detected conflicting update, refreshing group and trying again group = getGroup(groupId, true); - return updateGroupV2((GroupInfoV2) group, + results = updateGroupV2((GroupInfoV2) group, name, description, members, @@ -251,13 +255,14 @@ public class GroupHelper { } case GroupInfoV1 gv1 -> { - final var result = updateGroupV1(gv1, name, members, avatarBytes); + results = updateGroupV1(gv1, name, members, avatarBytes); if (expirationTimer != null) { setExpirationTimer(gv1, expirationTimer); } - return result; } } + context.getJobExecutor().enqueueJob(new SyncStorageJob()); + return results; } public void updateGroupProfileKey(GroupIdV2 groupId) throws GroupNotFoundException, NotAGroupMemberException, IOException { @@ -304,6 +309,7 @@ public class GroupHelper { final var result = sendUpdateGroupV2Message(group, group.getGroup(), groupChange); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); return new Pair<>(group.getGroupId(), result); } @@ -327,6 +333,7 @@ public class GroupHelper { public void deleteGroup(GroupId groupId) throws IOException { account.getGroupStore().deleteGroup(groupId); context.getAvatarStore().deleteGroupAvatar(groupId); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } public void setGroupBlocked(final GroupId groupId, final boolean blocked) throws GroupNotFoundException { @@ -337,6 +344,7 @@ public class GroupHelper { group.setBlocked(blocked); account.getGroupStore().updateGroup(group); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } public SendGroupMessageResults sendGroupInfoRequest( diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java b/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java index cf9c43a2..0f062828 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java @@ -6,7 +6,6 @@ import org.asamk.signal.manager.actions.RefreshPreKeysAction; import org.asamk.signal.manager.actions.RenewSessionAction; import org.asamk.signal.manager.actions.ResendMessageAction; import org.asamk.signal.manager.actions.RetrieveProfileAction; -import org.asamk.signal.manager.actions.RetrieveStorageDataAction; import org.asamk.signal.manager.actions.SendGroupInfoAction; import org.asamk.signal.manager.actions.SendGroupInfoRequestAction; import org.asamk.signal.manager.actions.SendProfileKeyAction; @@ -17,6 +16,7 @@ import org.asamk.signal.manager.actions.SendSyncConfigurationAction; import org.asamk.signal.manager.actions.SendSyncContactsAction; import org.asamk.signal.manager.actions.SendSyncGroupsAction; import org.asamk.signal.manager.actions.SendSyncKeysAction; +import org.asamk.signal.manager.actions.SyncStorageDataAction; import org.asamk.signal.manager.actions.UpdateAccountAttributesAction; import org.asamk.signal.manager.api.GroupId; import org.asamk.signal.manager.api.GroupNotFoundException; @@ -511,6 +511,7 @@ public final class IncomingMessageHandler { if (rm.isConfigurationRequest()) { actions.add(SendSyncConfigurationAction.create()); } + actions.add(SyncStorageDataAction.create()); } if (syncMessage.getGroups().isPresent()) { try { @@ -578,7 +579,7 @@ public final class IncomingMessageHandler { if (syncMessage.getFetchType().isPresent()) { switch (syncMessage.getFetchType().get()) { case LOCAL_PROFILE -> actions.add(new RetrieveProfileAction(account.getSelfRecipientId())); - case STORAGE_MANIFEST -> actions.add(RetrieveStorageDataAction.create()); + case STORAGE_MANIFEST -> actions.add(SyncStorageDataAction.create()); } } if (syncMessage.getKeys().isPresent()) { @@ -586,7 +587,12 @@ public final class IncomingMessageHandler { if (keysMessage.getStorageService().isPresent()) { final var storageKey = keysMessage.getStorageService().get(); account.setStorageKey(storageKey); - actions.add(RetrieveStorageDataAction.create()); + actions.add(SyncStorageDataAction.create()); + } + if (keysMessage.getMaster().isPresent()) { + final var masterKey = keysMessage.getMaster().get(); + account.setMasterKey(masterKey); + actions.add(SyncStorageDataAction.create()); } } if (syncMessage.getConfiguration().isPresent()) { diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java index ec3ae020..dbde28a7 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java @@ -6,6 +6,7 @@ import org.asamk.signal.manager.api.PhoneNumberSharingMode; import org.asamk.signal.manager.api.Profile; import org.asamk.signal.manager.config.ServiceConfig; import org.asamk.signal.manager.internal.SignalDependencies; +import org.asamk.signal.manager.jobs.SyncStorageJob; import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.groups.GroupInfoV2; import org.asamk.signal.manager.storage.recipients.RecipientAddress; @@ -67,7 +68,8 @@ public final class ProfileHelper { account.setProfileKey(profileKey); context.getAccountHelper().updateAccountAttributes(); setProfile(true, true, null, null, null, null, null, null); - // TODO update profile key in storage + account.getRecipientStore().rotateSelfStorageId(); + context.getJobExecutor().enqueueJob(new SyncStorageJob()); final var recipientIds = account.getRecipientStore().getRecipientIdsWithEnabledProfileSharing(); for (final var recipientId : recipientIds) { diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/StorageHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/StorageHelper.java index 1eed964d..8c8ac6fc 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/StorageHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/StorageHelper.java @@ -1,39 +1,47 @@ package org.asamk.signal.manager.helper; -import org.asamk.signal.manager.api.Contact; -import org.asamk.signal.manager.api.GroupId; -import org.asamk.signal.manager.api.PhoneNumberSharingMode; -import org.asamk.signal.manager.api.Profile; -import org.asamk.signal.manager.api.TrustLevel; +import org.asamk.signal.manager.api.GroupIdV1; +import org.asamk.signal.manager.api.GroupIdV2; import org.asamk.signal.manager.internal.SignalDependencies; -import org.asamk.signal.manager.jobs.CheckWhoAmIJob; -import org.asamk.signal.manager.jobs.DownloadProfileAvatarJob; import org.asamk.signal.manager.storage.SignalAccount; -import org.asamk.signal.manager.storage.recipients.RecipientAddress; -import org.signal.libsignal.protocol.IdentityKey; +import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.syncStorage.AccountRecordProcessor; +import org.asamk.signal.manager.syncStorage.ContactRecordProcessor; +import org.asamk.signal.manager.syncStorage.GroupV1RecordProcessor; +import org.asamk.signal.manager.syncStorage.GroupV2RecordProcessor; +import org.asamk.signal.manager.syncStorage.StorageSyncModels; +import org.asamk.signal.manager.syncStorage.StorageSyncValidations; +import org.asamk.signal.manager.syncStorage.WriteOperationResult; +import org.asamk.signal.manager.util.KeyUtils; +import org.signal.core.util.SetUtil; import org.signal.libsignal.protocol.InvalidKeyException; -import org.signal.libsignal.zkgroup.InvalidInputException; -import org.signal.libsignal.zkgroup.groups.GroupMasterKey; -import org.signal.libsignal.zkgroup.profiles.ProfileKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.whispersystems.signalservice.api.storage.SignalAccountRecord; import org.whispersystems.signalservice.api.storage.SignalStorageManifest; import org.whispersystems.signalservice.api.storage.SignalStorageRecord; import org.whispersystems.signalservice.api.storage.StorageId; +import org.whispersystems.signalservice.api.storage.StorageKey; import org.whispersystems.signalservice.internal.storage.protos.ManifestRecord; import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; import java.util.ArrayList; +import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; public class StorageHelper { private static final Logger logger = LoggerFactory.getLogger(StorageHelper.class); + private static final List KNOWN_TYPES = List.of(ManifestRecord.Identifier.Type.CONTACT.getValue(), + ManifestRecord.Identifier.Type.GROUPV1.getValue(), + ManifestRecord.Identifier.Type.GROUPV2.getValue(), + ManifestRecord.Identifier.Type.ACCOUNT.getValue()); private final SignalAccount account; private final SignalDependencies dependencies; @@ -45,275 +53,496 @@ public class StorageHelper { this.context = context; } - public void readDataFromStorage() throws IOException { + public void syncDataWithStorage() throws IOException { final var storageKey = account.getOrCreateStorageKey(); if (storageKey == null) { - logger.debug("Storage key unknown, requesting from primary device."); - context.getSyncHelper().requestSyncKeys(); + if (!account.isPrimaryDevice()) { + logger.debug("Storage key unknown, requesting from primary device."); + context.getSyncHelper().requestSyncKeys(); + } return; } - logger.debug("Reading data from remote storage"); - Optional manifest; + logger.trace("Reading manifest from remote storage"); + final var localManifestVersion = account.getStorageManifestVersion(); + final var localManifest = account.getStorageManifest().orElse(SignalStorageManifest.EMPTY); + SignalStorageManifest remoteManifest; try { - manifest = dependencies.getAccountManager() - .getStorageManifestIfDifferentVersion(storageKey, account.getStorageManifestVersion()); + remoteManifest = dependencies.getAccountManager() + .getStorageManifestIfDifferentVersion(storageKey, localManifestVersion) + .orElse(localManifest); } catch (InvalidKeyException e) { - logger.warn("Manifest couldn't be decrypted, ignoring."); - return; - } - - if (manifest.isEmpty()) { - logger.debug("Manifest is up to date, does not exist or couldn't be decrypted, ignoring."); + logger.warn("Manifest couldn't be decrypted."); + if (account.isPrimaryDevice()) { + try { + forcePushToStorage(storageKey); + } catch (RetryLaterException rle) { + // TODO retry later + return; + } + } return; } - logger.trace("Remote storage manifest has {} records", manifest.get().getStorageIds().size()); - final var storageIds = manifest.get() - .getStorageIds() - .stream() - .filter(id -> !id.isUnknown()) - .collect(Collectors.toSet()); - - Optional localManifest = account.getStorageManifest(); - localManifest.ifPresent(m -> m.getStorageIds().forEach(storageIds::remove)); + logger.trace("Manifest versions: local {}, remote {}", localManifestVersion, remoteManifest.getVersion()); - logger.trace("Reading {} new records", manifest.get().getStorageIds().size()); - for (final var record : getSignalStorageRecords(storageIds)) { - logger.debug("Reading record of type {}", record.getType()); - if (record.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue()) { - readAccountRecord(record); - } else if (record.getType() == ManifestRecord.Identifier.Type.GROUPV2.getValue()) { - readGroupV2Record(record); - } else if (record.getType() == ManifestRecord.Identifier.Type.GROUPV1.getValue()) { - readGroupV1Record(record); - } else if (record.getType() == ManifestRecord.Identifier.Type.CONTACT.getValue()) { - readContactRecord(record); - } + var needsForcePush = false; + if (remoteManifest.getVersion() > localManifestVersion) { + logger.trace("Remote version was newer, reading records."); + needsForcePush = readDataFromStorage(storageKey, localManifest, remoteManifest); + } else if (remoteManifest.getVersion() < localManifest.getVersion()) { + logger.debug("Remote storage manifest version was older. User might have switched accounts."); } - account.setStorageManifestVersion(manifest.get().getVersion()); - account.setStorageManifest(manifest.get()); - logger.debug("Done reading data from remote storage"); - } + logger.trace("Done reading data from remote storage"); - private void readContactRecord(final SignalStorageRecord record) { - if (record == null || record.getContact().isEmpty()) { - return; + if (localManifest != remoteManifest) { + storeManifestLocally(remoteManifest); } - final var contactRecord = record.getContact().get(); - final var aci = contactRecord.getAci().orElse(null); - final var pni = contactRecord.getPni().orElse(null); - if (contactRecord.getNumber().isEmpty() && aci == null && pni == null) { + readRecordsWithPreviouslyUnknownTypes(storageKey); + + logger.trace("Adding missing storageIds to local data"); + account.getRecipientStore().setMissingStorageIds(); + account.getGroupStore().setMissingStorageIds(); + + var needsMultiDeviceSync = false; + try { + needsMultiDeviceSync = writeToStorage(storageKey, remoteManifest, needsForcePush); + } catch (RetryLaterException e) { + // TODO retry later return; } - final var address = new RecipientAddress(aci, pni, contactRecord.getNumber().orElse(null)); - var recipientId = account.getRecipientResolver().resolveRecipient(address); - if (aci != null && contactRecord.getUsername().isPresent()) { - recipientId = account.getRecipientTrustedResolver() - .resolveRecipientTrusted(aci, contactRecord.getUsername().get()); - } - final var contact = account.getContactStore().getContact(recipientId); - final var blocked = contact != null && contact.isBlocked(); - final var profileShared = contact != null && contact.isProfileSharingEnabled(); - final var archived = contact != null && contact.isArchived(); - final var hidden = contact != null && contact.isHidden(); - final var contactGivenName = contact == null ? null : contact.givenName(); - final var contactFamilyName = contact == null ? null : contact.familyName(); - if (blocked != contactRecord.isBlocked() - || profileShared != contactRecord.isProfileSharingEnabled() - || archived != contactRecord.isArchived() - || hidden != contactRecord.isHidden() - || ( - contactRecord.getSystemGivenName().isPresent() && !contactRecord.getSystemGivenName() - .get() - .equals(contactGivenName) - ) - || ( - contactRecord.getSystemFamilyName().isPresent() && !contactRecord.getSystemFamilyName() - .get() - .equals(contactFamilyName) - )) { - logger.debug("Storing new or updated contact {}", recipientId); - final var contactBuilder = contact == null ? Contact.newBuilder() : Contact.newBuilder(contact); - final var newContact = contactBuilder.withIsBlocked(contactRecord.isBlocked()) - .withIsProfileSharingEnabled(contactRecord.isProfileSharingEnabled()) - .withIsArchived(contactRecord.isArchived()) - .withIsHidden(contactRecord.isHidden()); - if (contactRecord.getSystemGivenName().isPresent() || contactRecord.getSystemFamilyName().isPresent()) { - newContact.withGivenName(contactRecord.getSystemGivenName().orElse(null)) - .withFamilyName(contactRecord.getSystemFamilyName().orElse(null)); + if (needsForcePush) { + logger.debug("Doing a force push."); + try { + forcePushToStorage(storageKey); + needsMultiDeviceSync = true; + } catch (RetryLaterException e) { + // TODO retry later + return; } - account.getContactStore().storeContact(recipientId, newContact.build()); } - final var profile = account.getProfileStore().getProfile(recipientId); - final var profileGivenName = profile == null ? null : profile.getGivenName(); - final var profileFamilyName = profile == null ? null : profile.getFamilyName(); - if (( - contactRecord.getProfileGivenName().isPresent() && !contactRecord.getProfileGivenName() - .get() - .equals(profileGivenName) - ) || ( - contactRecord.getProfileFamilyName().isPresent() && !contactRecord.getProfileFamilyName() - .get() - .equals(profileFamilyName) - )) { - final var profileBuilder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile); - final var newProfile = profileBuilder.withGivenName(contactRecord.getProfileGivenName().orElse(null)) - .withFamilyName(contactRecord.getProfileFamilyName().orElse(null)) - .build(); - account.getProfileStore().storeProfile(recipientId, newProfile); + if (needsMultiDeviceSync) { + context.getSyncHelper().sendSyncFetchStorageMessage(); } - if (contactRecord.getProfileKey().isPresent()) { - try { - logger.trace("Storing profile key {}", recipientId); - final var profileKey = new ProfileKey(contactRecord.getProfileKey().get()); - account.getProfileStore().storeProfileKey(recipientId, profileKey); - } catch (InvalidInputException e) { - logger.warn("Received invalid contact profile key from storage"); + + logger.debug("Done syncing data with remote storage"); + } + + private boolean readDataFromStorage( + final StorageKey storageKey, + final SignalStorageManifest localManifest, + final SignalStorageManifest remoteManifest + ) throws IOException { + var needsForcePush = false; + try (final var connection = account.getAccountDatabase().getConnection()) { + connection.setAutoCommit(false); + + var idDifference = findIdDifference(remoteManifest.getStorageIds(), localManifest.getStorageIds()); + + if (idDifference.hasTypeMismatches() && account.isPrimaryDevice()) { + logger.debug("Found type mismatches in the ID sets! Scheduling a force push after this sync completes."); + needsForcePush = true; } - } - if (contactRecord.getIdentityKey().isPresent() && aci != null) { - try { - logger.trace("Storing identity key {}", recipientId); - final var identityKey = new IdentityKey(contactRecord.getIdentityKey().get()); - account.getIdentityKeyStore().saveIdentity(aci, identityKey); - final var trustLevel = TrustLevel.fromIdentityState(contactRecord.getIdentityState()); - if (trustLevel != null) { - account.getIdentityKeyStore().setIdentityTrustLevel(aci, identityKey, trustLevel); + logger.debug("Pre-Merge ID Difference :: " + idDifference); + + if (!idDifference.isEmpty()) { + final var remoteOnlyRecords = getSignalStorageRecords(storageKey, idDifference.remoteOnlyIds()); + + if (remoteOnlyRecords.size() != idDifference.remoteOnlyIds().size()) { + logger.debug("Could not find all remote-only records! Requested: " + + idDifference.remoteOnlyIds() + .size() + + ", Found: " + + remoteOnlyRecords.size() + + ". These stragglers should naturally get deleted during the sync."); } - } catch (InvalidKeyException e) { - logger.warn("Received invalid contact identity key from storage"); + + final var unknownInserts = processKnownRecords(connection, remoteOnlyRecords); + final var unknownDeletes = idDifference.localOnlyIds() + .stream() + .filter(id -> !KNOWN_TYPES.contains(id.getType())) + .toList(); + + logger.debug("Storage ids with unknown type: {} inserts, {} deletes", + unknownInserts.size(), + unknownDeletes.size()); + + account.getUnknownStorageIdStore().addUnknownStorageIds(connection, unknownInserts); + account.getUnknownStorageIdStore().deleteUnknownStorageIds(connection, unknownDeletes); + } else { + logger.debug("Remote version was newer, but there were no remote-only IDs."); } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to sync remote storage", e); } + return needsForcePush; } - private void readGroupV1Record(final SignalStorageRecord record) { - if (record == null || record.getGroupV1().isEmpty()) { - return; - } + private void readRecordsWithPreviouslyUnknownTypes(final StorageKey storageKey) throws IOException { + try (final var connection = account.getAccountDatabase().getConnection()) { + connection.setAutoCommit(false); + final var knownUnknownIds = account.getUnknownStorageIdStore() + .getUnknownStorageIds(connection, KNOWN_TYPES); - final var groupV1Record = record.getGroupV1().get(); - final var groupIdV1 = GroupId.v1(groupV1Record.getGroupId()); + if (!knownUnknownIds.isEmpty()) { + logger.debug("We have " + knownUnknownIds.size() + " unknown records that we can now process."); - var group = account.getGroupStore().getGroup(groupIdV1); - if (group == null) { - try { - context.getGroupHelper().sendGroupInfoRequest(groupIdV1, account.getSelfRecipientId()); - } catch (Throwable e) { - logger.warn("Failed to send group request", e); + final var remote = getSignalStorageRecords(storageKey, knownUnknownIds); + + logger.debug("Found " + remote.size() + " of the known-unknowns remotely."); + + processKnownRecords(connection, remote); + account.getUnknownStorageIdStore() + .deleteUnknownStorageIds(connection, remote.stream().map(SignalStorageRecord::getId).toList()); } - group = account.getGroupStore().getOrCreateGroupV1(groupIdV1); - } - if (group != null && group.isBlocked() != groupV1Record.isBlocked()) { - group.setBlocked(groupV1Record.isBlocked()); - account.getGroupStore().updateGroup(group); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to sync remote storage", e); } } - private void readGroupV2Record(final SignalStorageRecord record) { - if (record == null || record.getGroupV2().isEmpty()) { - return; + private boolean writeToStorage( + final StorageKey storageKey, final SignalStorageManifest remoteManifest, final boolean needsForcePush + ) throws IOException, RetryLaterException { + final WriteOperationResult remoteWriteOperation; + try (final var connection = account.getAccountDatabase().getConnection()) { + connection.setAutoCommit(false); + + final var localStorageIds = getAllLocalStorageIds(connection); + final var idDifference = findIdDifference(remoteManifest.getStorageIds(), localStorageIds); + logger.debug("ID Difference :: " + idDifference); + + final var remoteDeletes = idDifference.remoteOnlyIds().stream().map(StorageId::getRaw).toList(); + final var remoteInserts = buildLocalStorageRecords(connection, idDifference.localOnlyIds()); + // TODO check if local storage record proto matches remote, then reset to remote storage_id + + remoteWriteOperation = new WriteOperationResult(new SignalStorageManifest(remoteManifest.getVersion() + 1, + account.getDeviceId(), + localStorageIds), remoteInserts, remoteDeletes); + + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to sync remote storage", e); } - final var groupV2Record = record.getGroupV2().get(); - if (groupV2Record.isArchived()) { - return; + if (remoteWriteOperation.isEmpty()) { + logger.debug("No remote writes needed. Still at version: " + remoteManifest.getVersion()); + return false; } - final GroupMasterKey groupMasterKey; + logger.debug("We have something to write remotely."); + logger.debug("WriteOperationResult :: " + remoteWriteOperation); + + StorageSyncValidations.validate(remoteWriteOperation, + remoteManifest, + needsForcePush, + account.getSelfRecipientAddress()); + + final Optional conflict; try { - groupMasterKey = new GroupMasterKey(groupV2Record.getMasterKeyBytes()); - } catch (InvalidInputException e) { - logger.warn("Received invalid group master key from storage"); - return; + conflict = dependencies.getAccountManager() + .writeStorageRecords(storageKey, + remoteWriteOperation.manifest(), + remoteWriteOperation.inserts(), + remoteWriteOperation.deletes()); + } catch (InvalidKeyException e) { + logger.warn("Failed to decrypt conflicting storage manifest: {}", e.getMessage()); + throw new IOException(e); } - final var group = context.getGroupHelper().getOrMigrateGroup(groupMasterKey, 0, null); - if (group.isBlocked() != groupV2Record.isBlocked()) { - group.setBlocked(groupV2Record.isBlocked()); - account.getGroupStore().updateGroup(group); + if (conflict.isPresent()) { + logger.debug("Hit a conflict when trying to resolve the conflict! Retrying."); + throw new RetryLaterException(); } - } - private void readAccountRecord(final SignalStorageRecord record) throws IOException { - if (record == null) { - logger.warn("Could not find account record, even though we had an ID, ignoring."); - return; - } + logger.debug("Saved new manifest. Now at version: " + remoteWriteOperation.manifest().getVersion()); + storeManifestLocally(remoteWriteOperation.manifest()); - SignalAccountRecord accountRecord = record.getAccount().orElse(null); - if (accountRecord == null) { - logger.warn("The storage record didn't actually have an account, ignoring."); - return; - } + return true; + } - if (!accountRecord.getE164().equals(account.getNumber())) { - context.getJobExecutor().enqueueJob(new CheckWhoAmIJob()); - } + private void forcePushToStorage( + final StorageKey storageServiceKey + ) throws IOException, RetryLaterException { + logger.debug("Force pushing local state to remote storage"); + + final var currentVersion = dependencies.getAccountManager().getStorageManifestVersion(); + final var newVersion = currentVersion + 1; + final var newStorageRecords = new ArrayList(); + final Map newContactStorageIds; + final Map newGroupV1StorageIds; + final Map newGroupV2StorageIds; + + try (final var connection = account.getAccountDatabase().getConnection()) { + connection.setAutoCommit(false); + + final var recipientIds = account.getRecipientStore().getRecipientIds(connection); + newContactStorageIds = generateContactStorageIds(recipientIds); + for (final var recipientId : recipientIds) { + final var storageId = newContactStorageIds.get(recipientId); + if (storageId.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue()) { + final var recipient = account.getRecipientStore().getRecipient(connection, recipientId); + final var accountRecord = StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(), + recipient, + account.getUsernameLink(), + storageId.getRaw()); + newStorageRecords.add(accountRecord); + } else { + final var recipient = account.getRecipientStore().getRecipient(connection, recipientId); + final var address = recipient.getAddress().getIdentifier(); + final var identity = account.getIdentityKeyStore().getIdentityInfo(connection, address); + final var record = StorageSyncModels.localToRemoteRecord(recipient, identity, storageId.getRaw()); + newStorageRecords.add(record); + } + } - account.getConfigurationStore().setReadReceipts(accountRecord.isReadReceiptsEnabled()); - account.getConfigurationStore().setTypingIndicators(accountRecord.isTypingIndicatorsEnabled()); - account.getConfigurationStore() - .setUnidentifiedDeliveryIndicators(accountRecord.isSealedSenderIndicatorsEnabled()); - account.getConfigurationStore().setLinkPreviews(accountRecord.isLinkPreviewsEnabled()); - account.getConfigurationStore().setPhoneNumberSharingMode(switch (accountRecord.getPhoneNumberSharingMode()) { - case EVERYBODY -> PhoneNumberSharingMode.EVERYBODY; - case NOBODY, UNKNOWN -> PhoneNumberSharingMode.NOBODY; - }); - account.getConfigurationStore().setPhoneNumberUnlisted(accountRecord.isPhoneNumberUnlisted()); - account.setUsername(accountRecord.getUsername()); - - if (accountRecord.getProfileKey().isPresent()) { - ProfileKey profileKey; - try { - profileKey = new ProfileKey(accountRecord.getProfileKey().get()); - } catch (InvalidInputException e) { - logger.warn("Received invalid profile key from storage"); - profileKey = null; + final var groupV1Ids = account.getGroupStore().getGroupV1Ids(connection); + newGroupV1StorageIds = generateGroupV1StorageIds(groupV1Ids); + for (final var groupId : groupV1Ids) { + final var storageId = newGroupV1StorageIds.get(groupId); + final var group = account.getGroupStore().getGroup(connection, groupId); + final var record = StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()); + newStorageRecords.add(record); } - if (profileKey != null) { - account.setProfileKey(profileKey); - final var avatarPath = accountRecord.getAvatarUrlPath().orElse(null); - context.getJobExecutor().enqueueJob(new DownloadProfileAvatarJob(avatarPath)); + + final var groupV2Ids = account.getGroupStore().getGroupV2Ids(connection); + newGroupV2StorageIds = generateGroupV2StorageIds(groupV2Ids); + for (final var groupId : groupV2Ids) { + final var storageId = newGroupV2StorageIds.get(groupId); + final var group = account.getGroupStore().getGroup(connection, groupId); + final var record = StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()); + newStorageRecords.add(record); } + + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to sync remote storage", e); } + final var newStorageIds = newStorageRecords.stream().map(SignalStorageRecord::getId).toList(); - context.getProfileHelper() - .setProfile(false, - false, - accountRecord.getGivenName().orElse(null), - accountRecord.getFamilyName().orElse(null), - null, - null, - null, - null); - } + final var manifest = new SignalStorageManifest(newVersion, account.getDeviceId(), newStorageIds); - private SignalStorageRecord getSignalStorageRecord(final StorageId accountId) throws IOException { - List records; + StorageSyncValidations.validateForcePush(manifest, newStorageRecords, account.getSelfRecipientAddress()); + + final Optional conflict; try { - records = dependencies.getAccountManager() - .readStorageRecords(account.getStorageKey(), Collections.singletonList(accountId)); + if (newVersion > 1) { + logger.trace("Force-pushing data. Inserting {} IDs.", newStorageRecords.size()); + conflict = dependencies.getAccountManager() + .resetStorageRecords(storageServiceKey, manifest, newStorageRecords); + } else { + logger.trace("First version, normal push. Inserting {} IDs.", newStorageRecords.size()); + conflict = dependencies.getAccountManager() + .writeStorageRecords(storageServiceKey, manifest, newStorageRecords, Collections.emptyList()); + } } catch (InvalidKeyException e) { - logger.warn("Failed to read storage records, ignoring."); - return null; + logger.debug("Hit an invalid key exception, which likely indicates a conflict.", e); + throw new RetryLaterException(); + } + + if (conflict.isPresent()) { + logger.debug("Hit a conflict. Trying again."); + throw new RetryLaterException(); + } + + logger.debug("Force push succeeded. Updating local manifest version to: " + manifest.getVersion()); + storeManifestLocally(manifest); + + try (final var connection = account.getAccountDatabase().getConnection()) { + connection.setAutoCommit(false); + account.getRecipientStore().updateStorageIds(connection, newContactStorageIds); + account.getGroupStore().updateStorageIds(connection, newGroupV1StorageIds, newGroupV2StorageIds); + + // delete all unknown storage ids + account.getUnknownStorageIdStore().deleteAllUnknownStorageIds(connection); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed to sync remote storage", e); } - return !records.isEmpty() ? records.getFirst() : null; } - private List getSignalStorageRecords(final Collection storageIds) throws IOException { + private Map generateContactStorageIds(List recipientIds) { + final var selfRecipientId = account.getSelfRecipientId(); + return recipientIds.stream().collect(Collectors.toMap(recipientId -> recipientId, recipientId -> { + if (recipientId.equals(selfRecipientId)) { + return StorageId.forAccount(KeyUtils.createRawStorageId()); + } else { + return StorageId.forContact(KeyUtils.createRawStorageId()); + } + })); + } + + private Map generateGroupV1StorageIds(List groupIds) { + return groupIds.stream() + .collect(Collectors.toMap(recipientId -> recipientId, + recipientId -> StorageId.forGroupV1(KeyUtils.createRawStorageId()))); + } + + private Map generateGroupV2StorageIds(List groupIds) { + return groupIds.stream() + .collect(Collectors.toMap(recipientId -> recipientId, + recipientId -> StorageId.forGroupV2(KeyUtils.createRawStorageId()))); + } + + private void storeManifestLocally( + final SignalStorageManifest remoteManifest + ) { + account.setStorageManifestVersion(remoteManifest.getVersion()); + account.setStorageManifest(remoteManifest); + } + + private List getSignalStorageRecords( + final StorageKey storageKey, final List storageIds + ) throws IOException { List records; try { - records = dependencies.getAccountManager() - .readStorageRecords(account.getStorageKey(), new ArrayList<>(storageIds)); + records = dependencies.getAccountManager().readStorageRecords(storageKey, storageIds); } catch (InvalidKeyException e) { logger.warn("Failed to read storage records, ignoring."); return List.of(); } return records; } + + private List getAllLocalStorageIds(final Connection connection) throws SQLException { + final var storageIds = new ArrayList(); + storageIds.addAll(account.getUnknownStorageIdStore().getUnknownStorageIds(connection)); + storageIds.addAll(account.getGroupStore().getStorageIds(connection)); + storageIds.addAll(account.getRecipientStore().getStorageIds(connection)); + storageIds.add(account.getRecipientStore().getSelfStorageId(connection)); + return storageIds; + } + + private List buildLocalStorageRecords( + final Connection connection, final List storageIds + ) throws SQLException { + final var records = new ArrayList(); + for (final var storageId : storageIds) { + final var record = buildLocalStorageRecord(connection, storageId); + if (record != null) { + records.add(record); + } + } + return records; + } + + private SignalStorageRecord buildLocalStorageRecord( + Connection connection, StorageId storageId + ) throws SQLException { + return switch (ManifestRecord.Identifier.Type.fromValue(storageId.getType())) { + case ManifestRecord.Identifier.Type.CONTACT -> { + final var recipient = account.getRecipientStore().getRecipient(connection, storageId); + final var address = recipient.getAddress().getIdentifier(); + final var identity = account.getIdentityKeyStore().getIdentityInfo(connection, address); + yield StorageSyncModels.localToRemoteRecord(recipient, identity, storageId.getRaw()); + } + case ManifestRecord.Identifier.Type.GROUPV1 -> { + final var groupV1 = account.getGroupStore().getGroupV1(connection, storageId); + yield StorageSyncModels.localToRemoteRecord(groupV1, storageId.getRaw()); + } + case ManifestRecord.Identifier.Type.GROUPV2 -> { + final var groupV2 = account.getGroupStore().getGroupV2(connection, storageId); + yield StorageSyncModels.localToRemoteRecord(groupV2, storageId.getRaw()); + } + case ManifestRecord.Identifier.Type.ACCOUNT -> { + final var selfRecipient = account.getRecipientStore() + .getRecipient(connection, account.getSelfRecipientId()); + yield StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(), + selfRecipient, + account.getUsernameLink(), + storageId.getRaw()); + } + case null, default -> throw new AssertionError("Got unknown local storage record type: " + storageId); + }; + } + + /** + * Given a list of all the local and remote keys you know about, this will + * return a result telling + * you which keys are exclusively remote and which are exclusively local. + * + * @param remoteIds All remote keys available. + * @param localIds All local keys available. + * @return An object describing which keys are exclusive to the remote data set + * and which keys are + * exclusive to the local data set. + */ + private static IdDifferenceResult findIdDifference( + Collection remoteIds, Collection localIds + ) { + final var base64Encoder = Base64.getEncoder(); + final var remoteByRawId = remoteIds.stream() + .collect(Collectors.toMap(id -> base64Encoder.encodeToString(id.getRaw()), id -> id)); + final var localByRawId = localIds.stream() + .collect(Collectors.toMap(id -> base64Encoder.encodeToString(id.getRaw()), id -> id)); + + boolean hasTypeMismatch = remoteByRawId.size() != remoteIds.size() || localByRawId.size() != localIds.size(); + + final var remoteOnlyRawIds = SetUtil.difference(remoteByRawId.keySet(), localByRawId.keySet()); + final var localOnlyRawIds = SetUtil.difference(localByRawId.keySet(), remoteByRawId.keySet()); + final var sharedRawIds = SetUtil.intersection(localByRawId.keySet(), remoteByRawId.keySet()); + + for (String rawId : sharedRawIds) { + final var remote = remoteByRawId.get(rawId); + final var local = localByRawId.get(rawId); + + if (remote.getType() != local.getType()) { + remoteOnlyRawIds.remove(rawId); + localOnlyRawIds.remove(rawId); + hasTypeMismatch = true; + logger.debug("Remote type {} did not match local type {} for {}!", + remote.getType(), + local.getType(), + rawId); + } + } + + final var remoteOnlyKeys = remoteOnlyRawIds.stream().map(remoteByRawId::get).toList(); + final var localOnlyKeys = localOnlyRawIds.stream().map(localByRawId::get).toList(); + + return new IdDifferenceResult(remoteOnlyKeys, localOnlyKeys, hasTypeMismatch); + } + + private List processKnownRecords( + final Connection connection, List records + ) throws SQLException { + final var unknownRecords = new ArrayList(); + + final var accountRecordProcessor = new AccountRecordProcessor(account, connection, context.getJobExecutor()); + final var contactRecordProcessor = new ContactRecordProcessor(account, connection, context.getJobExecutor()); + final var groupV1RecordProcessor = new GroupV1RecordProcessor(account, connection); + final var groupV2RecordProcessor = new GroupV2RecordProcessor(account, connection); + + for (final var record : records) { + logger.debug("Reading record of type {}", record.getType()); + switch (ManifestRecord.Identifier.Type.fromValue(record.getType())) { + case ACCOUNT -> accountRecordProcessor.process(record.getAccount().get()); + case GROUPV1 -> groupV1RecordProcessor.process(record.getGroupV1().get()); + case GROUPV2 -> groupV2RecordProcessor.process(record.getGroupV2().get()); + case CONTACT -> contactRecordProcessor.process(record.getContact().get()); + case null, default -> unknownRecords.add(record.getId()); + } + } + + return unknownRecords; + } + + /** + * hasTypeMismatches is True if there exist some keys that have matching raw ID's but different types, otherwise false. + */ + private record IdDifferenceResult( + List remoteOnlyIds, List localOnlyIds, boolean hasTypeMismatches + ) { + + public boolean isEmpty() { + return remoteOnlyIds.isEmpty() && localOnlyIds.isEmpty(); + } + } + + private static class RetryLaterException extends Throwable {} } diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/SyncHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/SyncHelper.java index f6392ea9..ac611046 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/SyncHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/SyncHelper.java @@ -79,6 +79,11 @@ public class SyncHelper { .sendSyncMessage(SignalServiceSyncMessage.forFetchLatest(SignalServiceSyncMessage.FetchType.LOCAL_PROFILE)); } + public void sendSyncFetchStorageMessage() { + context.getSendHelper() + .sendSyncMessage(SignalServiceSyncMessage.forFetchLatest(SignalServiceSyncMessage.FetchType.STORAGE_MANIFEST)); + } + public void sendGroups() throws IOException { var groupsFile = IOUtils.createTempFile(); @@ -222,7 +227,7 @@ public class SyncHelper { } public SendMessageResult sendKeysMessage() { - var keysMessage = new KeysMessage(Optional.ofNullable(account.getStorageKey()), + var keysMessage = new KeysMessage(Optional.ofNullable(account.getOrCreateStorageKey()), Optional.ofNullable(account.getOrCreatePinMasterKey())); return context.getSendHelper().sendSyncMessage(SignalServiceSyncMessage.forKeys(keysMessage)); } diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java b/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java index ee4c2222..a76924d8 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/JobExecutor.java @@ -70,11 +70,13 @@ public class JobExecutor implements AutoCloseable { @Override public void close() { + final boolean queueEmpty; synchronized (queue) { - if (queue.isEmpty()) { - executorService.close(); - return; - } + queueEmpty = queue.isEmpty(); + } + if (queueEmpty) { + executorService.close(); + return; } synchronized (this) { try { diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java index 29f3d2ab..a2575e90 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/ManagerImpl.java @@ -66,6 +66,7 @@ import org.asamk.signal.manager.config.ServiceEnvironmentConfig; import org.asamk.signal.manager.helper.AccountFileUpdater; import org.asamk.signal.manager.helper.Context; import org.asamk.signal.manager.helper.RecipientHelper.RegisteredUser; +import org.asamk.signal.manager.jobs.SyncStorageJob; import org.asamk.signal.manager.storage.AttachmentStore; import org.asamk.signal.manager.storage.AvatarStore; import org.asamk.signal.manager.storage.SignalAccount; @@ -125,6 +126,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import io.reactivex.rxjava3.disposables.CompositeDisposable; +import io.reactivex.rxjava3.schedulers.Schedulers; public class ManagerImpl implements Manager { @@ -193,22 +195,25 @@ public class ManagerImpl implements Manager { this.notifyAll(); } }); - disposable.add(account.getIdentityKeyStore().getIdentityChanges().subscribe(serviceId -> { - logger.trace("Archiving old sessions for {}", serviceId); - account.getAccountData(ServiceIdType.ACI).getSessionStore().archiveSessions(serviceId); - account.getAccountData(ServiceIdType.PNI).getSessionStore().archiveSessions(serviceId); - account.getSenderKeyStore().deleteSharedWith(serviceId); - final var recipientId = account.getRecipientResolver().resolveRecipient(serviceId); - final var profile = account.getProfileStore().getProfile(recipientId); - if (profile != null) { - account.getProfileStore() - .storeProfile(recipientId, - Profile.newBuilder(profile) - .withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN) - .withLastUpdateTimestamp(0) - .build()); - } - })); + disposable.add(account.getIdentityKeyStore() + .getIdentityChanges() + .observeOn(Schedulers.from(executor)) + .subscribe(serviceId -> { + logger.trace("Archiving old sessions for {}", serviceId); + account.getAccountData(ServiceIdType.ACI).getSessionStore().archiveSessions(serviceId); + account.getAccountData(ServiceIdType.PNI).getSessionStore().archiveSessions(serviceId); + account.getSenderKeyStore().deleteSharedWith(serviceId); + final var recipientId = account.getRecipientResolver().resolveRecipient(serviceId); + final var profile = account.getProfileStore().getProfile(recipientId); + if (profile != null) { + account.getProfileStore() + .storeProfile(recipientId, + Profile.newBuilder(profile) + .withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN) + .withLastUpdateTimestamp(0) + .build()); + } + })); } @Override @@ -295,13 +300,7 @@ public class ManagerImpl implements Manager { } @Override - public void updateConfiguration( - Configuration configuration - ) throws NotPrimaryDeviceException { - if (!account.isPrimaryDevice()) { - throw new NotPrimaryDeviceException(); - } - + public void updateConfiguration(Configuration configuration) { final var configurationStore = account.getConfigurationStore(); if (configuration.readReceipts().isPresent()) { configurationStore.setReadReceipts(configuration.readReceipts().get()); @@ -316,6 +315,7 @@ public class ManagerImpl implements Manager { configurationStore.setLinkPreviews(configuration.linkPreviews().get()); } context.getSyncHelper().sendConfigurationMessage(); + syncRemoteStorage(); } @Override @@ -870,6 +870,7 @@ public class ManagerImpl implements Manager { if (recipientIdOptional.isPresent()) { context.getContactHelper().setContactHidden(recipientIdOptional.get(), true); account.removeRecipient(recipientIdOptional.get()); + syncRemoteStorage(); } } @@ -878,6 +879,7 @@ public class ManagerImpl implements Manager { final var recipientIdOptional = context.getRecipientHelper().resolveRecipientOptional(recipient); if (recipientIdOptional.isPresent()) { account.removeRecipient(recipientIdOptional.get()); + syncRemoteStorage(); } } @@ -886,6 +888,7 @@ public class ManagerImpl implements Manager { final var recipientIdOptional = context.getRecipientHelper().resolveRecipientOptional(recipient); if (recipientIdOptional.isPresent()) { account.getContactStore().deleteContact(recipientIdOptional.get()); + syncRemoteStorage(); } } @@ -898,15 +901,13 @@ public class ManagerImpl implements Manager { } context.getContactHelper() .setContactName(context.getRecipientHelper().resolveRecipient(recipient), givenName, familyName); + syncRemoteStorage(); } @Override public void setContactsBlocked( Collection recipients, boolean blocked - ) throws NotPrimaryDeviceException, IOException, UnregisteredRecipientException { - if (!account.isPrimaryDevice()) { - throw new NotPrimaryDeviceException(); - } + ) throws IOException, UnregisteredRecipientException { if (recipients.isEmpty()) { return; } @@ -930,15 +931,13 @@ public class ManagerImpl implements Manager { context.getProfileHelper().rotateProfileKey(); } context.getSyncHelper().sendBlockedList(); + syncRemoteStorage(); } @Override public void setGroupsBlocked( final Collection groupIds, final boolean blocked - ) throws GroupNotFoundException, NotPrimaryDeviceException, IOException { - if (!account.isPrimaryDevice()) { - throw new NotPrimaryDeviceException(); - } + ) throws GroupNotFoundException, IOException { if (groupIds.isEmpty()) { return; } @@ -954,6 +953,7 @@ public class ManagerImpl implements Manager { context.getProfileHelper().rotateProfileKey(); } context.getSyncHelper().sendBlockedList(); + syncRemoteStorage(); } @Override @@ -968,6 +968,7 @@ public class ManagerImpl implements Manager { } catch (NotAGroupMemberException | GroupNotFoundException | GroupSendingNotAllowedException e) { throw new AssertionError(e); } + syncRemoteStorage(); } @Override @@ -1025,13 +1026,13 @@ public class ManagerImpl implements Manager { } @Override - public void requestAllSyncData() throws IOException { + public void requestAllSyncData() { context.getSyncHelper().requestAllSyncData(); - retrieveRemoteStorage(); + syncRemoteStorage(); } - void retrieveRemoteStorage() throws IOException { - context.getStorageHelper().readDataFromStorage(); + void syncRemoteStorage() { + context.getJobExecutor().enqueueJob(new SyncStorageJob()); } @Override diff --git a/lib/src/main/java/org/asamk/signal/manager/internal/RegistrationManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/internal/RegistrationManagerImpl.java index 6a2f3633..839bdd48 100644 --- a/lib/src/main/java/org/asamk/signal/manager/internal/RegistrationManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/internal/RegistrationManagerImpl.java @@ -169,7 +169,7 @@ public class RegistrationManagerImpl implements RegistrationManager { m.refreshPreKeys(); if (response.isStorageCapable()) { - m.retrieveRemoteStorage(); + m.syncRemoteStorage(); } // Set an initial empty profile so user can be added to groups try { diff --git a/lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java b/lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java new file mode 100644 index 00000000..22796864 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/jobs/DownloadProfileJob.java @@ -0,0 +1,24 @@ +package org.asamk.signal.manager.jobs; + +import org.asamk.signal.manager.helper.Context; +import org.asamk.signal.manager.storage.recipients.RecipientAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DownloadProfileJob implements Job { + + private static final Logger logger = LoggerFactory.getLogger(DownloadProfileJob.class); + private final RecipientAddress address; + + public DownloadProfileJob(RecipientAddress address) { + this.address = address; + } + + @Override + public void run(Context context) { + logger.trace("Refreshing profile for {}", address); + final var account = context.getAccount(); + final var recipientId = account.getRecipientStore().resolveRecipient(address); + context.getProfileHelper().refreshRecipientProfile(recipientId); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java b/lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java new file mode 100644 index 00000000..85cfbe2c --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/jobs/SyncStorageJob.java @@ -0,0 +1,22 @@ +package org.asamk.signal.manager.jobs; + +import org.asamk.signal.manager.helper.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SyncStorageJob implements Job { + + private static final Logger logger = LoggerFactory.getLogger(SyncStorageJob.class); + + @Override + public void run(Context context) { + logger.trace("Running storage sync job"); + try { + context.getStorageHelper().syncDataWithStorage(); + } catch (IOException e) { + logger.warn("Failed to sync storage data", e); + } + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java index 43a146a7..c49e56ab 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java @@ -32,7 +32,7 @@ import java.util.UUID; public class AccountDatabase extends Database { private static final Logger logger = LoggerFactory.getLogger(AccountDatabase.class); - private static final long DATABASE_VERSION = 19; + private static final long DATABASE_VERSION = 20; private AccountDatabase(final HikariDataSource dataSource) { super(logger, DATABASE_VERSION, dataSource); @@ -57,6 +57,7 @@ public class AccountDatabase extends Database { SenderKeySharedStore.createSql(connection); KeyValueStore.createSql(connection); CdsiStore.createSql(connection); + UnknownStorageIdStore.createSql(connection); } @Override @@ -539,5 +540,23 @@ public class AccountDatabase extends Database { """); } } + if (oldVersion < 20) { + logger.debug("Updating database: Creating storage id tables and columns"); + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE storage_id ( + _id INTEGER PRIMARY KEY, + type INTEGER NOT NULL, + storage_id BLOB NOT NULL + ) STRICT; + ALTER TABLE group_v1 ADD COLUMN storage_id BLOB; + ALTER TABLE group_v1 ADD COLUMN storage_record BLOB; + ALTER TABLE group_v2 ADD COLUMN storage_id BLOB; + ALTER TABLE group_v2 ADD COLUMN storage_record BLOB; + ALTER TABLE recipient ADD COLUMN storage_id BLOB; + ALTER TABLE recipient ADD COLUMN storage_record BLOB; + """); + } + } } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java index 6860e239..41c05b98 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java @@ -168,6 +168,7 @@ public class SignalAccount implements Closeable { private GroupStore groupStore; private RecipientStore recipientStore; private StickerStore stickerStore; + private UnknownStorageIdStore unknownStorageIdStore; private ConfigurationStore configurationStore; private KeyValueStore keyValueStore; private CdsiStore cdsiStore; @@ -176,6 +177,7 @@ public class SignalAccount implements Closeable { private MessageSendLogStore messageSendLogStore; private AccountDatabase accountDatabase; + private RecipientId selfRecipientId; private SignalAccount(final FileChannel fileChannel, final FileLock lock) { this.fileChannel = fileChannel; @@ -194,6 +196,7 @@ public class SignalAccount implements Closeable { signalAccount.load(dataPath, accountPath, settings); logger.trace("Migrating legacy parts of account file"); signalAccount.migrateLegacyConfigs(); + signalAccount.init(); return signalAccount; } catch (Throwable e) { @@ -240,7 +243,7 @@ public class SignalAccount implements Closeable { signalAccount.registered = false; signalAccount.previousStorageVersion = CURRENT_STORAGE_VERSION; - signalAccount.migrateLegacyConfigs(); + signalAccount.init(); signalAccount.save(); return signalAccount; @@ -286,6 +289,7 @@ public class SignalAccount implements Closeable { this.number = number; this.aciAccountData.setServiceId(aci); this.pniAccountData.setServiceId(pni); + this.init(); getRecipientTrustedResolver().resolveSelfRecipientTrusted(getSelfRecipientAddress()); this.password = password; this.profileKey = profileKey; @@ -337,6 +341,7 @@ public class SignalAccount implements Closeable { this.registered = true; this.aciAccountData.setServiceId(aci); this.pniAccountData.setServiceId(pni); + init(); this.registrationLockPin = pin; getKeyValueStore().storeEntry(lastReceiveTimestamp, 0L); save(); @@ -356,6 +361,10 @@ public class SignalAccount implements Closeable { getAccountDatabase(); } + private void init() { + this.selfRecipientId = getRecipientResolver().resolveRecipient(getSelfRecipientAddress()); + } + private void migrateLegacyConfigs() { if (isPrimaryDevice() && getPniIdentityKeyPair() == null) { setPniIdentityKeyPair(KeyUtils.generateIdentityKeyPair()); @@ -1158,7 +1167,9 @@ public class SignalAccount implements Closeable { public IdentityKeyStore getIdentityKeyStore() { return getOrCreate(() -> identityKeyStore, - () -> identityKeyStore = new IdentityKeyStore(getAccountDatabase(), settings.trustNewIdentity())); + () -> identityKeyStore = new IdentityKeyStore(getAccountDatabase(), + settings.trustNewIdentity(), + getRecipientStore())); } public GroupStore getGroupStore() { @@ -1216,9 +1227,13 @@ public class SignalAccount implements Closeable { return getOrCreate(() -> keyValueStore, () -> keyValueStore = new KeyValueStore(getAccountDatabase())); } + public UnknownStorageIdStore getUnknownStorageIdStore() { + return getOrCreate(() -> unknownStorageIdStore, () -> unknownStorageIdStore = new UnknownStorageIdStore()); + } + public ConfigurationStore getConfigurationStore() { return getOrCreate(() -> configurationStore, - () -> configurationStore = new ConfigurationStore(getKeyValueStore())); + () -> configurationStore = new ConfigurationStore(getKeyValueStore(), getRecipientStore())); } public MessageCache getMessageCache() { @@ -1387,7 +1402,7 @@ public class SignalAccount implements Closeable { } public RecipientId getSelfRecipientId() { - return getRecipientResolver().resolveRecipient(getSelfRecipientAddress()); + return selfRecipientId; } public String getSessionId(final String forNumber) { @@ -1472,22 +1487,29 @@ public class SignalAccount implements Closeable { return pinMasterKey; } - public StorageKey getStorageKey() { - if (pinMasterKey != null) { - return pinMasterKey.deriveStorageServiceKey(); + public void setMasterKey(MasterKey masterKey) { + if (isPrimaryDevice()) { + return; } - return storageKey; + this.pinMasterKey = masterKey; + save(); } public StorageKey getOrCreateStorageKey() { - if (isPrimaryDevice()) { - return getOrCreatePinMasterKey().deriveStorageServiceKey(); + if (pinMasterKey != null) { + return pinMasterKey.deriveStorageServiceKey(); + } else if (storageKey != null) { + return storageKey; + } else if (!isPrimaryDevice() || !isMultiDevice()) { + // Only upload storage, if a pin master key already exists or linked devices exist + return null; } - return storageKey; + + return getOrCreatePinMasterKey().deriveStorageServiceKey(); } public void setStorageKey(final StorageKey storageKey) { - if (storageKey.equals(this.storageKey)) { + if (isPrimaryDevice() || storageKey.equals(this.storageKey)) { return; } this.storageKey = storageKey; diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java new file mode 100644 index 00000000..6e00ad75 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/UnknownStorageIdStore.java @@ -0,0 +1,104 @@ +package org.asamk.signal.manager.storage; + +import org.whispersystems.signalservice.api.storage.StorageId; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class UnknownStorageIdStore { + + private static final String TABLE_STORAGE_ID = "storage_id"; + + public static void createSql(Connection connection) throws SQLException { + // When modifying the CREATE statement here, also add a migration in AccountDatabase.java + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE storage_id ( + _id INTEGER PRIMARY KEY, + type INTEGER NOT NULL, + storage_id BLOB UNIQUE NOT NULL + ) STRICT; + """); + } + } + + public Collection getUnknownStorageIds(Connection connection) throws SQLException { + final var sql = ( + """ + SELECT s.type, s.storage_id + FROM %s s + """ + ).formatted(TABLE_STORAGE_ID); + try (final var statement = connection.prepareStatement(sql)) { + try (var result = Utils.executeQueryForStream(statement, this::getStorageIdFromResultSet)) { + return result.toList(); + } + } + } + + public List getUnknownStorageIds( + Connection connection, Collection types + ) throws SQLException { + final var typesCommaSeparated = types.stream().map(String::valueOf).collect(Collectors.joining(",")); + final var sql = ( + """ + SELECT s.type, s.storage_id + FROM %s s + WHERE s.type IN (%s) + """ + ).formatted(TABLE_STORAGE_ID, typesCommaSeparated); + try (final var statement = connection.prepareStatement(sql)) { + try (var result = Utils.executeQueryForStream(statement, this::getStorageIdFromResultSet)) { + return result.toList(); + } + } + } + + public void addUnknownStorageIds(Connection connection, Collection storageIds) throws SQLException { + final var sql = ( + """ + INSERT OR REPLACE INTO %s (type, storage_id) + VALUES (?, ?) + """ + ).formatted(TABLE_STORAGE_ID); + try (final var statement = connection.prepareStatement(sql)) { + for (final var storageId : storageIds) { + statement.setInt(1, storageId.getType()); + statement.setBytes(2, storageId.getRaw()); + statement.executeUpdate(); + } + } + } + + public void deleteUnknownStorageIds(Connection connection, Collection storageIds) throws SQLException { + final var sql = ( + """ + DELETE FROM %s + WHERE storage_id = ? + """ + ).formatted(TABLE_STORAGE_ID); + try (final var statement = connection.prepareStatement(sql)) { + for (final var storageId : storageIds) { + statement.setBytes(1, storageId.getRaw()); + statement.executeUpdate(); + } + } + } + + public void deleteAllUnknownStorageIds(Connection connection) throws SQLException { + final var sql = "DELETE FROM %s".formatted(TABLE_STORAGE_ID); + try (final var statement = connection.prepareStatement(sql)) { + statement.executeUpdate(); + } + } + + private StorageId getStorageIdFromResultSet(ResultSet resultSet) throws SQLException { + final var type = resultSet.getInt("type"); + final var storageId = resultSet.getBytes("storage_id"); + return StorageId.forType(storageId, type); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/configuration/ConfigurationStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/configuration/ConfigurationStore.java index f85d3d80..558d4f86 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/configuration/ConfigurationStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/configuration/ConfigurationStore.java @@ -3,10 +3,15 @@ package org.asamk.signal.manager.storage.configuration; import org.asamk.signal.manager.api.PhoneNumberSharingMode; import org.asamk.signal.manager.storage.keyValue.KeyValueEntry; import org.asamk.signal.manager.storage.keyValue.KeyValueStore; +import org.asamk.signal.manager.storage.recipients.RecipientStore; + +import java.sql.Connection; +import java.sql.SQLException; public class ConfigurationStore { private final KeyValueStore keyValueStore; + private final RecipientStore recipientStore; private final KeyValueEntry readReceipts = new KeyValueEntry<>("config-read-receipts", Boolean.class); private final KeyValueEntry unidentifiedDeliveryIndicators = new KeyValueEntry<>( @@ -20,9 +25,11 @@ public class ConfigurationStore { private final KeyValueEntry phoneNumberSharingMode = new KeyValueEntry<>( "config-phone-number-sharing-mode", PhoneNumberSharingMode.class); + private final KeyValueEntry usernameLinkColor = new KeyValueEntry<>("username-link-color", String.class); - public ConfigurationStore(final KeyValueStore keyValueStore) { + public ConfigurationStore(final KeyValueStore keyValueStore, RecipientStore recipientStore) { this.keyValueStore = keyValueStore; + this.recipientStore = recipientStore; } public Boolean getReadReceipts() { @@ -30,7 +37,15 @@ public class ConfigurationStore { } public void setReadReceipts(final boolean value) { - keyValueStore.storeEntry(readReceipts, value); + if (keyValueStore.storeEntry(readReceipts, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setReadReceipts(final Connection connection, final boolean value) throws SQLException { + if (keyValueStore.storeEntry(connection, readReceipts, value)) { + recipientStore.rotateSelfStorageId(connection); + } } public Boolean getUnidentifiedDeliveryIndicators() { @@ -38,7 +53,17 @@ public class ConfigurationStore { } public void setUnidentifiedDeliveryIndicators(final boolean value) { - keyValueStore.storeEntry(unidentifiedDeliveryIndicators, value); + if (keyValueStore.storeEntry(unidentifiedDeliveryIndicators, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setUnidentifiedDeliveryIndicators( + final Connection connection, final boolean value + ) throws SQLException { + if (keyValueStore.storeEntry(connection, unidentifiedDeliveryIndicators, value)) { + recipientStore.rotateSelfStorageId(connection); + } } public Boolean getTypingIndicators() { @@ -46,7 +71,15 @@ public class ConfigurationStore { } public void setTypingIndicators(final boolean value) { - keyValueStore.storeEntry(typingIndicators, value); + if (keyValueStore.storeEntry(typingIndicators, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setTypingIndicators(final Connection connection, final boolean value) throws SQLException { + if (keyValueStore.storeEntry(connection, typingIndicators, value)) { + recipientStore.rotateSelfStorageId(connection); + } } public Boolean getLinkPreviews() { @@ -54,7 +87,15 @@ public class ConfigurationStore { } public void setLinkPreviews(final boolean value) { - keyValueStore.storeEntry(linkPreviews, value); + if (keyValueStore.storeEntry(linkPreviews, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setLinkPreviews(final Connection connection, final boolean value) throws SQLException { + if (keyValueStore.storeEntry(connection, linkPreviews, value)) { + recipientStore.rotateSelfStorageId(connection); + } } public Boolean getPhoneNumberUnlisted() { @@ -62,7 +103,15 @@ public class ConfigurationStore { } public void setPhoneNumberUnlisted(final boolean value) { - keyValueStore.storeEntry(phoneNumberUnlisted, value); + if (keyValueStore.storeEntry(phoneNumberUnlisted, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setPhoneNumberUnlisted(final Connection connection, final boolean value) throws SQLException { + if (keyValueStore.storeEntry(connection, phoneNumberUnlisted, value)) { + recipientStore.rotateSelfStorageId(connection); + } } public PhoneNumberSharingMode getPhoneNumberSharingMode() { @@ -70,6 +119,32 @@ public class ConfigurationStore { } public void setPhoneNumberSharingMode(final PhoneNumberSharingMode value) { - keyValueStore.storeEntry(phoneNumberSharingMode, value); + if (keyValueStore.storeEntry(phoneNumberSharingMode, value)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setPhoneNumberSharingMode( + final Connection connection, final PhoneNumberSharingMode value + ) throws SQLException { + if (keyValueStore.storeEntry(connection, phoneNumberSharingMode, value)) { + recipientStore.rotateSelfStorageId(connection); + } + } + + public String getUsernameLinkColor() { + return keyValueStore.getEntry(usernameLinkColor); + } + + public void setUsernameLinkColor(final String color) { + if (keyValueStore.storeEntry(usernameLinkColor, color)) { + recipientStore.rotateSelfStorageId(); + } + } + + public void setUsernameLinkColor(final Connection connection, final String color) throws SQLException { + if (keyValueStore.storeEntry(connection, usernameLinkColor, color)) { + recipientStore.rotateSelfStorageId(connection); + } } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV1.java b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV1.java index ba09337c..fd7bfe85 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV1.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV1.java @@ -25,6 +25,7 @@ public final class GroupInfoV1 extends GroupInfo { public int messageExpirationTime; public boolean blocked; public boolean archived; + private byte[] storageRecord; public GroupInfoV1(GroupIdV1 groupId) { this.groupId = groupId; @@ -38,7 +39,8 @@ public final class GroupInfoV1 extends GroupInfo { final String color, final int messageExpirationTime, final boolean blocked, - final boolean archived + final boolean archived, + final byte[] storageRecord ) { this.groupId = groupId; this.expectedV2Id = expectedV2Id; @@ -48,6 +50,7 @@ public final class GroupInfoV1 extends GroupInfo { this.messageExpirationTime = messageExpirationTime; this.blocked = blocked; this.archived = archived; + this.storageRecord = storageRecord; } @Override @@ -123,4 +126,8 @@ public final class GroupInfoV1 extends GroupInfo { public void removeMember(RecipientId recipientId) { this.members.removeIf(member -> member.equals(recipientId)); } + + public byte[] getStorageRecord() { + return storageRecord; + } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV2.java b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV2.java index c263aa9b..8ddec54e 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV2.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupInfoV2.java @@ -24,6 +24,7 @@ public final class GroupInfoV2 extends GroupInfo { private final DistributionId distributionId; private boolean blocked; private DecryptedGroup group; + private byte[] storageRecord; private boolean permissionDenied; private final RecipientResolver recipientResolver; @@ -44,6 +45,7 @@ public final class GroupInfoV2 extends GroupInfo { final DistributionId distributionId, final boolean blocked, final boolean permissionDenied, + final byte[] storageRecord, final RecipientResolver recipientResolver ) { this.groupId = groupId; @@ -52,6 +54,7 @@ public final class GroupInfoV2 extends GroupInfo { this.distributionId = distributionId; this.blocked = blocked; this.permissionDenied = permissionDenied; + this.storageRecord = storageRecord; this.recipientResolver = recipientResolver; } @@ -64,6 +67,10 @@ public final class GroupInfoV2 extends GroupInfo { return masterKey; } + public byte[] getStorageRecord() { + return storageRecord; + } + public DistributionId getDistributionId() { return distributionId; } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupStore.java index 33bb2532..e7cf5752 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/groups/GroupStore.java @@ -9,6 +9,7 @@ import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.recipients.RecipientId; import org.asamk.signal.manager.storage.recipients.RecipientIdCreator; import org.asamk.signal.manager.storage.recipients.RecipientResolver; +import org.asamk.signal.manager.util.KeyUtils; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.groups.GroupMasterKey; import org.signal.libsignal.zkgroup.groups.GroupSecretParams; @@ -16,6 +17,7 @@ import org.signal.storageservice.protos.groups.local.DecryptedGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.signalservice.api.push.DistributionId; +import org.whispersystems.signalservice.api.storage.StorageId; import org.whispersystems.signalservice.api.util.UuidUtil; import java.io.IOException; @@ -23,9 +25,11 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -48,6 +52,8 @@ public class GroupStore { statement.executeUpdate(""" CREATE TABLE group_v2 ( _id INTEGER PRIMARY KEY, + storage_id BLOB UNIQUE, + storage_record BLOB, group_id BLOB UNIQUE NOT NULL, master_key BLOB NOT NULL, group_data BLOB, @@ -57,6 +63,8 @@ public class GroupStore { ) STRICT; CREATE TABLE group_v1 ( _id INTEGER PRIMARY KEY, + storage_id BLOB UNIQUE, + storage_record BLOB, group_id BLOB UNIQUE NOT NULL, group_id_v2 BLOB UNIQUE, name TEXT, @@ -111,6 +119,28 @@ public class GroupStore { insertOrReplaceGroup(connection, internalId, group); } + public void storeStorageRecord( + final Connection connection, final GroupId groupId, final StorageId storageId, final byte[] storageRecord + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET storage_id = ?, storage_record = ? + WHERE group_id = ? + """ + ).formatted(groupId instanceof GroupIdV1 ? TABLE_GROUP_V1 : TABLE_GROUP_V2); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); + if (storageRecord == null) { + statement.setNull(2, Types.BLOB); + } else { + statement.setBytes(2, storageRecord); + } + statement.setBytes(3, groupId.serialize()); + statement.executeUpdate(); + } + } + public void deleteGroup(GroupId groupId) { if (groupId instanceof GroupIdV1 groupIdV1) { deleteGroup(groupIdV1); @@ -249,6 +279,34 @@ public class GroupStore { return Stream.concat(getGroupsV2().stream(), getGroupsV1().stream()).toList(); } + public List getGroupV1Ids(Connection connection) throws SQLException { + final var sql = ( + """ + SELECT g.group_id + FROM %s g + """ + ).formatted(TABLE_GROUP_V1); + try (final var statement = connection.prepareStatement(sql)) { + return Utils.executeQueryForStream(statement, this::getGroupIdV1FromResultSet) + .filter(Objects::nonNull) + .toList(); + } + } + + public List getGroupV2Ids(Connection connection) throws SQLException { + final var sql = ( + """ + SELECT g.group_id + FROM %s g + """ + ).formatted(TABLE_GROUP_V2); + try (final var statement = connection.prepareStatement(sql)) { + return Utils.executeQueryForStream(statement, this::getGroupIdV2FromResultSet) + .filter(Objects::nonNull) + .toList(); + } + } + public void mergeRecipients( final Connection connection, final RecipientId recipientId, final RecipientId toBeMergedRecipientId ) throws SQLException { @@ -269,6 +327,106 @@ public class GroupStore { } } + public List getStorageIds(Connection connection) throws SQLException { + final var storageIds = new ArrayList(); + final var sql = """ + SELECT g.storage_id + FROM %s g WHERE g.storage_id IS NOT NULL + """; + try (final var statement = connection.prepareStatement(sql.formatted(TABLE_GROUP_V1))) { + Utils.executeQueryForStream(statement, this::getGroupV1StorageIdFromResultSet).forEach(storageIds::add); + } + try (final var statement = connection.prepareStatement(sql.formatted(TABLE_GROUP_V2))) { + Utils.executeQueryForStream(statement, this::getGroupV2StorageIdFromResultSet).forEach(storageIds::add); + } + return storageIds; + } + + public void updateStorageIds( + Connection connection, Map storageIdV1Map, Map storageIdV2Map + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET storage_id = ? + WHERE group_id = ? + """ + ); + try (final var statement = connection.prepareStatement(sql.formatted(TABLE_GROUP_V1))) { + for (final var entry : storageIdV1Map.entrySet()) { + statement.setBytes(1, entry.getValue().getRaw()); + statement.setBytes(2, entry.getKey().serialize()); + statement.executeUpdate(); + } + } + try (final var statement = connection.prepareStatement(sql.formatted(TABLE_GROUP_V2))) { + for (final var entry : storageIdV2Map.entrySet()) { + statement.setBytes(1, entry.getValue().getRaw()); + statement.setBytes(2, entry.getKey().serialize()); + statement.executeUpdate(); + } + } + } + + public void updateStorageId( + Connection connection, GroupId groupId, StorageId storageId + ) throws SQLException { + final var sqlV1 = ( + """ + UPDATE %s + SET storage_id = ? + WHERE group_id = ? + """ + ).formatted(groupId instanceof GroupIdV1 ? TABLE_GROUP_V1 : TABLE_GROUP_V2); + try (final var statement = connection.prepareStatement(sqlV1)) { + statement.setBytes(1, storageId.getRaw()); + statement.setBytes(2, groupId.serialize()); + statement.executeUpdate(); + } + } + + public void setMissingStorageIds() { + final var selectSql = ( + """ + SELECT g.group_id + FROM %s g + WHERE g.storage_id IS NULL + """ + ); + final var updateSql = ( + """ + UPDATE %s + SET storage_id = ? + WHERE group_id = ? + """ + ); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + try (final var selectStmt = connection.prepareStatement(selectSql.formatted(TABLE_GROUP_V1))) { + final var groupIds = Utils.executeQueryForStream(selectStmt, this::getGroupIdV1FromResultSet).toList(); + try (final var updateStmt = connection.prepareStatement(updateSql.formatted(TABLE_GROUP_V1))) { + for (final var groupId : groupIds) { + updateStmt.setBytes(1, KeyUtils.createRawStorageId()); + updateStmt.setBytes(2, groupId.serialize()); + } + } + } + try (final var selectStmt = connection.prepareStatement(selectSql.formatted(TABLE_GROUP_V2))) { + final var groupIds = Utils.executeQueryForStream(selectStmt, this::getGroupIdV2FromResultSet).toList(); + try (final var updateStmt = connection.prepareStatement(updateSql.formatted(TABLE_GROUP_V2))) { + for (final var groupId : groupIds) { + updateStmt.setBytes(1, KeyUtils.createRawStorageId()); + updateStmt.setBytes(2, groupId.serialize()); + updateStmt.executeUpdate(); + } + } + } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update group store", e); + } + } + void addLegacyGroups(final Collection groups) { logger.debug("Migrating legacy groups to database"); long start = System.nanoTime(); @@ -296,8 +454,8 @@ public class GroupStore { } } final var sql = """ - INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + INSERT OR REPLACE INTO %s (_id, group_id, group_id_v2, name, color, expiration_time, blocked, archived, storage_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING _id """.formatted(TABLE_GROUP_V1); try (final var statement = connection.prepareStatement(sql)) { @@ -313,6 +471,7 @@ public class GroupStore { statement.setLong(6, groupV1.getMessageExpirationTimer()); statement.setBoolean(7, groupV1.isBlocked()); statement.setBoolean(8, groupV1.archived); + statement.setBytes(9, KeyUtils.createRawStorageId()); final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper); if (internalId == null) { @@ -337,8 +496,8 @@ public class GroupStore { } else if (group instanceof GroupInfoV2 groupV2) { final var sql = ( """ - INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT OR REPLACE INTO %s (_id, group_id, master_key, group_data, distribution_id, blocked, distribution_id, storage_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) """ ).formatted(TABLE_GROUP_V2); try (final var statement = connection.prepareStatement(sql)) { @@ -357,6 +516,7 @@ public class GroupStore { statement.setBytes(5, UuidUtil.toByteArray(groupV2.getDistributionId().asUuid())); statement.setBoolean(6, groupV2.isBlocked()); statement.setBoolean(7, groupV2.isPermissionDenied()); + statement.setBytes(8, KeyUtils.createRawStorageId()); statement.executeUpdate(); } } else { @@ -367,7 +527,7 @@ public class GroupStore { private List getGroupsV2() { final var sql = ( """ - SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied + SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record FROM %s g """ ).formatted(TABLE_GROUP_V2); @@ -382,20 +542,59 @@ public class GroupStore { } } - private GroupInfoV2 getGroup(Connection connection, GroupIdV2 groupIdV2) throws SQLException { + public GroupInfoV2 getGroup(Connection connection, GroupIdV2 groupIdV2) throws SQLException { + final var sql = ( + """ + SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record + FROM %s g + WHERE g.group_id = ? + """ + ).formatted(TABLE_GROUP_V2); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, groupIdV2.serialize()); + return Utils.executeQueryForOptional(statement, this::getGroupInfoV2FromResultSet).orElse(null); + } + } + + public StorageId getGroupStorageId(Connection connection, GroupIdV2 groupIdV2) throws SQLException { final var sql = ( """ - SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied + SELECT g.storage_id FROM %s g WHERE g.group_id = ? """ ).formatted(TABLE_GROUP_V2); try (final var statement = connection.prepareStatement(sql)) { statement.setBytes(1, groupIdV2.serialize()); + final var storageId = Utils.executeQueryForOptional(statement, this::getGroupV2StorageIdFromResultSet); + if (storageId.isPresent()) { + return storageId.get(); + } + } + final var newStorageId = StorageId.forGroupV2(KeyUtils.createRawStorageId()); + updateStorageId(connection, groupIdV2, newStorageId); + return newStorageId; + } + + public GroupInfoV2 getGroupV2(Connection connection, StorageId storageId) throws SQLException { + final var sql = ( + """ + SELECT g.group_id, g.master_key, g.group_data, g.distribution_id, g.blocked, g.permission_denied, g.storage_record + FROM %s g + WHERE g.storage_id = ? + """ + ).formatted(TABLE_GROUP_V2); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); return Utils.executeQueryForOptional(statement, this::getGroupInfoV2FromResultSet).orElse(null); } } + private GroupIdV2 getGroupIdV2FromResultSet(ResultSet resultSet) throws SQLException { + final var groupId = resultSet.getBytes("group_id"); + return GroupId.v2(groupId); + } + private GroupInfoV2 getGroupInfoV2FromResultSet(ResultSet resultSet) throws SQLException { try { final var groupId = resultSet.getBytes("group_id"); @@ -404,22 +603,38 @@ public class GroupStore { final var distributionId = resultSet.getBytes("distribution_id"); final var blocked = resultSet.getBoolean("blocked"); final var permissionDenied = resultSet.getBoolean("permission_denied"); + final var storageRecord = resultSet.getBytes("storage_record"); return new GroupInfoV2(GroupId.v2(groupId), new GroupMasterKey(masterKey), groupData == null ? null : DecryptedGroup.ADAPTER.decode(groupData), DistributionId.from(UuidUtil.parseOrThrow(distributionId)), blocked, permissionDenied, + storageRecord, recipientResolver); } catch (InvalidInputException | IOException e) { return null; } } + private StorageId getGroupV1StorageIdFromResultSet(ResultSet resultSet) throws SQLException { + final var storageId = resultSet.getBytes("storage_id"); + return storageId == null + ? StorageId.forGroupV1(KeyUtils.createRawStorageId()) + : StorageId.forGroupV1(storageId); + } + + private StorageId getGroupV2StorageIdFromResultSet(ResultSet resultSet) throws SQLException { + final var storageId = resultSet.getBytes("storage_id"); + return storageId == null + ? StorageId.forGroupV2(KeyUtils.createRawStorageId()) + : StorageId.forGroupV2(storageId); + } + private List getGroupsV1() { final var sql = ( """ - SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived + SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record FROM %s g """ ).formatted(TABLE_GROUP_V1_MEMBER, TABLE_GROUP_V1); @@ -434,10 +649,10 @@ public class GroupStore { } } - private GroupInfoV1 getGroup(Connection connection, GroupIdV1 groupIdV1) throws SQLException { + public GroupInfoV1 getGroup(Connection connection, GroupIdV1 groupIdV1) throws SQLException { final var sql = ( """ - SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived + SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record FROM %s g WHERE g.group_id = ? """ @@ -448,6 +663,45 @@ public class GroupStore { } } + public StorageId getGroupStorageId(Connection connection, GroupIdV1 groupIdV1) throws SQLException { + final var sql = ( + """ + SELECT g.storage_id + FROM %s g + WHERE g.group_id = ? + """ + ).formatted(TABLE_GROUP_V1); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, groupIdV1.serialize()); + final var storageId = Utils.executeQueryForOptional(statement, this::getGroupV1StorageIdFromResultSet); + if (storageId.isPresent()) { + return storageId.get(); + } + } + final var newStorageId = StorageId.forGroupV1(KeyUtils.createRawStorageId()); + updateStorageId(connection, groupIdV1, newStorageId); + return newStorageId; + } + + public GroupInfoV1 getGroupV1(Connection connection, StorageId storageId) throws SQLException { + final var sql = ( + """ + SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record + FROM %s g + WHERE g.storage_id = ? + """ + ).formatted(TABLE_GROUP_V1_MEMBER, TABLE_GROUP_V1); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); + return Utils.executeQueryForOptional(statement, this::getGroupInfoV1FromResultSet).orElse(null); + } + } + + private GroupIdV1 getGroupIdV1FromResultSet(ResultSet resultSet) throws SQLException { + final var groupId = resultSet.getBytes("group_id"); + return GroupId.v1(groupId); + } + private GroupInfoV1 getGroupInfoV1FromResultSet(ResultSet resultSet) throws SQLException { final var groupId = resultSet.getBytes("group_id"); final var groupIdV2 = resultSet.getBytes("group_id_v2"); @@ -463,6 +717,7 @@ public class GroupStore { final var expirationTime = resultSet.getInt("expiration_time"); final var blocked = resultSet.getBoolean("blocked"); final var archived = resultSet.getBoolean("archived"); + final var storagRecord = resultSet.getBytes("storage_record"); return new GroupInfoV1(GroupId.v1(groupId), groupIdV2 == null ? null : GroupId.v2(groupIdV2), name, @@ -470,7 +725,8 @@ public class GroupStore { color, expirationTime, blocked, - archived); + archived, + storagRecord); } private GroupInfoV2 getGroupV2ByV1Id(final Connection connection, final GroupIdV1 groupId) throws SQLException { @@ -480,7 +736,7 @@ public class GroupStore { private GroupInfoV1 getGroupV1ByV2Id(Connection connection, GroupIdV2 groupIdV2) throws SQLException { final var sql = ( """ - SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived + SELECT g.group_id, g.group_id_v2, g.name, g.color, (select group_concat(gm.recipient_id) from %s gm where gm.group_id = g._id) as members, g.expiration_time, g.blocked, g.archived, g.storage_record FROM %s g WHERE g.group_id_v2 = ? """ diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/groups/LegacyGroupStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/groups/LegacyGroupStore.java index a87759b3..a35d4513 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/groups/LegacyGroupStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/groups/LegacyGroupStore.java @@ -59,7 +59,8 @@ public class LegacyGroupStore { g1.color, g1.messageExpirationTime, g1.blocked, - g1.archived); + g1.archived, + null); } final var g2 = (Storage.GroupV2) g; @@ -77,6 +78,7 @@ public class LegacyGroupStore { g2.distributionId == null ? DistributionId.create() : DistributionId.from(g2.distributionId), g2.blocked, g2.permissionDenied, + null, recipientResolver); }).toList(); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/identities/IdentityKeyStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/identities/IdentityKeyStore.java index bcb40669..a4b355ea 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/identities/IdentityKeyStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/identities/IdentityKeyStore.java @@ -4,6 +4,7 @@ import org.asamk.signal.manager.api.TrustLevel; import org.asamk.signal.manager.api.TrustNewIdentity; import org.asamk.signal.manager.storage.Database; import org.asamk.signal.manager.storage.Utils; +import org.asamk.signal.manager.storage.recipients.RecipientStore; import org.signal.libsignal.protocol.IdentityKey; import org.signal.libsignal.protocol.InvalidKeyException; import org.signal.libsignal.protocol.state.IdentityKeyStore.Direction; @@ -27,6 +28,7 @@ public class IdentityKeyStore { private static final String TABLE_IDENTITY = "identity"; private final Database database; private final TrustNewIdentity trustNewIdentity; + private final RecipientStore recipientStore; private final PublishSubject identityChanges = PublishSubject.create(); private boolean isRetryingDecryption = false; @@ -46,9 +48,12 @@ public class IdentityKeyStore { } } - public IdentityKeyStore(final Database database, final TrustNewIdentity trustNewIdentity) { + public IdentityKeyStore( + final Database database, final TrustNewIdentity trustNewIdentity, RecipientStore recipientStore + ) { this.database = database; this.trustNewIdentity = trustNewIdentity; + this.recipientStore = recipientStore; } public Observable getIdentityChanges() { @@ -59,58 +64,79 @@ public class IdentityKeyStore { return saveIdentity(serviceId.toString(), identityKey); } + public boolean saveIdentity( + final Connection connection, final ServiceId serviceId, final IdentityKey identityKey + ) throws SQLException { + return saveIdentity(connection, serviceId.toString(), identityKey); + } + boolean saveIdentity(final String address, final IdentityKey identityKey) { if (isRetryingDecryption) { return false; } try (final var connection = database.getConnection()) { - final var identityInfo = loadIdentity(connection, address); - if (identityInfo != null && identityInfo.getIdentityKey().equals(identityKey)) { - // Identity already exists, not updating the trust level - logger.trace("Not storing new identity for recipient {}, identity already stored", address); - return false; - } - - saveNewIdentity(connection, address, identityKey, identityInfo == null); - return true; + return saveIdentity(connection, address, identityKey); } catch (SQLException e) { throw new RuntimeException("Failed update identity store", e); } } + private boolean saveIdentity( + final Connection connection, final String address, final IdentityKey identityKey + ) throws SQLException { + final var identityInfo = loadIdentity(connection, address); + if (identityInfo != null && identityInfo.getIdentityKey().equals(identityKey)) { + // Identity already exists, not updating the trust level + logger.trace("Not storing new identity for recipient {}, identity already stored", address); + return false; + } + + saveNewIdentity(connection, address, identityKey, identityInfo == null); + return true; + } + public void setRetryingDecryption(final boolean retryingDecryption) { isRetryingDecryption = retryingDecryption; } public boolean setIdentityTrustLevel(ServiceId serviceId, IdentityKey identityKey, TrustLevel trustLevel) { try (final var connection = database.getConnection()) { - final var address = serviceId.toString(); - final var identityInfo = loadIdentity(connection, address); - if (identityInfo == null) { - logger.debug("Not updating trust level for recipient {}, identity not found", serviceId); - return false; - } - if (!identityInfo.getIdentityKey().equals(identityKey)) { - logger.debug("Not updating trust level for recipient {}, different identity found", serviceId); - return false; - } - if (identityInfo.getTrustLevel() == trustLevel) { - logger.trace("Not updating trust level for recipient {}, trust level already matches", serviceId); - return false; - } - - logger.debug("Updating trust level for recipient {} with trust {}", serviceId, trustLevel); - final var newIdentityInfo = new IdentityInfo(address, - identityKey, - trustLevel, - identityInfo.getDateAddedTimestamp()); - storeIdentity(connection, newIdentityInfo); - return true; + return setIdentityTrustLevel(connection, serviceId, identityKey, trustLevel); } catch (SQLException e) { throw new RuntimeException("Failed update identity store", e); } } + public boolean setIdentityTrustLevel( + final Connection connection, + final ServiceId serviceId, + final IdentityKey identityKey, + final TrustLevel trustLevel + ) throws SQLException { + final var address = serviceId.toString(); + final var identityInfo = loadIdentity(connection, address); + if (identityInfo == null) { + logger.debug("Not updating trust level for recipient {}, identity not found", serviceId); + return false; + } + if (!identityInfo.getIdentityKey().equals(identityKey)) { + logger.debug("Not updating trust level for recipient {}, different identity found", serviceId); + return false; + } + if (identityInfo.getTrustLevel() == trustLevel) { + logger.trace("Not updating trust level for recipient {}, trust level already matches", serviceId); + return false; + } + + logger.debug("Updating trust level for recipient {} with trust {}", serviceId, trustLevel); + final var newIdentityInfo = new IdentityInfo(address, + identityKey, + trustLevel, + identityInfo.getDateAddedTimestamp()); + storeIdentity(connection, newIdentityInfo); + return true; + } + public boolean isTrustedIdentity(ServiceId serviceId, IdentityKey identityKey, Direction direction) { return isTrustedIdentity(serviceId.toString(), identityKey, direction); } @@ -159,6 +185,10 @@ public class IdentityKeyStore { } } + public IdentityInfo getIdentityInfo(Connection connection, String address) throws SQLException { + return loadIdentity(connection, address); + } + public List getIdentities() { try (final var connection = database.getConnection()) { final var sql = ( @@ -252,6 +282,7 @@ public class IdentityKeyStore { statement.setInt(4, identityInfo.getTrustLevel().ordinal()); statement.executeUpdate(); } + recipientStore.rotateStorageId(connection, identityInfo.getServiceId()); } private void deleteIdentity(final Connection connection, final String address) throws SQLException { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java index 9addcc7d..c51f32f1 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/keyValue/KeyValueStore.java @@ -10,6 +10,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.util.Objects; public class KeyValueStore { @@ -43,9 +44,9 @@ public class KeyValueStore { } } - public void storeEntry(KeyValueEntry key, T value) { + public boolean storeEntry(KeyValueEntry key, T value) { try (final var connection = database.getConnection()) { - storeEntry(connection, key, value); + return storeEntry(connection, key, value); } catch (SQLException e) { throw new RuntimeException("Failed update key_value store", e); } @@ -72,9 +73,14 @@ public class KeyValueStore { } } - private void storeEntry( + public boolean storeEntry( final Connection connection, final KeyValueEntry key, final T value ) throws SQLException { + final var entry = getEntry(key); + if (Objects.equals(entry, value)) { + return false; + } + final var sql = ( """ INSERT INTO %s (key, value) @@ -87,6 +93,7 @@ public class KeyValueStore { setParameterValue(statement, 2, key.clazz(), value); statement.executeUpdate(); } + return true; } @SuppressWarnings("unchecked") diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java index 55231cb7..02061a66 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java @@ -83,7 +83,13 @@ public class LegacyRecipientStore2 { .collect(Collectors.toSet())); } - return new Recipient(recipientId, address, contact, profileKey, expiringProfileKeyCredential, profile); + return new Recipient(recipientId, + address, + contact, + profileKey, + expiringProfileKeyCredential, + profile, + null); }).collect(Collectors.toMap(Recipient::getRecipientId, r -> r)); recipientStore.addLegacyRecipients(recipients); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/Recipient.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/Recipient.java index 3790ecde..1d5fb9c8 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/Recipient.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/Recipient.java @@ -21,13 +21,16 @@ public class Recipient { private final Profile profile; + private final byte[] storageRecord; + public Recipient( final RecipientId recipientId, final RecipientAddress address, final Contact contact, final ProfileKey profileKey, final ExpiringProfileKeyCredential expiringProfileKeyCredential, - final Profile profile + final Profile profile, + final byte[] storageRecord ) { this.recipientId = recipientId; this.address = address; @@ -35,6 +38,7 @@ public class Recipient { this.profileKey = profileKey; this.expiringProfileKeyCredential = expiringProfileKeyCredential; this.profile = profile; + this.storageRecord = storageRecord; } private Recipient(final Builder builder) { @@ -42,8 +46,9 @@ public class Recipient { address = builder.address; contact = builder.contact; profileKey = builder.profileKey; - expiringProfileKeyCredential = builder.expiringProfileKeyCredential1; + expiringProfileKeyCredential = builder.expiringProfileKeyCredential; profile = builder.profile; + storageRecord = builder.storageRecord; } public static Builder newBuilder() { @@ -56,8 +61,9 @@ public class Recipient { builder.address = copy.getAddress(); builder.contact = copy.getContact(); builder.profileKey = copy.getProfileKey(); - builder.expiringProfileKeyCredential1 = copy.getExpiringProfileKeyCredential(); + builder.expiringProfileKeyCredential = copy.getExpiringProfileKeyCredential(); builder.profile = copy.getProfile(); + builder.storageRecord = copy.getStorageRecord(); return builder; } @@ -85,6 +91,10 @@ public class Recipient { return profile; } + public byte[] getStorageRecord() { + return storageRecord; + } + @Override public boolean equals(final Object o) { if (this == o) return true; @@ -109,8 +119,9 @@ public class Recipient { private RecipientAddress address; private Contact contact; private ProfileKey profileKey; - private ExpiringProfileKeyCredential expiringProfileKeyCredential1; + private ExpiringProfileKeyCredential expiringProfileKeyCredential; private Profile profile; + private byte[] storageRecord; private Builder() { } @@ -136,7 +147,7 @@ public class Recipient { } public Builder withExpiringProfileKeyCredential(final ExpiringProfileKeyCredential val) { - expiringProfileKeyCredential1 = val; + expiringProfileKeyCredential = val; return this; } @@ -145,6 +156,11 @@ public class Recipient { return this; } + public Builder withStorageRecord(final byte[] val) { + storageRecord = val; + return this; + } + public Recipient build() { return new Recipient(this); } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientAddress.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientAddress.java index 86ada86a..e4c24c99 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientAddress.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientAddress.java @@ -89,6 +89,10 @@ public record RecipientAddress( address.username.equals(this.username) ? Optional.empty() : this.username); } + public Optional aci() { + return serviceId.map(s -> s instanceof ServiceId.ACI aci ? aci : null); + } + public String getIdentifier() { if (serviceId.isPresent()) { return serviceId.get().toString(); diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java index 96a8f224..926f09bd 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java @@ -8,6 +8,7 @@ import org.asamk.signal.manager.storage.Database; import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.contacts.ContactsStore; import org.asamk.signal.manager.storage.profiles.ProfileStore; +import org.asamk.signal.manager.util.KeyUtils; import org.signal.libsignal.zkgroup.InvalidInputException; import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential; import org.signal.libsignal.zkgroup.profiles.ProfileKey; @@ -17,11 +18,13 @@ import org.whispersystems.signalservice.api.push.ServiceId; import org.whispersystems.signalservice.api.push.ServiceId.ACI; import org.whispersystems.signalservice.api.push.ServiceId.PNI; import org.whispersystems.signalservice.api.push.SignalServiceAddress; +import org.whispersystems.signalservice.api.storage.StorageId; import org.whispersystems.signalservice.api.util.UuidUtil; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -56,6 +59,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re statement.executeUpdate(""" CREATE TABLE recipient ( _id INTEGER PRIMARY KEY AUTOINCREMENT, + storage_id BLOB UNIQUE, + storage_record BLOB, number TEXT UNIQUE, username TEXT UNIQUE, uuid BLOB UNIQUE, @@ -273,6 +278,10 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } + public RecipientId resolveRecipient(Connection connection, RecipientAddress address) throws SQLException { + return resolveRecipientLocked(connection, address); + } + @Override public RecipientId resolveSelfRecipientTrusted(RecipientAddress address) { return resolveRecipientTrusted(address, true); @@ -283,6 +292,14 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re return resolveRecipientTrusted(address, false); } + public RecipientId resolveRecipientTrusted(Connection connection, RecipientAddress address) throws SQLException { + final var pair = resolveRecipientTrustedLocked(connection, address, false); + if (!pair.second().isEmpty()) { + mergeRecipients(connection, pair.first(), pair.second()); + } + return pair.first(); + } + @Override public RecipientId resolveRecipientTrusted(SignalServiceAddress address) { return resolveRecipientTrusted(new RecipientAddress(address)); @@ -341,6 +358,44 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } + public Recipient getRecipient(Connection connection, RecipientId recipientId) throws SQLException { + final var sql = ( + """ + SELECT r._id, + r.number, r.uuid, r.pni, r.username, + r.profile_key, r.profile_key_credential, + r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden, + r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities, + r.storage_record + FROM %s r + WHERE r._id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQuerySingleRow(statement, this::getRecipientFromResultSet); + } + } + + public Recipient getRecipient(Connection connection, StorageId storageId) throws SQLException { + final var sql = ( + """ + SELECT r._id, + r.number, r.uuid, r.pni, r.username, + r.profile_key, r.profile_key_credential, + r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden, + r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities, + r.storage_record + FROM %s r + WHERE r.storage_id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); + return Utils.executeQuerySingleRow(statement, this::getRecipientFromResultSet); + } + } + public List getRecipients( boolean onlyContacts, Optional blocked, Set recipientIds, Optional name ) { @@ -364,7 +419,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re r.number, r.uuid, r.pni, r.username, r.profile_key, r.profile_key_credential, r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, r.hidden, - r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities + r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities, + r.storage_record FROM %s r WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s """ @@ -447,6 +503,53 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } + public List getRecipientIds(Connection connection) throws SQLException { + final var sql = ( + """ + SELECT r._id + FROM %s r + WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + return Utils.executeQueryForStream(statement, this::getRecipientIdFromResultSet).toList(); + } + } + + public void setMissingStorageIds() { + final var selectSql = ( + """ + SELECT r._id + FROM %s r + WHERE r.storage_id IS NULL + """ + ).formatted(TABLE_RECIPIENT); + final var updateSql = ( + """ + UPDATE %s + SET storage_id = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + try (final var selectStmt = connection.prepareStatement(selectSql)) { + final var recipientIds = Utils.executeQueryForStream(selectStmt, this::getRecipientIdFromResultSet) + .toList(); + try (final var updateStmt = connection.prepareStatement(updateSql)) { + for (final var recipientId : recipientIds) { + updateStmt.setBytes(1, KeyUtils.createRawStorageId()); + updateStmt.setLong(2, recipientId.id()); + updateStmt.executeUpdate(); + } + } + } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } + } + @Override public void deleteContact(RecipientId recipientId) { storeContact(recipientId, null); @@ -509,12 +612,18 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re @Override public void storeProfileKey(RecipientId recipientId, final ProfileKey profileKey) { try (final var connection = database.getConnection()) { - storeProfileKey(connection, recipientId, profileKey, true); + storeProfileKey(connection, recipientId, profileKey); } catch (SQLException e) { throw new RuntimeException("Failed update recipient store", e); } } + public void storeProfileKey( + Connection connection, RecipientId recipientId, final ProfileKey profileKey + ) throws SQLException { + storeProfileKey(connection, recipientId, profileKey, true); + } + @Override public void storeExpiringProfileKeyCredential( RecipientId recipientId, final ExpiringProfileKeyCredential profileKeyCredential @@ -526,6 +635,121 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } + public void rotateSelfStorageId() { + try (final var connection = database.getConnection()) { + rotateSelfStorageId(connection); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } + } + + public void rotateSelfStorageId(final Connection connection) throws SQLException { + final var selfRecipientId = resolveRecipient(connection, selfAddressProvider.getSelfAddress()); + rotateStorageId(connection, selfRecipientId); + } + + public StorageId rotateStorageId(final Connection connection, final ServiceId serviceId) throws SQLException { + final var selfRecipientId = resolveRecipient(connection, new RecipientAddress(serviceId)); + return rotateStorageId(connection, selfRecipientId); + } + + public List getStorageIds(Connection connection) throws SQLException { + final var sql = """ + SELECT r.storage_id + FROM %s r WHERE r.storage_id IS NOT NULL AND r._id != ? AND (r.uuid IS NOT NULL OR r.pni IS NOT NULL) + """.formatted(TABLE_RECIPIENT); + final var selfRecipientId = resolveRecipient(connection, selfAddressProvider.getSelfAddress()); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, selfRecipientId.id()); + return Utils.executeQueryForStream(statement, this::getContactStorageIdFromResultSet).toList(); + } + } + + public void updateStorageId( + Connection connection, RecipientId recipientId, StorageId storageId + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET storage_id = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); + statement.setLong(2, recipientId.id()); + statement.executeUpdate(); + } + } + + public void updateStorageIds(Connection connection, Map storageIdMap) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET storage_id = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + for (final var entry : storageIdMap.entrySet()) { + statement.setBytes(1, entry.getValue().getRaw()); + statement.setLong(2, entry.getKey().id()); + statement.executeUpdate(); + } + } + } + + public StorageId getSelfStorageId(final Connection connection) throws SQLException { + final var selfRecipientId = resolveRecipient(connection, selfAddressProvider.getSelfAddress()); + return StorageId.forAccount(getStorageId(connection, selfRecipientId).getRaw()); + } + + public StorageId getStorageId(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = """ + SELECT r.storage_id + FROM %s r WHERE r._id = ? AND r.storage_id IS NOT NULL + """.formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + final var storageId = Utils.executeQueryForOptional(statement, this::getContactStorageIdFromResultSet); + if (storageId.isPresent()) { + return storageId.get(); + } + } + return rotateStorageId(connection, recipientId); + } + + private StorageId rotateStorageId(final Connection connection, final RecipientId recipientId) throws SQLException { + final var newStorageId = StorageId.forAccount(KeyUtils.createRawStorageId()); + updateStorageId(connection, recipientId, newStorageId); + return newStorageId; + } + + public void storeStorageRecord( + final Connection connection, + final RecipientId recipientId, + final StorageId storageId, + final byte[] storageRecord + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET storage_id = ?, storage_record = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, storageId.getRaw()); + if (storageRecord == null) { + statement.setNull(2, Types.BLOB); + } else { + statement.setBytes(2, storageRecord); + } + statement.setLong(3, recipientId.id()); + statement.executeUpdate(); + } + } + void addLegacyRecipients(final Map recipients) { logger.debug("Migrating legacy recipients to database"); long start = System.nanoTime(); @@ -587,7 +811,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re return recipientId; } - private void storeContact( + public void storeContact( final Connection connection, final RecipientId recipientId, final Contact contact ) throws SQLException { final var sql = ( @@ -608,6 +832,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re statement.setLong(8, recipientId.id()); statement.executeUpdate(); } + rotateStorageId(connection, recipientId); } private void storeExpiringProfileKeyCredential( @@ -629,7 +854,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } - private void storeProfile( + public void storeProfile( final Connection connection, final RecipientId recipientId, final Profile profile ) throws SQLException { final var sql = ( @@ -655,6 +880,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re statement.setLong(10, recipientId.id()); statement.executeUpdate(); } + rotateStorageId(connection, recipientId); } private void storeProfileKey( @@ -686,6 +912,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re statement.setLong(2, recipientId.id()); statement.executeUpdate(); } + rotateStorageId(connection, recipientId); } private RecipientId resolveRecipientTrusted(RecipientAddress address, boolean isSelf) { @@ -693,17 +920,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re synchronized (recipientsLock) { try (final var connection = database.getConnection()) { connection.setAutoCommit(false); - if (address.hasSingleIdentifier() || ( - !isSelf && selfAddressProvider.getSelfAddress().matches(address) - )) { - pair = new Pair<>(resolveRecipientLocked(connection, address), List.of()); - } else { - pair = MergeRecipientHelper.resolveRecipientTrustedLocked(new HelperStore(connection), address); - - for (final var toBeMergedRecipientId : pair.second()) { - mergeRecipientsLocked(connection, pair.first(), toBeMergedRecipientId); - } - } + pair = resolveRecipientTrustedLocked(connection, address, isSelf); connection.commit(); } catch (SQLException e) { throw new RuntimeException("Failed update recipient store", e); @@ -712,13 +929,9 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re if (!pair.second().isEmpty()) { try (final var connection = database.getConnection()) { - for (final var toBeMergedRecipientId : pair.second()) { - recipientMergeHandler.mergeRecipients(connection, pair.first(), toBeMergedRecipientId); - deleteRecipient(connection, toBeMergedRecipientId); - synchronized (recipientsLock) { - recipientAddressCache.entrySet().removeIf(e -> e.getValue().id().equals(toBeMergedRecipientId)); - } - } + connection.setAutoCommit(false); + mergeRecipients(connection, pair.first(), pair.second()); + connection.commit(); } catch (SQLException e) { throw new RuntimeException("Failed update recipient store", e); } @@ -726,15 +939,44 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re return pair.first(); } + private Pair> resolveRecipientTrustedLocked( + final Connection connection, final RecipientAddress address, final boolean isSelf + ) throws SQLException { + if (address.hasSingleIdentifier() || ( + !isSelf && selfAddressProvider.getSelfAddress().matches(address) + )) { + return new Pair<>(resolveRecipientLocked(connection, address), List.of()); + } else { + final var pair = MergeRecipientHelper.resolveRecipientTrustedLocked(new HelperStore(connection), address); + + for (final var toBeMergedRecipientId : pair.second()) { + mergeRecipientsLocked(connection, pair.first(), toBeMergedRecipientId); + } + return pair; + } + } + + private void mergeRecipients( + final Connection connection, final RecipientId recipientId, final List toBeMergedRecipientIds + ) throws SQLException { + for (final var toBeMergedRecipientId : toBeMergedRecipientIds) { + recipientMergeHandler.mergeRecipients(connection, recipientId, toBeMergedRecipientId); + deleteRecipient(connection, toBeMergedRecipientId); + synchronized (recipientsLock) { + recipientAddressCache.entrySet().removeIf(e -> e.getValue().id().equals(toBeMergedRecipientId)); + } + } + } + private RecipientId resolveRecipientLocked( Connection connection, RecipientAddress address ) throws SQLException { - final var byServiceId = address.serviceId().isEmpty() + final var aci = address.aci().isEmpty() ? Optional.empty() - : findByServiceId(connection, address.serviceId().get()); + : findByServiceId(connection, address.aci().get()); - if (byServiceId.isPresent()) { - return byServiceId.get().id(); + if (aci.isPresent()) { + return aci.get().id(); } final var byPni = address.pni().isEmpty() @@ -796,8 +1038,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re ).formatted(TABLE_RECIPIENT); try (final var statement = connection.prepareStatement(sql)) { statement.setString(1, address.number().orElse(null)); - statement.setBytes(2, - address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); + statement.setBytes(2, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setString(4, address.username().orElse(null)); final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper); @@ -817,7 +1058,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re final var sql = ( """ UPDATE %s - SET number = NULL, uuid = NULL, pni = NULL, username = NULL + SET number = NULL, uuid = NULL, pni = NULL, username = NULL, storage_id = NULL WHERE _id = ? """ ).formatted(TABLE_RECIPIENT); @@ -842,13 +1083,13 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re ).formatted(TABLE_RECIPIENT); try (final var statement = connection.prepareStatement(sql)) { statement.setString(1, address.number().orElse(null)); - statement.setBytes(2, - address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); + statement.setBytes(2, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setString(4, address.username().orElse(null)); statement.setLong(5, recipientId.id()); statement.executeUpdate(); } + rotateStorageId(connection, recipientId); } } @@ -936,9 +1177,9 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re final var sql = """ SELECT r._id, r.number, r.uuid, r.pni, r.username FROM %s r - WHERE r.uuid = ?1 OR r.pni = ?1 + WHERE %s = ?1 LIMIT 1 - """.formatted(TABLE_RECIPIENT); + """.formatted(TABLE_RECIPIENT, serviceId instanceof ACI ? "r.uuid" : "r.pni"); try (final var statement = connection.prepareStatement(sql)) { statement.setBytes(1, UuidUtil.toByteArray(serviceId.getRawUuid())); recipientWithAddress = Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet); @@ -953,14 +1194,13 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re final var sql = """ SELECT r._id, r.number, r.uuid, r.pni, r.username FROM %s r - WHERE r.uuid = ?1 OR r.pni = ?1 OR - r.uuid = ?2 OR r.pni = ?2 OR + WHERE r.uuid = ?1 OR + r.pni = ?2 OR r.number = ?3 OR r.username = ?4 """.formatted(TABLE_RECIPIENT); try (final var statement = connection.prepareStatement(sql)) { - statement.setBytes(1, - address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); + statement.setBytes(1, address.aci().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setBytes(2, address.pni().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); statement.setString(3, address.number().orElse(null)); statement.setString(4, address.username().orElse(null)); @@ -1018,7 +1258,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } - private Profile getProfile(final Connection connection, final RecipientId recipientId) throws SQLException { + public Profile getProfile(final Connection connection, final RecipientId recipientId) throws SQLException { final var sql = ( """ SELECT r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities @@ -1057,7 +1297,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re getContactFromResultSet(resultSet), getProfileKeyFromResultSet(resultSet), getExpiringProfileKeyCredentialFromResultSet(resultSet), - getProfileFromResultSet(resultSet)); + getProfileFromResultSet(resultSet), + getStorageRecordFromResultSet(resultSet)); } private Contact getContactFromResultSet(ResultSet resultSet) throws SQLException { @@ -1118,6 +1359,15 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re } } + private StorageId getContactStorageIdFromResultSet(ResultSet resultSet) throws SQLException { + final var storageId = resultSet.getBytes("storage_id"); + return StorageId.forContact(storageId); + } + + private byte[] getStorageRecordFromResultSet(ResultSet resultSet) throws SQLException { + return resultSet.getBytes("storage_record"); + } + public interface RecipientMergeHandler { void mergeRecipients( diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java new file mode 100644 index 00000000..9250505c --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/AccountRecordProcessor.java @@ -0,0 +1,225 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.api.Profile; +import org.asamk.signal.manager.internal.JobExecutor; +import org.asamk.signal.manager.jobs.CheckWhoAmIJob; +import org.asamk.signal.manager.jobs.DownloadProfileAvatarJob; +import org.asamk.signal.manager.storage.SignalAccount; +import org.asamk.signal.manager.util.KeyUtils; +import org.signal.libsignal.zkgroup.InvalidInputException; +import org.signal.libsignal.zkgroup.profiles.ProfileKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.push.UsernameLinkComponents; +import org.whispersystems.signalservice.api.storage.SignalAccountRecord; +import org.whispersystems.signalservice.api.util.OptionalUtil; +import org.whispersystems.signalservice.api.util.UuidUtil; +import org.whispersystems.signalservice.internal.storage.protos.OptionalBool; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Optional; + +/** + * Processes {@link SignalAccountRecord}s. + */ +public class AccountRecordProcessor extends DefaultStorageRecordProcessor { + + private static final Logger logger = LoggerFactory.getLogger(AccountRecordProcessor.class); + private final SignalAccountRecord localAccountRecord; + private final SignalAccount account; + private final Connection connection; + private final JobExecutor jobExecutor; + + public AccountRecordProcessor( + SignalAccount account, Connection connection, final JobExecutor jobExecutor + ) throws SQLException { + this.account = account; + this.connection = connection; + this.jobExecutor = jobExecutor; + final var selfRecipientId = account.getSelfRecipientId(); + final var recipient = account.getRecipientStore().getRecipient(connection, selfRecipientId); + final var storageId = account.getRecipientStore().getSelfStorageId(connection); + this.localAccountRecord = StorageSyncModels.localToRemoteRecord(account.getConfigurationStore(), + recipient, + account.getUsernameLink(), + storageId.getRaw()).getAccount().get(); + } + + @Override + protected boolean isInvalid(SignalAccountRecord remote) { + return false; + } + + @Override + protected Optional getMatching(SignalAccountRecord record) { + return Optional.of(localAccountRecord); + } + + @Override + protected SignalAccountRecord merge(SignalAccountRecord remote, SignalAccountRecord local) { + String givenName; + String familyName; + if (remote.getGivenName().isPresent() || remote.getFamilyName().isPresent()) { + givenName = remote.getGivenName().orElse(""); + familyName = remote.getFamilyName().orElse(""); + } else { + givenName = local.getGivenName().orElse(""); + familyName = local.getFamilyName().orElse(""); + } + + final var payments = remote.getPayments().getEntropy().isPresent() ? remote.getPayments() : local.getPayments(); + final var subscriber = remote.getSubscriber().getId().isPresent() + ? remote.getSubscriber() + : local.getSubscriber(); + final var storyViewReceiptsState = remote.getStoryViewReceiptsState() == OptionalBool.UNSET + ? local.getStoryViewReceiptsState() + : remote.getStoryViewReceiptsState(); + final var unknownFields = remote.serializeUnknownFields(); + final var avatarUrlPath = OptionalUtil.or(remote.getAvatarUrlPath(), local.getAvatarUrlPath()).orElse(""); + final var profileKey = OptionalUtil.or(remote.getProfileKey(), local.getProfileKey()).orElse(null); + final var noteToSelfArchived = remote.isNoteToSelfArchived(); + final var noteToSelfForcedUnread = remote.isNoteToSelfForcedUnread(); + final var readReceipts = remote.isReadReceiptsEnabled(); + final var typingIndicators = remote.isTypingIndicatorsEnabled(); + final var sealedSenderIndicators = remote.isSealedSenderIndicatorsEnabled(); + final var linkPreviews = remote.isLinkPreviewsEnabled(); + final var unlisted = remote.isPhoneNumberUnlisted(); + final var pinnedConversations = remote.getPinnedConversations(); + final var phoneNumberSharingMode = remote.getPhoneNumberSharingMode(); + final var preferContactAvatars = remote.isPreferContactAvatars(); + final var universalExpireTimer = remote.getUniversalExpireTimer(); + final var e164 = local.getE164(); + final var defaultReactions = !remote.getDefaultReactions().isEmpty() + ? remote.getDefaultReactions() + : local.getDefaultReactions(); + final var displayBadgesOnProfile = remote.isDisplayBadgesOnProfile(); + final var subscriptionManuallyCancelled = remote.isSubscriptionManuallyCancelled(); + final var keepMutedChatsArchived = remote.isKeepMutedChatsArchived(); + final var hasSetMyStoriesPrivacy = remote.hasSetMyStoriesPrivacy(); + final var hasViewedOnboardingStory = remote.hasViewedOnboardingStory() || local.hasViewedOnboardingStory(); + final var storiesDisabled = remote.isStoriesDisabled(); + final var hasSeenGroupStoryEducation = remote.hasSeenGroupStoryEducationSheet() + || local.hasSeenGroupStoryEducationSheet(); + final var username = remote.getUsername() != null && !remote.getUsername().isEmpty() + ? remote.getUsername() + : local.getUsername() != null && !local.getUsername().isEmpty() ? local.getUsername() : null; + final var usernameLink = remote.getUsernameLink() != null ? remote.getUsernameLink() : local.getUsernameLink(); + + final var mergedBuilder = new SignalAccountRecord.Builder(remote.getId().getRaw(), unknownFields).setGivenName( + givenName) + .setFamilyName(familyName) + .setAvatarUrlPath(avatarUrlPath) + .setProfileKey(profileKey) + .setNoteToSelfArchived(noteToSelfArchived) + .setNoteToSelfForcedUnread(noteToSelfForcedUnread) + .setReadReceiptsEnabled(readReceipts) + .setTypingIndicatorsEnabled(typingIndicators) + .setSealedSenderIndicatorsEnabled(sealedSenderIndicators) + .setLinkPreviewsEnabled(linkPreviews) + .setUnlistedPhoneNumber(unlisted) + .setPhoneNumberSharingMode(phoneNumberSharingMode) + .setUnlistedPhoneNumber(unlisted) + .setPinnedConversations(pinnedConversations) + .setPreferContactAvatars(preferContactAvatars) + .setPayments(payments.isEnabled(), payments.getEntropy().orElse(null)) + .setUniversalExpireTimer(universalExpireTimer) + .setDefaultReactions(defaultReactions) + .setSubscriber(subscriber) + .setDisplayBadgesOnProfile(displayBadgesOnProfile) + .setSubscriptionManuallyCancelled(subscriptionManuallyCancelled) + .setKeepMutedChatsArchived(keepMutedChatsArchived) + .setHasSetMyStoriesPrivacy(hasSetMyStoriesPrivacy) + .setHasViewedOnboardingStory(hasViewedOnboardingStory) + .setStoriesDisabled(storiesDisabled) + .setHasSeenGroupStoryEducationSheet(hasSeenGroupStoryEducation) + .setStoryViewReceiptsState(storyViewReceiptsState) + .setUsername(username) + .setUsernameLink(usernameLink) + .setE164(e164); + final var merged = mergedBuilder.build(); + + final var matchesRemote = doProtosMatch(merged, remote); + if (matchesRemote) { + return remote; + } + + final var matchesLocal = doProtosMatch(merged, local); + if (matchesLocal) { + return local; + } + + return mergedBuilder.setId(KeyUtils.createRawStorageId()).build(); + } + + @Override + protected void insertLocal(SignalAccountRecord record) { + throw new UnsupportedOperationException( + "We should always have a local AccountRecord, so we should never been inserting a new one."); + } + + @Override + protected void updateLocal(StorageRecordUpdate update) throws SQLException { + final var accountRecord = update.newRecord(); + + if (!accountRecord.getE164().equals(account.getNumber())) { + jobExecutor.enqueueJob(new CheckWhoAmIJob()); + } + + account.getConfigurationStore().setReadReceipts(connection, accountRecord.isReadReceiptsEnabled()); + account.getConfigurationStore().setTypingIndicators(connection, accountRecord.isTypingIndicatorsEnabled()); + account.getConfigurationStore() + .setUnidentifiedDeliveryIndicators(connection, accountRecord.isSealedSenderIndicatorsEnabled()); + account.getConfigurationStore().setLinkPreviews(connection, accountRecord.isLinkPreviewsEnabled()); + account.getConfigurationStore() + .setPhoneNumberSharingMode(connection, + StorageSyncModels.remoteToLocal(accountRecord.getPhoneNumberSharingMode())); + account.getConfigurationStore().setPhoneNumberUnlisted(connection, accountRecord.isPhoneNumberUnlisted()); + + account.setUsername(accountRecord.getUsername() != null && !accountRecord.getUsername().isEmpty() + ? accountRecord.getUsername() + : null); + if (accountRecord.getUsernameLink() != null) { + final var usernameLink = accountRecord.getUsernameLink(); + account.setUsernameLink(new UsernameLinkComponents(usernameLink.entropy.toByteArray(), + UuidUtil.parseOrThrow(usernameLink.serverId.toByteArray()))); + account.getConfigurationStore().setUsernameLinkColor(connection, usernameLink.color.name()); + } + + if (accountRecord.getProfileKey().isPresent()) { + ProfileKey profileKey; + try { + profileKey = new ProfileKey(accountRecord.getProfileKey().get()); + } catch (InvalidInputException e) { + logger.debug("Received invalid profile key from storage"); + profileKey = null; + } + if (profileKey != null) { + account.setProfileKey(profileKey); + final var avatarPath = accountRecord.getAvatarUrlPath().orElse(null); + jobExecutor.enqueueJob(new DownloadProfileAvatarJob(avatarPath)); + } + } + + final var profile = account.getRecipientStore().getProfile(connection, account.getSelfRecipientId()); + final var builder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile); + builder.withGivenName(accountRecord.getGivenName().orElse(null)); + builder.withFamilyName(accountRecord.getFamilyName().orElse(null)); + account.getRecipientStore().storeProfile(connection, account.getSelfRecipientId(), builder.build()); + account.getRecipientStore() + .storeStorageRecord(connection, + account.getSelfRecipientId(), + accountRecord.getId(), + accountRecord.toProto().encode()); + } + + @Override + public int compare(SignalAccountRecord lhs, SignalAccountRecord rhs) { + return 0; + } + + private static boolean doProtosMatch(SignalAccountRecord merged, SignalAccountRecord other) { + return Arrays.equals(merged.toProto().encode(), other.toProto().encode()); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java new file mode 100644 index 00000000..2e557e1a --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/ContactRecordProcessor.java @@ -0,0 +1,336 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.api.Contact; +import org.asamk.signal.manager.api.Profile; +import org.asamk.signal.manager.internal.JobExecutor; +import org.asamk.signal.manager.jobs.DownloadProfileJob; +import org.asamk.signal.manager.storage.SignalAccount; +import org.asamk.signal.manager.storage.recipients.RecipientAddress; +import org.asamk.signal.manager.util.KeyUtils; +import org.signal.libsignal.protocol.IdentityKey; +import org.signal.libsignal.protocol.InvalidKeyException; +import org.signal.libsignal.zkgroup.InvalidInputException; +import org.signal.libsignal.zkgroup.profiles.ProfileKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.push.ServiceId.ACI; +import org.whispersystems.signalservice.api.push.ServiceId.PNI; +import org.whispersystems.signalservice.api.storage.SignalContactRecord; +import org.whispersystems.signalservice.api.util.OptionalUtil; +import org.whispersystems.signalservice.internal.storage.protos.ContactRecord.IdentityState; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; +import java.util.regex.Pattern; + +public class ContactRecordProcessor extends DefaultStorageRecordProcessor { + + private static final Logger logger = LoggerFactory.getLogger(ContactRecordProcessor.class); + + private static final Pattern E164_PATTERN = Pattern.compile("^\\+[1-9]\\d{0,18}$"); + + private final ACI selfAci; + private final PNI selfPni; + private final String selfNumber; + private final SignalAccount account; + private final Connection connection; + private final JobExecutor jobExecutor; + + public ContactRecordProcessor(SignalAccount account, Connection connection, final JobExecutor jobExecutor) { + this.account = account; + this.connection = connection; + this.jobExecutor = jobExecutor; + this.selfAci = account.getAci(); + this.selfPni = account.getPni(); + this.selfNumber = account.getNumber(); + } + + /** + * Error cases: + * - You can't have a contact record without an ACI or PNI. + * - You can't have a contact record for yourself. That should be an account record. + */ + @Override + protected boolean isInvalid(SignalContactRecord remote) { + boolean hasAci = remote.getAci().isPresent() && remote.getAci().get().isValid(); + boolean hasPni = remote.getPni().isPresent() && remote.getPni().get().isValid(); + + if (!hasAci && !hasPni) { + logger.debug("Found a ContactRecord with neither an ACI nor a PNI -- marking as invalid."); + return true; + } else if (selfAci != null && selfAci.equals(remote.getAci().orElse(null)) || ( + selfPni != null && selfPni.equals(remote.getPni().orElse(null)) + ) || (selfNumber != null && selfNumber.equals(remote.getNumber().orElse(null)))) { + logger.debug("Found a ContactRecord for ourselves -- marking as invalid."); + return true; + } else if (remote.getNumber().isPresent() && !isValidE164(remote.getNumber().get())) { + logger.debug("Found a record with an invalid E164. Marking as invalid."); + return true; + } else { + return false; + } + } + + @Override + protected Optional getMatching(SignalContactRecord remote) throws SQLException { + final var address = getRecipientAddress(remote); + final var recipientId = account.getRecipientStore().resolveRecipient(connection, address); + final var recipient = account.getRecipientStore().getRecipient(connection, recipientId); + + final var identifier = recipient.getAddress().getIdentifier(); + final var identity = account.getIdentityKeyStore().getIdentityInfo(connection, identifier); + final var storageId = account.getRecipientStore().getStorageId(connection, recipientId); + + return Optional.of(StorageSyncModels.localToRemoteRecord(recipient, identity, storageId.getRaw()) + .getContact() + .get()); + } + + @Override + protected SignalContactRecord merge( + SignalContactRecord remote, SignalContactRecord local + ) { + String profileGivenName; + String profileFamilyName; + if (remote.getProfileGivenName().isPresent() || remote.getProfileFamilyName().isPresent()) { + profileGivenName = remote.getProfileGivenName().orElse(""); + profileFamilyName = remote.getProfileFamilyName().orElse(""); + } else { + profileGivenName = local.getProfileGivenName().orElse(""); + profileFamilyName = local.getProfileFamilyName().orElse(""); + } + + IdentityState identityState; + byte[] identityKey; + if ((remote.getIdentityState() != local.getIdentityState() && remote.getIdentityKey().isPresent()) + || (remote.getIdentityKey().isPresent() && local.getIdentityKey().isEmpty())) { + identityState = remote.getIdentityState(); + identityKey = remote.getIdentityKey().get(); + } else { + identityState = local.getIdentityState(); + identityKey = local.getIdentityKey().orElse(null); + } + + if (local.getAci().isPresent() && identityKey != null && remote.getIdentityKey().isPresent() && !Arrays.equals( + identityKey, + remote.getIdentityKey().get())) { + logger.debug("The local and remote identity keys do not match for {}. Enqueueing a profile fetch.", + local.getAci().orElse(null)); + final var address = getRecipientAddress(local); + jobExecutor.enqueueJob(new DownloadProfileJob(address)); + } + + final var e164sMatchButPnisDont = local.getNumber().isPresent() + && local.getNumber() + .get() + .equals(remote.getNumber().orElse(null)) + && local.getPni().isPresent() + && remote.getPni().isPresent() + && !local.getPni().get().equals(remote.getPni().get()); + + final var pnisMatchButE164sDont = local.getPni().isPresent() + && local.getPni() + .get() + .equals(remote.getPni().orElse(null)) + && local.getNumber().isPresent() + && remote.getNumber().isPresent() + && !local.getNumber().get().equals(remote.getNumber().get()); + + PNI pni; + String e164; + if (e164sMatchButPnisDont) { + logger.debug("Matching E164s, but the PNIs differ! Trusting our local pair."); + // TODO [pnp] Schedule CDS fetch? + pni = local.getPni().get(); + e164 = local.getNumber().get(); + } else if (pnisMatchButE164sDont) { + logger.debug("Matching PNIs, but the E164s differ! Trusting our local pair."); + // TODO [pnp] Schedule CDS fetch? + pni = local.getPni().get(); + e164 = local.getNumber().get(); + } else { + pni = OptionalUtil.or(remote.getPni(), local.getPni()).orElse(null); + e164 = OptionalUtil.or(remote.getNumber(), local.getNumber()).orElse(null); + } + + final var unknownFields = remote.serializeUnknownFields(); + final var aci = local.getAci().isEmpty() ? remote.getAci().orElse(null) : local.getAci().get(); + final var profileKey = OptionalUtil.or(remote.getProfileKey(), local.getProfileKey()).orElse(null); + final var username = OptionalUtil.or(remote.getUsername(), local.getUsername()).orElse(""); + final var blocked = remote.isBlocked(); + final var profileSharing = remote.isProfileSharingEnabled(); + final var archived = remote.isArchived(); + final var forcedUnread = remote.isForcedUnread(); + final var muteUntil = remote.getMuteUntil(); + final var hideStory = remote.shouldHideStory(); + final var unregisteredTimestamp = remote.getUnregisteredTimestamp(); + final var hidden = remote.isHidden(); + final var systemGivenName = account.isPrimaryDevice() + ? local.getSystemGivenName().orElse("") + : remote.getSystemGivenName().orElse(""); + final var systemFamilyName = account.isPrimaryDevice() + ? local.getSystemFamilyName().orElse("") + : remote.getSystemFamilyName().orElse(""); + final var systemNickname = remote.getSystemNickname().orElse(""); + + final var mergedBuilder = new SignalContactRecord.Builder(remote.getId().getRaw(), aci, unknownFields).setE164( + e164) + .setPni(pni) + .setProfileGivenName(profileGivenName) + .setProfileFamilyName(profileFamilyName) + .setSystemGivenName(systemGivenName) + .setSystemFamilyName(systemFamilyName) + .setSystemNickname(systemNickname) + .setProfileKey(profileKey) + .setUsername(username) + .setIdentityState(identityState) + .setIdentityKey(identityKey) + .setBlocked(blocked) + .setProfileSharingEnabled(profileSharing) + .setArchived(archived) + .setForcedUnread(forcedUnread) + .setMuteUntil(muteUntil) + .setHideStory(hideStory) + .setUnregisteredTimestamp(unregisteredTimestamp) + .setHidden(hidden); + final var merged = mergedBuilder.build(); + + final var matchesRemote = doProtosMatch(merged, remote); + if (matchesRemote) { + return remote; + } + + final var matchesLocal = doProtosMatch(merged, local); + if (matchesLocal) { + return local; + } + + return mergedBuilder.setId(KeyUtils.createRawStorageId()).build(); + } + + @Override + protected void insertLocal(SignalContactRecord record) throws SQLException { + StorageRecordUpdate update = new StorageRecordUpdate<>(null, record); + updateLocal(update); + } + + @Override + protected void updateLocal(StorageRecordUpdate update) throws SQLException { + final var contactRecord = update.newRecord(); + final var address = getRecipientAddress(contactRecord); + final var recipientId = account.getRecipientStore().resolveRecipientTrusted(connection, address); + final var recipient = account.getRecipientStore().getRecipient(connection, recipientId); + + final var contact = recipient.getContact(); + final var blocked = contact != null && contact.isBlocked(); + final var profileShared = contact != null && contact.isProfileSharingEnabled(); + final var archived = contact != null && contact.isArchived(); + final var hidden = contact != null && contact.isHidden(); + final var contactGivenName = contact == null ? null : contact.givenName(); + final var contactFamilyName = contact == null ? null : contact.familyName(); + if (blocked != contactRecord.isBlocked() + || profileShared != contactRecord.isProfileSharingEnabled() + || archived != contactRecord.isArchived() + || hidden != contactRecord.isHidden() + || ( + contactRecord.getSystemGivenName().isPresent() && !contactRecord.getSystemGivenName() + .get() + .equals(contactGivenName) + ) + || ( + contactRecord.getSystemFamilyName().isPresent() && !contactRecord.getSystemFamilyName() + .get() + .equals(contactFamilyName) + )) { + logger.debug("Storing new or updated contact {}", recipientId); + final var contactBuilder = contact == null ? Contact.newBuilder() : Contact.newBuilder(contact); + final var newContact = contactBuilder.withIsBlocked(contactRecord.isBlocked()) + .withIsProfileSharingEnabled(contactRecord.isProfileSharingEnabled()) + .withIsArchived(contactRecord.isArchived()) + .withIsHidden(contactRecord.isHidden()); + if (contactRecord.getSystemGivenName().isPresent() || contactRecord.getSystemFamilyName().isPresent()) { + newContact.withGivenName(contactRecord.getSystemGivenName().orElse(null)) + .withFamilyName(contactRecord.getSystemFamilyName().orElse(null)); + } + account.getRecipientStore().storeContact(connection, recipientId, newContact.build()); + } + + final var profile = recipient.getProfile(); + final var profileGivenName = profile == null ? null : profile.getGivenName(); + final var profileFamilyName = profile == null ? null : profile.getFamilyName(); + if (( + contactRecord.getProfileGivenName().isPresent() && !contactRecord.getProfileGivenName() + .get() + .equals(profileGivenName) + ) || ( + contactRecord.getProfileFamilyName().isPresent() && !contactRecord.getProfileFamilyName() + .get() + .equals(profileFamilyName) + )) { + final var profileBuilder = profile == null ? Profile.newBuilder() : Profile.newBuilder(profile); + final var newProfile = profileBuilder.withGivenName(contactRecord.getProfileGivenName().orElse(null)) + .withFamilyName(contactRecord.getProfileFamilyName().orElse(null)) + .build(); + account.getRecipientStore().storeProfile(connection, recipientId, newProfile); + } + if (contactRecord.getProfileKey().isPresent()) { + try { + logger.trace("Storing profile key {}", recipientId); + final var profileKey = new ProfileKey(contactRecord.getProfileKey().get()); + account.getRecipientStore().storeProfileKey(connection, recipientId, profileKey); + } catch (InvalidInputException e) { + logger.warn("Received invalid contact profile key from storage"); + } + } + if (contactRecord.getIdentityKey().isPresent() && contactRecord.getAci().orElse(null) != null) { + try { + logger.trace("Storing identity key {}", recipientId); + final var identityKey = new IdentityKey(contactRecord.getIdentityKey().get()); + account.getIdentityKeyStore() + .saveIdentity(connection, contactRecord.getAci().orElse(null), identityKey); + + final var trustLevel = StorageSyncModels.remoteToLocal(contactRecord.getIdentityState()); + if (trustLevel != null) { + account.getIdentityKeyStore() + .setIdentityTrustLevel(connection, + contactRecord.getAci().orElse(null), + identityKey, + trustLevel); + } + } catch (InvalidKeyException e) { + logger.warn("Received invalid contact identity key from storage"); + } + } + account.getRecipientStore() + .storeStorageRecord(connection, recipientId, contactRecord.getId(), contactRecord.toProto().encode()); + } + + private static RecipientAddress getRecipientAddress(final SignalContactRecord contactRecord) { + return new RecipientAddress(contactRecord.getAci().orElse(null), + contactRecord.getPni().orElse(null), + contactRecord.getNumber().orElse(null), + contactRecord.getUsername().orElse(null)); + } + + @Override + public int compare(SignalContactRecord lhs, SignalContactRecord rhs) { + if ((lhs.getAci().isPresent() && Objects.equals(lhs.getAci(), rhs.getAci())) || ( + lhs.getNumber().isPresent() && Objects.equals(lhs.getNumber(), rhs.getNumber()) + ) || (lhs.getPni().isPresent() && Objects.equals(lhs.getPni(), rhs.getPni()))) { + return 0; + } else { + return 1; + } + } + + private static boolean isValidE164(String value) { + return E164_PATTERN.matcher(value).matches(); + } + + private static boolean doProtosMatch(SignalContactRecord merged, SignalContactRecord other) { + return Arrays.equals(merged.toProto().encode(), other.toProto().encode()); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java new file mode 100644 index 00000000..2b6334e0 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/DefaultStorageRecordProcessor.java @@ -0,0 +1,96 @@ +package org.asamk.signal.manager.syncStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.storage.SignalRecord; +import org.whispersystems.signalservice.api.storage.StorageId; + +import java.sql.SQLException; +import java.util.Comparator; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; + +/** + * An implementation of {@link StorageRecordProcessor} that solidifies a pattern and reduces + * duplicate code in individual implementations. + *

+ * Concerning the implementation of {@link #compare(Object, Object)}, it's purpose is to detect if + * two items would map to the same logical entity (i.e. they would correspond to the same record in + * our local store). We use it for a {@link TreeSet}, so mainly it's just important that the '0' + * case is correct. Other cases are whatever, just make it something stable. + */ +abstract class DefaultStorageRecordProcessor implements StorageRecordProcessor, Comparator { + + private static final Logger logger = LoggerFactory.getLogger(DefaultStorageRecordProcessor.class); + private final Set matchedRecords = new TreeSet<>(this); + + /** + * One type of invalid remote data this handles is two records mapping to the same local data. We + * have to trim this bad data out, because if we don't, we'll upload an ID set that only has one + * of the IDs in it, but won't properly delete the dupes, which will then fail our validation + * checks. + *

+ * This is a bit tricky -- as we process records, IDs are written back to the local store, so we + * can't easily be like "oh multiple records are mapping to the same local storage ID". And in + * general we rely on SignalRecords to implement an equals() that includes the StorageId, so using + * a regular set is out. Instead, we use a {@link TreeSet}, which allows us to define a custom + * comparator for checking equality. Then we delegate to the subclass to tell us if two items are + * the same based on their actual data (i.e. two contacts having the same UUID, or two groups + * having the same MasterKey). + */ + @Override + public void process(E remote) throws SQLException { + if (isInvalid(remote)) { + debug(remote.getId(), remote, "Found invalid key! Ignoring it."); + return; + } + + final var local = getMatching(remote); + + if (local.isEmpty()) { + debug(remote.getId(), remote, "No matching local record. Inserting."); + insertLocal(remote); + return; + } + + if (matchedRecords.contains(local.get())) { + debug(remote.getId(), remote, "Multiple remote records map to the same local record! Ignoring this one."); + return; + } + + matchedRecords.add(local.get()); + + final var merged = merge(remote, local.get()); + if (!merged.equals(remote)) { + debug(remote.getId(), remote, "[Remote Update] " + merged.describeDiff(remote)); + } + + if (!merged.equals(local.get())) { + final var update = new StorageRecordUpdate<>(local.get(), merged); + debug(remote.getId(), remote, "[Local Update] " + update); + updateLocal(update); + } + } + + private void debug(StorageId i, E record, String message) { + logger.debug("[" + i + "][" + record.getClass().getSimpleName() + "] " + message); + } + + /** + * @return True if the record is invalid and should be removed from storage service, otherwise false. + */ + protected abstract boolean isInvalid(E remote) throws SQLException; + + /** + * Only records that pass the validity check (i.e. return false from {@link #isInvalid(SignalRecord)}) + * make it to here, so you can assume all records are valid. + */ + protected abstract Optional getMatching(E remote) throws SQLException; + + protected abstract E merge(E remote, E local); + + protected abstract void insertLocal(E record) throws SQLException; + + protected abstract void updateLocal(StorageRecordUpdate update) throws SQLException; +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java new file mode 100644 index 00000000..be95591b --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV1RecordProcessor.java @@ -0,0 +1,137 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.api.GroupId; +import org.asamk.signal.manager.api.GroupIdV1; +import org.asamk.signal.manager.storage.SignalAccount; +import org.asamk.signal.manager.storage.groups.GroupInfoV2; +import org.asamk.signal.manager.util.KeyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.storage.SignalGroupV1Record; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Optional; + +/** + * Handles merging remote storage updates into local group v1 state. + */ +public final class GroupV1RecordProcessor extends DefaultStorageRecordProcessor { + + private static final Logger logger = LoggerFactory.getLogger(GroupV1RecordProcessor.class); + private final SignalAccount account; + private final Connection connection; + + public GroupV1RecordProcessor(SignalAccount account, Connection connection) { + this.account = account; + this.connection = connection; + } + + /** + * We want to catch: + * - Invalid group IDs + * - GV1 IDs that map to GV2 IDs, meaning we've already migrated them. + */ + @Override + protected boolean isInvalid(SignalGroupV1Record remote) throws SQLException { + try { + final var id = GroupId.unknownVersion(remote.getGroupId()); + if (!(id instanceof GroupIdV1)) { + return true; + } + final var group = account.getGroupStore().getGroup(connection, id); + + if (group instanceof GroupInfoV2) { + logger.debug("We already have an upgraded V2 group for this V1 group -- marking as invalid."); + return true; + } else { + return false; + } + } catch (AssertionError e) { + logger.debug("Bad Group ID -- marking as invalid."); + return true; + } + } + + @Override + protected Optional getMatching(SignalGroupV1Record remote) throws SQLException { + final var id = GroupId.v1(remote.getGroupId()); + final var group = account.getGroupStore().getGroup(connection, id); + + if (group == null) { + return Optional.empty(); + } + + final var storageId = account.getGroupStore().getGroupStorageId(connection, id); + return Optional.of(StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()).getGroupV1().get()); + } + + @Override + protected SignalGroupV1Record merge(SignalGroupV1Record remote, SignalGroupV1Record local) { + final var unknownFields = remote.serializeUnknownFields(); + final var blocked = remote.isBlocked(); + final var profileSharing = remote.isProfileSharingEnabled(); + final var archived = remote.isArchived(); + final var forcedUnread = remote.isForcedUnread(); + final var muteUntil = remote.getMuteUntil(); + + final var mergedBuilder = new SignalGroupV1Record.Builder(remote.getId().getRaw(), + remote.getGroupId(), + unknownFields).setBlocked(blocked) + .setProfileSharingEnabled(profileSharing) + .setForcedUnread(forcedUnread) + .setMuteUntil(muteUntil) + .setArchived(archived); + + final var merged = mergedBuilder.build(); + + final var matchesRemote = doProtosMatch(merged, remote); + if (matchesRemote) { + return remote; + } + + final var matchesLocal = doProtosMatch(merged, local); + if (matchesLocal) { + return local; + } + + return mergedBuilder.setId(KeyUtils.createRawStorageId()).build(); + } + + @Override + protected void insertLocal(SignalGroupV1Record record) throws SQLException { + // TODO send group info request (after server message queue is empty) + // context.getGroupHelper().sendGroupInfoRequest(groupIdV1, account.getSelfRecipientId()); + StorageRecordUpdate update = new StorageRecordUpdate<>(null, record); + updateLocal(update); + } + + @Override + protected void updateLocal(StorageRecordUpdate update) throws SQLException { + final var groupV1Record = update.newRecord(); + final var groupIdV1 = GroupId.v1(groupV1Record.getGroupId()); + + final var group = account.getGroupStore().getGroup(connection, groupIdV1); + group.setBlocked(groupV1Record.isBlocked()); + account.getGroupStore().updateGroup(connection, group); + account.getGroupStore() + .storeStorageRecord(connection, + group.getGroupId(), + groupV1Record.getId(), + groupV1Record.toProto().encode()); + } + + @Override + public int compare(SignalGroupV1Record lhs, SignalGroupV1Record rhs) { + if (Arrays.equals(lhs.getGroupId(), rhs.getGroupId())) { + return 0; + } else { + return 1; + } + } + + private static boolean doProtosMatch(SignalGroupV1Record merged, SignalGroupV1Record other) { + return Arrays.equals(merged.toProto().encode(), other.toProto().encode()); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java new file mode 100644 index 00000000..4d41901a --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/GroupV2RecordProcessor.java @@ -0,0 +1,115 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.groups.GroupUtils; +import org.asamk.signal.manager.storage.SignalAccount; +import org.asamk.signal.manager.util.KeyUtils; +import org.signal.libsignal.zkgroup.groups.GroupMasterKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.storage.SignalGroupV2Record; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Optional; + +public final class GroupV2RecordProcessor extends DefaultStorageRecordProcessor { + + private static final Logger logger = LoggerFactory.getLogger(GroupV2RecordProcessor.class); + private final SignalAccount account; + private final Connection connection; + + public GroupV2RecordProcessor(SignalAccount account, Connection connection) { + this.account = account; + this.connection = connection; + } + + @Override + protected boolean isInvalid(SignalGroupV2Record remote) { + return remote.getMasterKeyBytes().length != GroupMasterKey.SIZE; + } + + @Override + protected Optional getMatching(SignalGroupV2Record remote) throws SQLException { + final var id = GroupUtils.getGroupIdV2(remote.getMasterKeyOrThrow()); + final var group = account.getGroupStore().getGroup(connection, id); + + if (group == null) { + return Optional.empty(); + } + + final var storageId = account.getGroupStore().getGroupStorageId(connection, id); + return Optional.of(StorageSyncModels.localToRemoteRecord(group, storageId.getRaw()).getGroupV2().get()); + } + + @Override + protected SignalGroupV2Record merge(SignalGroupV2Record remote, SignalGroupV2Record local) { + final var unknownFields = remote.serializeUnknownFields(); + final var blocked = remote.isBlocked(); + final var profileSharing = remote.isProfileSharingEnabled(); + final var archived = remote.isArchived(); + final var forcedUnread = remote.isForcedUnread(); + final var muteUntil = remote.getMuteUntil(); + final var notifyForMentionsWhenMuted = remote.notifyForMentionsWhenMuted(); + final var hideStory = remote.shouldHideStory(); + final var storySendMode = remote.getStorySendMode(); + + final var mergedBuilder = new SignalGroupV2Record.Builder(remote.getId().getRaw(), + remote.getMasterKeyBytes(), + unknownFields).setBlocked(blocked) + .setProfileSharingEnabled(profileSharing) + .setArchived(archived) + .setForcedUnread(forcedUnread) + .setMuteUntil(muteUntil) + .setNotifyForMentionsWhenMuted(notifyForMentionsWhenMuted) + .setHideStory(hideStory) + .setStorySendMode(storySendMode); + final var merged = mergedBuilder.build(); + + final var matchesRemote = doProtosMatch(merged, remote); + if (matchesRemote) { + return remote; + } + + final var matchesLocal = doProtosMatch(merged, local); + if (matchesLocal) { + return local; + } + + return mergedBuilder.setId(KeyUtils.createRawStorageId()).build(); + } + + @Override + protected void insertLocal(SignalGroupV2Record record) throws SQLException { + StorageRecordUpdate update = new StorageRecordUpdate<>(null, record); + updateLocal(update); + } + + @Override + protected void updateLocal(StorageRecordUpdate update) throws SQLException { + final var groupV2Record = update.newRecord(); + final var groupMasterKey = groupV2Record.getMasterKeyOrThrow(); + + final var group = account.getGroupStore().getGroupOrPartialMigrate(connection, groupMasterKey); + group.setBlocked(groupV2Record.isBlocked()); + account.getGroupStore().updateGroup(connection, group); + account.getGroupStore() + .storeStorageRecord(connection, + group.getGroupId(), + groupV2Record.getId(), + groupV2Record.toProto().encode()); + } + + @Override + public int compare(SignalGroupV2Record lhs, SignalGroupV2Record rhs) { + if (Arrays.equals(lhs.getMasterKeyBytes(), rhs.getMasterKeyBytes())) { + return 0; + } else { + return 1; + } + } + + private static boolean doProtosMatch(SignalGroupV2Record merged, SignalGroupV2Record other) { + return Arrays.equals(merged.toProto().encode(), other.toProto().encode()); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java new file mode 100644 index 00000000..45a99562 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordProcessor.java @@ -0,0 +1,14 @@ +package org.asamk.signal.manager.syncStorage; + +import org.whispersystems.signalservice.api.storage.SignalRecord; + +import java.sql.SQLException; + +/** + * Handles processing a remote record, which involves applying any local changes that need to be + * made based on the remote records. + */ +interface StorageRecordProcessor { + + void process(E remoteRecord) throws SQLException; +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java new file mode 100644 index 00000000..8dcb7b34 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageRecordUpdate.java @@ -0,0 +1,14 @@ +package org.asamk.signal.manager.syncStorage; + +import org.whispersystems.signalservice.api.storage.SignalRecord; + +/** + * Represents a pair of records: one old, and one new. The new record should replace the old. + */ +record StorageRecordUpdate(E oldRecord, E newRecord) { + + @Override + public String toString() { + return newRecord.describeDiff(oldRecord); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java new file mode 100644 index 00000000..c18434c3 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncModels.java @@ -0,0 +1,145 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.api.PhoneNumberSharingMode; +import org.asamk.signal.manager.api.TrustLevel; +import org.asamk.signal.manager.storage.configuration.ConfigurationStore; +import org.asamk.signal.manager.storage.groups.GroupInfoV1; +import org.asamk.signal.manager.storage.groups.GroupInfoV2; +import org.asamk.signal.manager.storage.identities.IdentityInfo; +import org.asamk.signal.manager.storage.recipients.Recipient; +import org.whispersystems.signalservice.api.push.UsernameLinkComponents; +import org.whispersystems.signalservice.api.storage.SignalAccountRecord; +import org.whispersystems.signalservice.api.storage.SignalContactRecord; +import org.whispersystems.signalservice.api.storage.SignalGroupV1Record; +import org.whispersystems.signalservice.api.storage.SignalGroupV2Record; +import org.whispersystems.signalservice.api.storage.SignalStorageRecord; +import org.whispersystems.signalservice.api.util.UuidUtil; +import org.whispersystems.signalservice.internal.storage.protos.AccountRecord; +import org.whispersystems.signalservice.internal.storage.protos.AccountRecord.UsernameLink; +import org.whispersystems.signalservice.internal.storage.protos.ContactRecord; +import org.whispersystems.signalservice.internal.storage.protos.ContactRecord.IdentityState; + +import java.util.Optional; + +import okio.ByteString; + +public final class StorageSyncModels { + + private StorageSyncModels() { + } + + public static AccountRecord.PhoneNumberSharingMode localToRemote(PhoneNumberSharingMode phoneNumberPhoneNumberSharingMode) { + return switch (phoneNumberPhoneNumberSharingMode) { + case EVERYBODY -> AccountRecord.PhoneNumberSharingMode.EVERYBODY; + case CONTACTS, NOBODY -> AccountRecord.PhoneNumberSharingMode.NOBODY; + }; + } + + public static PhoneNumberSharingMode remoteToLocal(AccountRecord.PhoneNumberSharingMode phoneNumberPhoneNumberSharingMode) { + return switch (phoneNumberPhoneNumberSharingMode) { + case EVERYBODY -> PhoneNumberSharingMode.EVERYBODY; + case UNKNOWN, NOBODY -> PhoneNumberSharingMode.NOBODY; + }; + } + + public static SignalStorageRecord localToRemoteRecord( + ConfigurationStore configStore, + Recipient self, + UsernameLinkComponents usernameLinkComponents, + byte[] rawStorageId + ) { + final var builder = new SignalAccountRecord.Builder(rawStorageId, self.getStorageRecord()); + if (self.getProfileKey() != null) { + builder.setProfileKey(self.getProfileKey().serialize()); + } + if (self.getProfile() != null) { + builder.setGivenName(self.getProfile().getGivenName()) + .setFamilyName(self.getProfile().getFamilyName()) + .setAvatarUrlPath(self.getProfile().getAvatarUrlPath()); + } + builder.setTypingIndicatorsEnabled(Optional.ofNullable(configStore.getTypingIndicators()).orElse(true)) + .setReadReceiptsEnabled(Optional.ofNullable(configStore.getReadReceipts()).orElse(true)) + .setSealedSenderIndicatorsEnabled(Optional.ofNullable(configStore.getUnidentifiedDeliveryIndicators()) + .orElse(true)) + .setLinkPreviewsEnabled(Optional.ofNullable(configStore.getLinkPreviews()).orElse(true)) + .setUnlistedPhoneNumber(Optional.ofNullable(configStore.getPhoneNumberUnlisted()).orElse(true)) + .setPhoneNumberSharingMode(localToRemote(Optional.ofNullable(configStore.getPhoneNumberSharingMode()) + .orElse(PhoneNumberSharingMode.EVERYBODY))) + .setE164(self.getAddress().number().orElse("")) + .setUsername(self.getAddress().username().orElse(null)); + if (usernameLinkComponents != null) { + final var linkColor = configStore.getUsernameLinkColor(); + builder.setUsernameLink(new UsernameLink.Builder().entropy(ByteString.of(usernameLinkComponents.getEntropy())) + .serverId(UuidUtil.toByteString(usernameLinkComponents.getServerId())) + .color(linkColor == null ? UsernameLink.Color.UNKNOWN : UsernameLink.Color.valueOf(linkColor)) + .build()); + } + + return SignalStorageRecord.forAccount(builder.build()); + } + + public static SignalStorageRecord localToRemoteRecord( + Recipient recipient, IdentityInfo identity, byte[] rawStorageId + ) { + final var address = recipient.getAddress(); + final var builder = new SignalContactRecord.Builder(rawStorageId, + address.aci().orElse(null), + recipient.getStorageRecord()).setE164(address.number().orElse(null)) + .setPni(address.pni().orElse(null)) + .setUsername(address.username().orElse(null)) + .setProfileKey(recipient.getProfileKey() == null ? null : recipient.getProfileKey().serialize()); + if (recipient.getProfile() != null) { + builder.setProfileGivenName(recipient.getProfile().getGivenName()) + .setProfileFamilyName(recipient.getProfile().getFamilyName()); + } + if (recipient.getContact() != null) { + builder.setSystemGivenName(recipient.getContact().givenName()) + .setSystemFamilyName((recipient.getContact().familyName())) + .setBlocked(recipient.getContact().isBlocked()) + .setProfileSharingEnabled(recipient.getContact().isProfileSharingEnabled()) + .setArchived(recipient.getContact().isArchived()) + .setHidden(recipient.getContact().isHidden()); + } + if (identity != null) { + builder.setIdentityKey(identity.getIdentityKey().serialize()) + .setIdentityState(localToRemote(identity.getTrustLevel())); + } + return SignalStorageRecord.forContact(builder.build()); + } + + public static SignalStorageRecord localToRemoteRecord( + GroupInfoV1 group, byte[] rawStorageId + ) { + final var builder = new SignalGroupV1Record.Builder(rawStorageId, + group.getGroupId().serialize(), + group.getStorageRecord()); + builder.setBlocked(group.isBlocked()).setArchived(group.archived); + return SignalStorageRecord.forGroupV1(builder.build()); + } + + public static SignalStorageRecord localToRemoteRecord( + GroupInfoV2 group, byte[] rawStorageId + ) { + final var builder = new SignalGroupV2Record.Builder(rawStorageId, + group.getMasterKey(), + group.getStorageRecord()); + builder.setBlocked(group.isBlocked()); + return SignalStorageRecord.forGroupV2(builder.build()); + } + + public static TrustLevel remoteToLocal(ContactRecord.IdentityState identityState) { + return switch (identityState) { + case DEFAULT -> TrustLevel.TRUSTED_UNVERIFIED; + case UNVERIFIED -> TrustLevel.UNTRUSTED; + case VERIFIED -> TrustLevel.TRUSTED_VERIFIED; + }; + } + + private static IdentityState localToRemote(TrustLevel local) { + return switch (local) { + case TRUSTED_VERIFIED -> IdentityState.VERIFIED; + case UNTRUSTED -> IdentityState.UNVERIFIED; + default -> IdentityState.DEFAULT; + }; + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java new file mode 100644 index 00000000..2168a2ef --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/StorageSyncValidations.java @@ -0,0 +1,238 @@ +package org.asamk.signal.manager.syncStorage; + +import org.asamk.signal.manager.storage.recipients.RecipientAddress; +import org.signal.core.util.Base64; +import org.signal.core.util.SetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.push.ServiceId; +import org.whispersystems.signalservice.api.storage.SignalStorageManifest; +import org.whispersystems.signalservice.api.storage.SignalStorageRecord; +import org.whispersystems.signalservice.api.storage.StorageId; +import org.whispersystems.signalservice.internal.storage.protos.ManifestRecord; + +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public final class StorageSyncValidations { + + private static final Logger logger = LoggerFactory.getLogger(StorageSyncValidations.class); + + private StorageSyncValidations() { + } + + public static void validate( + WriteOperationResult result, + SignalStorageManifest previousManifest, + boolean forcePushPending, + RecipientAddress self + ) { + validateManifestAndInserts(result.manifest(), result.inserts(), self); + + if (!result.deletes().isEmpty()) { + Set allSetEncoded = result.manifest() + .getStorageIds() + .stream() + .map(StorageId::getRaw) + .map(Base64::encodeWithPadding) + .collect(Collectors.toSet()); + + for (byte[] delete : result.deletes()) { + String encoded = Base64.encodeWithPadding(delete); + if (allSetEncoded.contains(encoded)) { + throw new DeletePresentInFullIdSetError(); + } + } + } + + if (previousManifest.getVersion() == 0) { + logger.debug( + "Previous manifest is empty, not bothering with additional validations around the diffs between the two manifests."); + return; + } + + if (result.manifest().getVersion() != previousManifest.getVersion() + 1) { + throw new IncorrectManifestVersionError(); + } + + if (forcePushPending) { + logger.debug( + "Force push pending, not bothering with additional validations around the diffs between the two manifests."); + return; + } + + Set previousIds = previousManifest.getStorageIds() + .stream() + .map(id -> ByteBuffer.wrap(id.getRaw())) + .collect(Collectors.toSet()); + Set newIds = result.manifest() + .getStorageIds() + .stream() + .map(id -> ByteBuffer.wrap(id.getRaw())) + .collect(Collectors.toSet()); + + Set manifestInserts = SetUtil.difference(newIds, previousIds); + Set manifestDeletes = SetUtil.difference(previousIds, newIds); + + Set declaredInserts = result.inserts() + .stream() + .map(r -> ByteBuffer.wrap(r.getId().getRaw())) + .collect(Collectors.toSet()); + Set declaredDeletes = result.deletes().stream().map(ByteBuffer::wrap).collect(Collectors.toSet()); + + if (declaredInserts.size() > manifestInserts.size()) { + logger.debug("DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size()); + throw new MoreInsertsThanExpectedError(); + } + + if (declaredInserts.size() < manifestInserts.size()) { + logger.debug("DeclaredInserts: " + declaredInserts.size() + ", ManifestInserts: " + manifestInserts.size()); + throw new LessInsertsThanExpectedError(); + } + + if (!declaredInserts.containsAll(manifestInserts)) { + throw new InsertMismatchError(); + } + + if (declaredDeletes.size() > manifestDeletes.size()) { + logger.debug("DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size()); + throw new MoreDeletesThanExpectedError(); + } + + if (declaredDeletes.size() < manifestDeletes.size()) { + logger.debug("DeclaredDeletes: " + declaredDeletes.size() + ", ManifestDeletes: " + manifestDeletes.size()); + throw new LessDeletesThanExpectedError(); + } + + if (!declaredDeletes.containsAll(manifestDeletes)) { + throw new DeleteMismatchError(); + } + } + + public static void validateForcePush( + SignalStorageManifest manifest, List inserts, RecipientAddress self + ) { + validateManifestAndInserts(manifest, inserts, self); + } + + private static void validateManifestAndInserts( + SignalStorageManifest manifest, List inserts, RecipientAddress self + ) { + int accountCount = 0; + for (StorageId id : manifest.getStorageIds()) { + accountCount += id.getType() == ManifestRecord.Identifier.Type.ACCOUNT.getValue() ? 1 : 0; + } + + if (accountCount > 1) { + throw new MultipleAccountError(); + } + + if (accountCount == 0) { + throw new MissingAccountError(); + } + + Set allSet = new HashSet<>(manifest.getStorageIds()); + Set insertSet = inserts.stream().map(SignalStorageRecord::getId).collect(Collectors.toSet()); + Set rawIdSet = allSet.stream().map(id -> ByteBuffer.wrap(id.getRaw())).collect(Collectors.toSet()); + + if (allSet.size() != manifest.getStorageIds().size()) { + throw new DuplicateStorageIdError(); + } + + if (rawIdSet.size() != allSet.size()) { + List ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.CONTACT.getValue()); + if (ids.size() != new HashSet<>(ids).size()) { + throw new DuplicateContactIdError(); + } + + ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.GROUPV1.getValue()); + if (ids.size() != new HashSet<>(ids).size()) { + throw new DuplicateGroupV1IdError(); + } + + ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.GROUPV2.getValue()); + if (ids.size() != new HashSet<>(ids).size()) { + throw new DuplicateGroupV2IdError(); + } + + ids = manifest.getStorageIdsByType().get(ManifestRecord.Identifier.Type.STORY_DISTRIBUTION_LIST.getValue()); + if (ids.size() != new HashSet<>(ids).size()) { + throw new DuplicateDistributionListIdError(); + } + + throw new DuplicateRawIdAcrossTypesError(); + } + + if (inserts.size() > insertSet.size()) { + throw new DuplicateInsertInWriteError(); + } + + for (SignalStorageRecord insert : inserts) { + if (!allSet.contains(insert.getId())) { + throw new InsertNotPresentInFullIdSetError(); + } + + if (insert.isUnknown()) { + throw new UnknownInsertError(); + } + + if (insert.getContact().isPresent()) { + final var contact = insert.getContact().get(); + final var serviceId = contact.getServiceId().map(ServiceId.class::cast); + final var pni = contact.getPni(); + final var number = contact.getNumber(); + final var username = contact.getUsername(); + final var address = new RecipientAddress(serviceId, pni, number, username); + if (self.matches(address)) { + throw new SelfAddedAsContactError(); + } + } + if (insert.getAccount().isPresent() && insert.getAccount().get().getProfileKey().isEmpty()) { + logger.debug("Uploading a null profile key in our AccountRecord!"); + } + } + } + + private static final class DuplicateStorageIdError extends Error {} + + private static final class DuplicateRawIdAcrossTypesError extends Error {} + + private static final class DuplicateContactIdError extends Error {} + + private static final class DuplicateGroupV1IdError extends Error {} + + private static final class DuplicateGroupV2IdError extends Error {} + + private static final class DuplicateDistributionListIdError extends Error {} + + private static final class DuplicateInsertInWriteError extends Error {} + + private static final class InsertNotPresentInFullIdSetError extends Error {} + + private static final class DeletePresentInFullIdSetError extends Error {} + + private static final class UnknownInsertError extends Error {} + + private static final class MultipleAccountError extends Error {} + + private static final class MissingAccountError extends Error {} + + private static final class SelfAddedAsContactError extends Error {} + + private static final class IncorrectManifestVersionError extends Error {} + + private static final class MoreInsertsThanExpectedError extends Error {} + + private static final class LessInsertsThanExpectedError extends Error {} + + private static final class InsertMismatchError extends Error {} + + private static final class MoreDeletesThanExpectedError extends Error {} + + private static final class LessDeletesThanExpectedError extends Error {} + + private static final class DeleteMismatchError extends Error {} +} diff --git a/lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java b/lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java new file mode 100644 index 00000000..97e3579a --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/syncStorage/WriteOperationResult.java @@ -0,0 +1,30 @@ +package org.asamk.signal.manager.syncStorage; + +import org.whispersystems.signalservice.api.storage.SignalStorageManifest; +import org.whispersystems.signalservice.api.storage.SignalStorageRecord; + +import java.util.List; +import java.util.Locale; + +public record WriteOperationResult( + SignalStorageManifest manifest, List inserts, List deletes +) { + + public boolean isEmpty() { + return inserts.isEmpty() && deletes.isEmpty(); + } + + @Override + public String toString() { + if (isEmpty()) { + return "Empty"; + } else { + return String.format(Locale.ROOT, + "ManifestVersion: %d, Total Keys: %d, Inserts: %d, Deletes: %d", + manifest.getVersion(), + manifest.getStorageIds().size(), + inserts.size(), + deletes.size()); + } + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/util/KeyUtils.java b/lib/src/main/java/org/asamk/signal/manager/util/KeyUtils.java index 486e3655..bfcb750c 100644 --- a/lib/src/main/java/org/asamk/signal/manager/util/KeyUtils.java +++ b/lib/src/main/java/org/asamk/signal/manager/util/KeyUtils.java @@ -113,6 +113,10 @@ public class KeyUtils { return MasterKey.createNew(secureRandom); } + public static byte[] createRawStorageId() { + return getSecretBytes(16); + } + private static String getSecret(int size) { var secret = getSecretBytes(size); return Base64.getEncoder().encodeToString(secret); -- 2.50.1