From: AsamK Date: Sun, 22 May 2022 19:47:40 +0000 (+0200) Subject: Move recipient store to database X-Git-Tag: v0.11.0~26 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/862c2fec8707f87076233b0991e47e6c0b37dfad Move recipient store to database --- diff --git a/graalvm-config-dir/reflect-config.json b/graalvm-config-dir/reflect-config.json index 6777c329..1ccbb0fd 100644 --- a/graalvm-config-dir/reflect-config.json +++ b/graalvm-config-dir/reflect-config.json @@ -1134,38 +1134,42 @@ "methods":[{"name":"","parameterTypes":[] }] }, { - "name":"org.asamk.signal.manager.storage.recipients.RecipientAddress", + "name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage", "allDeclaredFields":true, "queryAllDeclaredMethods":true, "queryAllDeclaredConstructors":true, - "methods":[ - {"name":"number","parameterTypes":[] }, - {"name":"uuid","parameterTypes":[] } - ] + "methods":[{"name":"","parameterTypes":["java.util.List","long"] }] }, { - "name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage", + "name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[{"name":"","parameterTypes":["long","java.lang.String","java.lang.String","java.lang.String","java.lang.String","org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Contact","org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Profile"] }] }, { - "name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient", + "name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Contact", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[{"name":"","parameterTypes":["java.lang.String","java.lang.String","int","boolean","boolean","boolean"] }] }, { - "name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient$Contact", + "name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Profile", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[{"name":"","parameterTypes":["long","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.util.Set"] }] }, { - "name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient$Profile", + "name":"org.asamk.signal.manager.storage.recipients.RecipientAddress", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[ + {"name":"number","parameterTypes":[] }, + {"name":"uuid","parameterTypes":[] } + ] }, { "name":"org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore$Storage", diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java index f4497e5d..45ff107a 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/ProfileHelper.java @@ -108,17 +108,12 @@ public final class ProfileHelper { } public List getExpiringProfileKeyCredential(List recipientIds) { - try { - account.getRecipientStore().setBulkUpdating(true); - final var profileFetches = Flowable.fromIterable(recipientIds) - .filter(recipientId -> !ExpiringProfileCredentialUtil.isValid(account.getProfileStore() - .getExpiringProfileKeyCredential(recipientId))) - .map(recipientId -> retrieveProfile(recipientId, - SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete()); - Maybe.merge(profileFetches, 10).blockingSubscribe(); - } finally { - account.getRecipientStore().setBulkUpdating(false); - } + final var profileFetches = Flowable.fromIterable(recipientIds) + .filter(recipientId -> !ExpiringProfileCredentialUtil.isValid(account.getProfileStore() + .getExpiringProfileKeyCredential(recipientId))) + .map(recipientId -> retrieveProfile(recipientId, + SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete()); + Maybe.merge(profileFetches, 10).blockingSubscribe(); return recipientIds.stream().map(r -> account.getProfileStore().getExpiringProfileKeyCredential(r)).toList(); } @@ -233,16 +228,11 @@ public final class ProfileHelper { private List getRecipientProfiles(Collection recipientIds, boolean force) { final var profileStore = account.getProfileStore(); - try { - account.getRecipientStore().setBulkUpdating(true); - final var profileFetches = Flowable.fromIterable(recipientIds) - .filter(recipientId -> force || isProfileRefreshRequired(profileStore.getProfile(recipientId))) - .map(recipientId -> retrieveProfile(recipientId, - SignalServiceProfile.RequestType.PROFILE).onErrorComplete()); - Maybe.merge(profileFetches, 10).blockingSubscribe(); - } finally { - account.getRecipientStore().setBulkUpdating(false); - } + final var profileFetches = Flowable.fromIterable(recipientIds) + .filter(recipientId -> force || isProfileRefreshRequired(profileStore.getProfile(recipientId))) + .map(recipientId -> retrieveProfile(recipientId, + SignalServiceProfile.RequestType.PROFILE).onErrorComplete()); + Maybe.merge(profileFetches, 10).blockingSubscribe(); return recipientIds.stream().map(profileStore::getProfile).toList(); } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java index b4315940..9c504b47 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java @@ -2,6 +2,7 @@ package org.asamk.signal.manager.storage; import com.zaxxer.hikari.HikariDataSource; +import org.asamk.signal.manager.storage.recipients.RecipientStore; import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,7 +14,7 @@ import java.sql.SQLException; public class AccountDatabase extends Database { private final static Logger logger = LoggerFactory.getLogger(AccountDatabase.class); - private static final long DATABASE_VERSION = 1; + private static final long DATABASE_VERSION = 2; private AccountDatabase(final HikariDataSource dataSource) { super(logger, DATABASE_VERSION, dataSource); @@ -23,11 +24,46 @@ public class AccountDatabase extends Database { return initDatabase(databaseFile, AccountDatabase::new); } + @Override + protected void createDatabase(final Connection connection) throws SQLException { + RecipientStore.createSql(connection); + MessageSendLogStore.createSql(connection); + } + @Override protected void upgradeDatabase(final Connection connection, final long oldVersion) throws SQLException { - if (oldVersion < 1) { - logger.debug("Updating database: Creating message send log tables"); - MessageSendLogStore.createSql(connection); + if (oldVersion < 2) { + logger.debug("Updating database: Creating recipient table"); + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE recipient ( + _id INTEGER PRIMARY KEY AUTOINCREMENT, + number TEXT UNIQUE, + uuid BLOB UNIQUE, + profile_key BLOB, + profile_key_credential BLOB, + + given_name TEXT, + family_name TEXT, + color TEXT, + + expiration_time INTEGER NOT NULL DEFAULT 0, + blocked BOOLEAN NOT NULL DEFAULT FALSE, + archived BOOLEAN NOT NULL DEFAULT FALSE, + profile_sharing BOOLEAN NOT NULL DEFAULT FALSE, + + profile_last_update_timestamp INTEGER NOT NULL DEFAULT 0, + profile_given_name TEXT, + profile_family_name TEXT, + profile_about TEXT, + profile_about_emoji TEXT, + profile_avatar_url_path TEXT, + profile_mobile_coin_address BLOB, + profile_unidentified_access_mode TEXT, + profile_capabilities TEXT + ); + """); + } } } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/Database.java b/lib/src/main/java/org/asamk/signal/manager/storage/Database.java index 88d34421..a55a8eed 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/Database.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/Database.java @@ -53,19 +53,26 @@ public abstract class Database implements AutoCloseable { protected final void initDb() throws SQLException { try (final var connection = dataSource.getConnection()) { + connection.setAutoCommit(false); final var userVersion = getUserVersion(connection); logger.trace("Current database version: {} Program database version: {}", userVersion, databaseVersion); - if (userVersion > databaseVersion) { + if (userVersion == 0) { + createDatabase(connection); + setUserVersion(connection, databaseVersion); + } else if (userVersion > databaseVersion) { logger.error("Database has been updated by a newer signal-cli version"); throw new SQLException("Database has been updated by a newer signal-cli version"); } else if (userVersion < databaseVersion) { upgradeDatabase(connection, userVersion); setUserVersion(connection, databaseVersion); } + connection.commit(); } } + protected abstract void createDatabase(final Connection connection) throws SQLException; + protected abstract void upgradeDatabase(final Connection connection, long oldVersion) throws SQLException; private static long getUserVersion(final Connection connection) throws SQLException { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java index 21af35ed..3761c5c4 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java @@ -26,6 +26,7 @@ import org.asamk.signal.manager.storage.protocol.LegacyJsonSignalProtocolStore; import org.asamk.signal.manager.storage.protocol.SignalProtocolStore; import org.asamk.signal.manager.storage.recipients.Contact; import org.asamk.signal.manager.storage.recipients.LegacyRecipientStore; +import org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2; import org.asamk.signal.manager.storage.recipients.Profile; import org.asamk.signal.manager.storage.recipients.RecipientAddress; import org.asamk.signal.manager.storage.recipients.RecipientId; @@ -93,7 +94,7 @@ public class SignalAccount implements Closeable { private final static Logger logger = LoggerFactory.getLogger(SignalAccount.class); private static final int MINIMUM_STORAGE_VERSION = 1; - private static final int CURRENT_STORAGE_VERSION = 4; + private static final int CURRENT_STORAGE_VERSION = 5; private final Object LOCK = new Object(); @@ -392,8 +393,6 @@ public class SignalAccount implements Closeable { // Old config file, creating new profile key setProfileKey(KeyUtils.createProfileKey()); } - // Ensure our profile key is stored in profile store - getProfileStore().storeSelfProfileKey(getSelfRecipientId(), getProfileKey()); if (previousStorageVersion < 3) { for (final var group : groupStore.getGroups()) { if (group instanceof GroupInfoV2 && group.getDistributionId() == null) { @@ -514,9 +513,10 @@ public class SignalAccount implements Closeable { if (rootNode.hasNonNull("version")) { var accountVersion = rootNode.get("version").asInt(1); if (accountVersion > CURRENT_STORAGE_VERSION) { - throw new IOException("Config file was created by a more recent version!"); + throw new IOException("Config file was created by a more recent version: " + accountVersion); } else if (accountVersion < MINIMUM_STORAGE_VERSION) { - throw new IOException("Config file was created by a no longer supported older version!"); + throw new IOException("Config file was created by a no longer supported older version: " + + accountVersion); } previousStorageVersion = accountVersion; if (accountVersion < CURRENT_STORAGE_VERSION) { @@ -621,6 +621,15 @@ public class SignalAccount implements Closeable { } } + if (previousStorageVersion < 5) { + final var legacyRecipientsStoreFile = getRecipientsStoreFile(dataPath, accountPath); + if (legacyRecipientsStoreFile.exists()) { + LegacyRecipientStore2.migrate(legacyRecipientsStoreFile, getRecipientStore()); + // Ensure our profile key is stored in profile store + getProfileStore().storeSelfProfileKey(getSelfRecipientId(), getProfileKey()); + migratedLegacyConfig = true; + } + } final var legacySignalProtocolStore = rootNode.hasNonNull("axolotlStore") ? jsonProcessor.convertValue(Utils.getNotNullNode(rootNode, "axolotlStore"), LegacyJsonSignalProtocolStore.class) @@ -681,7 +690,8 @@ public class SignalAccount implements Closeable { logger.debug("Migrating legacy recipient store."); var legacyRecipientStore = jsonProcessor.convertValue(legacyRecipientStoreNode, LegacyRecipientStore.class); if (legacyRecipientStore != null) { - getRecipientStore().resolveRecipientsTrusted(legacyRecipientStore.getAddresses()); + legacyRecipientStore.getAddresses() + .forEach(recipient -> getRecipientStore().resolveRecipientTrusted(recipient)); } getRecipientTrustedResolver().resolveSelfRecipientTrusted(getSelfRecipientAddress()); migrated = true; @@ -1094,22 +1104,42 @@ public class SignalAccount implements Closeable { } public RecipientResolver getRecipientResolver() { - return getRecipientStore(); + return new RecipientResolver() { + @Override + public RecipientId resolveRecipient(final RecipientAddress address) { + return getRecipientStore().resolveRecipient(address); + } + + @Override + public RecipientId resolveRecipient(final long recipientId) { + return getRecipientStore().resolveRecipient(recipientId); + } + }; } public RecipientTrustedResolver getRecipientTrustedResolver() { - return getRecipientStore(); + return new RecipientTrustedResolver() { + @Override + public RecipientId resolveSelfRecipientTrusted(final RecipientAddress address) { + return getRecipientStore().resolveSelfRecipientTrusted(address); + } + + @Override + public RecipientId resolveRecipientTrusted(final SignalServiceAddress address) { + return getRecipientStore().resolveRecipientTrusted(address); + } + }; } public RecipientAddressResolver getRecipientAddressResolver() { - return getRecipientStore()::resolveRecipientAddress; + return recipientId -> getRecipientStore().resolveRecipientAddress(recipientId); } public RecipientStore getRecipientStore() { return getOrCreate(() -> recipientStore, - () -> recipientStore = RecipientStore.load(getRecipientsStoreFile(dataPath, accountPath), - this::mergeRecipients, - this::getSelfRecipientAddress)); + () -> recipientStore = new RecipientStore(this::mergeRecipients, + this::getSelfRecipientAddress, + getAccountDatabase())); } public ProfileStore getProfileStore() { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/Utils.java b/lib/src/main/java/org/asamk/signal/manager/storage/Utils.java index e4b639a2..aa279a65 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/Utils.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/Utils.java @@ -10,13 +10,25 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import org.asamk.signal.manager.storage.recipients.RecipientAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.whispersystems.signalservice.api.util.UuidUtil; import java.io.InvalidObjectException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; public class Utils { + private static final Logger logger = LoggerFactory.getLogger(Utils.class); + private Utils() { } @@ -49,4 +61,51 @@ public class Utils { return new RecipientAddress(Optional.empty(), Optional.of(identifier)); } } + + public static T executeQuerySingleRow( + PreparedStatement statement, ResultSetMapper mapper + ) throws SQLException { + final var resultSet = statement.executeQuery(); + if (!resultSet.next()) { + throw new RuntimeException("Expected a row in result set, but none found."); + } + return mapper.apply(resultSet); + } + + public static Optional executeQueryForOptional( + PreparedStatement statement, ResultSetMapper mapper + ) throws SQLException { + final var resultSet = statement.executeQuery(); + if (!resultSet.next()) { + return Optional.empty(); + } + return Optional.ofNullable(mapper.apply(resultSet)); + } + + public static Stream executeQueryForStream( + PreparedStatement statement, ResultSetMapper mapper + ) throws SQLException { + final var resultSet = statement.executeQuery(); + + return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { + @Override + public boolean tryAdvance(final Consumer consumer) { + try { + if (!resultSet.next()) { + return false; + } + consumer.accept(mapper.apply(resultSet)); + return true; + } catch (SQLException e) { + logger.warn("Failed to read from database result", e); + throw new RuntimeException(e); + } + } + }, false); + } + + public interface ResultSetMapper { + + T apply(ResultSet resultSet) throws SQLException; + } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/profiles/ProfileStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/profiles/ProfileStore.java index acdcbb18..9e36bf85 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/profiles/ProfileStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/profiles/ProfileStore.java @@ -20,7 +20,6 @@ public interface ProfileStore { void storeProfileKey(RecipientId recipientId, ProfileKey profileKey); void storeExpiringProfileKeyCredential( - RecipientId recipientId, - ExpiringProfileKeyCredential expiringProfileKeyCredential + RecipientId recipientId, ExpiringProfileKeyCredential expiringProfileKeyCredential ); } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java new file mode 100644 index 00000000..29b30ffd --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/LegacyRecipientStore2.java @@ -0,0 +1,130 @@ +package org.asamk.signal.manager.storage.recipients; + +import org.asamk.signal.manager.storage.Utils; +import org.signal.libsignal.zkgroup.InvalidInputException; +import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential; +import org.signal.libsignal.zkgroup.profiles.ProfileKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.util.UuidUtil; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Base64; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class LegacyRecipientStore2 { + + private final static Logger logger = LoggerFactory.getLogger(LegacyRecipientStore2.class); + + public static void migrate(File file, RecipientStore recipientStore) { + final var objectMapper = Utils.createStorageObjectMapper(); + try (var inputStream = new FileInputStream(file)) { + final var storage = objectMapper.readValue(inputStream, Storage.class); + + final var recipients = storage.recipients.stream().map(r -> { + final var recipientId = new RecipientId(r.id, recipientStore); + final var address = new RecipientAddress(Optional.ofNullable(r.uuid).map(UuidUtil::parseOrThrow), + Optional.ofNullable(r.number)); + + Contact contact = null; + if (r.contact != null) { + contact = new Contact(r.contact.name, + null, + r.contact.color, + r.contact.messageExpirationTime, + r.contact.blocked, + r.contact.archived, + r.contact.profileSharingEnabled); + } + + ProfileKey profileKey = null; + if (r.profileKey != null) { + try { + profileKey = new ProfileKey(Base64.getDecoder().decode(r.profileKey)); + } catch (InvalidInputException ignored) { + } + } + + ExpiringProfileKeyCredential expiringProfileKeyCredential = null; + if (r.expiringProfileKeyCredential != null) { + try { + expiringProfileKeyCredential = new ExpiringProfileKeyCredential(Base64.getDecoder() + .decode(r.expiringProfileKeyCredential)); + } catch (Throwable ignored) { + } + } + + Profile profile = null; + if (r.profile != null) { + profile = new Profile(r.profile.lastUpdateTimestamp, + r.profile.givenName, + r.profile.familyName, + r.profile.about, + r.profile.aboutEmoji, + r.profile.avatarUrlPath, + r.profile.mobileCoinAddress == null + ? null + : Base64.getDecoder().decode(r.profile.mobileCoinAddress), + Profile.UnidentifiedAccessMode.valueOfOrUnknown(r.profile.unidentifiedAccessMode), + r.profile.capabilities.stream() + .map(Profile.Capability::valueOfOrNull) + .filter(Objects::nonNull) + .collect(Collectors.toSet())); + } + + return new Recipient(recipientId, address, contact, profileKey, expiringProfileKeyCredential, profile); + }).collect(Collectors.toMap(Recipient::getRecipientId, r -> r)); + + recipientStore.addLegacyRecipients(recipients); + Files.delete(file.toPath()); + } catch (FileNotFoundException e) { + // nothing to migrate + } catch (IOException e) { + logger.warn("Failed to load recipient store", e); + throw new RuntimeException(e); + } + } + + private record Storage(List recipients, long lastId) { + + private record Recipient( + long id, + String number, + String uuid, + String profileKey, + String expiringProfileKeyCredential, + Contact contact, + Profile profile + ) { + + private record Contact( + String name, + String color, + int messageExpirationTime, + boolean blocked, + boolean archived, + boolean profileSharingEnabled + ) {} + + private record Profile( + long lastUpdateTimestamp, + String givenName, + String familyName, + String about, + String aboutEmoji, + String avatarUrlPath, + String mobileCoinAddress, + String unidentifiedAccessMode, + Set capabilities + ) {} + } + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientResolver.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientResolver.java index 8744eeb9..f20e4bbb 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientResolver.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientResolver.java @@ -1,17 +1,24 @@ package org.asamk.signal.manager.storage.recipients; +import org.asamk.signal.manager.storage.Utils; import org.whispersystems.signalservice.api.push.ServiceId; import org.whispersystems.signalservice.api.push.SignalServiceAddress; public interface RecipientResolver { - RecipientId resolveRecipient(String identifier); - RecipientId resolveRecipient(RecipientAddress address); - RecipientId resolveRecipient(SignalServiceAddress address); + RecipientId resolveRecipient(long recipientId); + + default RecipientId resolveRecipient(String identifier) { + return resolveRecipient(Utils.getRecipientAddressFromIdentifier(identifier)); + } - RecipientId resolveRecipient(ServiceId aci); + default RecipientId resolveRecipient(SignalServiceAddress address) { + return resolveRecipient(new RecipientAddress(address)); + } - RecipientId resolveRecipient(long recipientId); + default RecipientId resolveRecipient(ServiceId serviceId) { + return resolveRecipient(new RecipientAddress(serviceId.uuid())); + } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java index 7244a96c..37ffa3b2 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/recipients/RecipientStore.java @@ -1,9 +1,8 @@ package org.asamk.signal.manager.storage.recipients; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.api.UnregisteredRecipientException; +import org.asamk.signal.manager.storage.Database; import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.contacts.ContactsStore; import org.asamk.signal.manager.storage.profiles.ProfileStore; @@ -13,19 +12,14 @@ import org.signal.libsignal.zkgroup.profiles.ProfileKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.signalservice.api.push.ACI; -import org.whispersystems.signalservice.api.push.ServiceId; import org.whispersystems.signalservice.api.push.SignalServiceAddress; import org.whispersystems.signalservice.api.util.UuidUtil; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; -import java.util.Base64; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -40,172 +34,126 @@ import java.util.stream.Collectors; public class RecipientStore implements RecipientResolver, RecipientTrustedResolver, ContactsStore, ProfileStore { private final static Logger logger = LoggerFactory.getLogger(RecipientStore.class); + private static final String TABLE_RECIPIENT = "recipient"; + private static final String SQL_IS_CONTACT = "r.given_name IS NOT NULL OR r.family_name IS NOT NULL OR r.expiration_time > 0 OR r.profile_sharing = TRUE OR r.color IS NOT NULL OR r.blocked = TRUE OR r.archived = TRUE"; - private final ObjectMapper objectMapper; - private final File file; private final RecipientMergeHandler recipientMergeHandler; private final SelfAddressProvider selfAddressProvider; + private final Database database; - private final Map recipients; + private final Object recipientsLock = new Object(); private final Map recipientsMerged = new HashMap<>(); - private long lastId; - private boolean isBulkUpdating; - - public static RecipientStore load( - File file, RecipientMergeHandler recipientMergeHandler, SelfAddressProvider selfAddressProvider - ) { - final var objectMapper = Utils.createStorageObjectMapper(); - try (var inputStream = new FileInputStream(file)) { - final var storage = objectMapper.readValue(inputStream, Storage.class); - - final var recipientStore = new RecipientStore(objectMapper, - file, - recipientMergeHandler, - selfAddressProvider, - new HashMap<>(), - storage.lastId); - final var recipients = storage.recipients.stream().map(r -> { - final var recipientId = new RecipientId(r.id, recipientStore); - final var address = new RecipientAddress(Optional.ofNullable(r.uuid).map(UuidUtil::parseOrThrow), - Optional.ofNullable(r.number)); - - Contact contact = null; - if (r.contact != null) { - contact = new Contact(r.contact.name, - r.contact.familyName, - r.contact.color, - r.contact.messageExpirationTime, - r.contact.blocked, - r.contact.archived, - r.contact.profileSharingEnabled); - } - - ProfileKey profileKey = null; - if (r.profileKey != null) { - try { - profileKey = new ProfileKey(Base64.getDecoder().decode(r.profileKey)); - } catch (InvalidInputException ignored) { - } - } - - ExpiringProfileKeyCredential expiringProfileKeyCredential = null; - if (r.expiringProfileKeyCredential != null) { - try { - expiringProfileKeyCredential = new ExpiringProfileKeyCredential(Base64.getDecoder() - .decode(r.expiringProfileKeyCredential)); - } catch (Throwable ignored) { - } - } - - Profile profile = null; - if (r.profile != null) { - profile = new Profile(r.profile.lastUpdateTimestamp, - r.profile.givenName, - r.profile.familyName, - r.profile.about, - r.profile.aboutEmoji, - r.profile.avatarUrlPath, - r.profile.mobileCoinAddress == null - ? null - : Base64.getDecoder().decode(r.profile.mobileCoinAddress), - Profile.UnidentifiedAccessMode.valueOfOrUnknown(r.profile.unidentifiedAccessMode), - r.profile.capabilities.stream() - .map(Profile.Capability::valueOfOrNull) - .filter(Objects::nonNull) - .collect(Collectors.toSet())); - } - - return new Recipient(recipientId, address, contact, profileKey, expiringProfileKeyCredential, profile); - }).collect(Collectors.toMap(Recipient::getRecipientId, r -> r)); - - recipientStore.addRecipients(recipients); - - return recipientStore; - } catch (FileNotFoundException e) { - logger.trace("Creating new recipient store."); - return new RecipientStore(objectMapper, - file, - recipientMergeHandler, - selfAddressProvider, - new HashMap<>(), - 0); - } catch (IOException e) { - logger.warn("Failed to load recipient store", e); - throw new RuntimeException(e); + public static void createSql(Connection connection) throws SQLException { + // When modifying the CREATE statement here, also add a migration in AccountDatabase.java + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE recipient ( + _id INTEGER PRIMARY KEY AUTOINCREMENT, + number TEXT UNIQUE, + uuid BLOB UNIQUE, + profile_key BLOB, + profile_key_credential BLOB, + + given_name TEXT, + family_name TEXT, + color TEXT, + + expiration_time INTEGER NOT NULL DEFAULT 0, + blocked BOOLEAN NOT NULL DEFAULT FALSE, + archived BOOLEAN NOT NULL DEFAULT FALSE, + profile_sharing BOOLEAN NOT NULL DEFAULT FALSE, + + profile_last_update_timestamp INTEGER NOT NULL DEFAULT 0, + profile_given_name TEXT, + profile_family_name TEXT, + profile_about TEXT, + profile_about_emoji TEXT, + profile_avatar_url_path TEXT, + profile_mobile_coin_address BLOB, + profile_unidentified_access_mode TEXT, + profile_capabilities TEXT + ); + """); } } - private RecipientStore( - final ObjectMapper objectMapper, - final File file, + public RecipientStore( final RecipientMergeHandler recipientMergeHandler, final SelfAddressProvider selfAddressProvider, - final Map recipients, - final long lastId + final Database database ) { - this.objectMapper = objectMapper; - this.file = file; this.recipientMergeHandler = recipientMergeHandler; this.selfAddressProvider = selfAddressProvider; - this.recipients = recipients; - this.lastId = lastId; - } - - public void setBulkUpdating(final boolean bulkUpdating) { - isBulkUpdating = bulkUpdating; - if (!bulkUpdating) { - synchronized (recipients) { - saveLocked(); - } - } + this.database = database; } public RecipientAddress resolveRecipientAddress(RecipientId recipientId) { - synchronized (recipients) { - return getRecipient(recipientId).getAddress(); - } - } - - public Recipient getRecipient(RecipientId recipientId) { - synchronized (recipients) { - return recipients.get(recipientId); + final var sql = ( + """ + SELECT r.number, r.uuid + FROM %s r + WHERE r._id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQuerySingleRow(statement, this::getRecipientAddressFromResultSet); + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); } } public Collection getRecipientIdsWithEnabledProfileSharing() { - synchronized (recipients) { - return recipients.values().stream().filter(r -> { - final var contact = r.getContact(); - return contact != null && !contact.isBlocked() && contact.isProfileSharingEnabled(); - }).map(Recipient::getRecipientId).toList(); + final var sql = ( + """ + SELECT r._id + FROM %s r + WHERE r.blocked = FALSE AND r.profile_sharing = TRUE + """ + ).formatted(TABLE_RECIPIENT); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + try (var result = Utils.executeQueryForStream(statement, this::getRecipientIdFromResultSet)) { + return result.toList(); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); } } @Override - public RecipientId resolveRecipient(ServiceId serviceId) { - return resolveRecipient(new RecipientAddress(serviceId.uuid()), false, false); - } - - @Override - public RecipientId resolveRecipient(final long recipientId) { - final var recipient = getRecipient(new RecipientId(recipientId, this)); - return recipient == null ? null : recipient.getRecipientId(); - } - - @Override - public RecipientId resolveRecipient(final String identifier) { - return resolveRecipient(Utils.getRecipientAddressFromIdentifier(identifier), false, false); + public RecipientId resolveRecipient(final long rawRecipientId) { + final var sql = ( + """ + SELECT r._id + FROM %s r + WHERE r._id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, rawRecipientId); + return Utils.executeQueryForOptional(statement, this::getRecipientIdFromResultSet).orElse(null); + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } public RecipientId resolveRecipient( final String number, Supplier aciSupplier ) throws UnregisteredRecipientException { - final Optional byNumber; - synchronized (recipients) { - byNumber = findByNumberLocked(number); + final Optional byNumber; + try (final var connection = database.getConnection()) { + byNumber = findByNumber(connection, number); + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); } - if (byNumber.isEmpty() || byNumber.get().getAddress().uuid().isEmpty()) { + if (byNumber.isEmpty() || byNumber.get().address().uuid().isEmpty()) { final var aci = aciSupplier.get(); if (aci == null) { throw new UnregisteredRecipientException(new RecipientAddress(null, number)); @@ -213,18 +161,13 @@ public class RecipientStore implements RecipientResolver, RecipientTrustedResolv return resolveRecipient(new RecipientAddress(aci.uuid(), number), false, false); } - return byNumber.get().getRecipientId(); + return byNumber.get().id(); } public RecipientId resolveRecipient(RecipientAddress address) { return resolveRecipient(address, false, false); } - @Override - public RecipientId resolveRecipient(final SignalServiceAddress address) { - return resolveRecipient(new RecipientAddress(address), false, false); - } - @Override public RecipientId resolveSelfRecipientTrusted(RecipientAddress address) { return resolveRecipient(address, true, true); @@ -239,170 +182,329 @@ public class RecipientStore implements RecipientResolver, RecipientTrustedResolv return resolveRecipient(new RecipientAddress(address), true, false); } - public List resolveRecipientsTrusted(List addresses) { - final List recipientIds; - final List> toBeMerged = new ArrayList<>(); - synchronized (recipients) { - recipientIds = addresses.stream().map(address -> { - final var pair = resolveRecipientLocked(address, true, false); - if (pair.second().isPresent()) { - toBeMerged.add(new Pair<>(pair.first(), pair.second().get())); - } - return pair.first(); - }).toList(); - } - for (var pair : toBeMerged) { - recipientMergeHandler.mergeRecipients(pair.first(), pair.second()); - } - return recipientIds; - } - @Override public void storeContact(RecipientId recipientId, final Contact contact) { - synchronized (recipients) { - final var recipient = recipients.get(recipientId); - storeRecipientLocked(recipientId, Recipient.newBuilder(recipient).withContact(contact).build()); + try (final var connection = database.getConnection()) { + storeContact(connection, recipientId, contact); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); } } @Override public Contact getContact(RecipientId recipientId) { - final var recipient = getRecipient(recipientId); - return recipient == null ? null : recipient.getContact(); + try (final var connection = database.getConnection()) { + return getContact(connection, recipientId); + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } @Override public List> getContacts() { - return recipients.entrySet() - .stream() - .filter(e -> e.getValue().getContact() != null) - .map(e -> new Pair<>(e.getKey(), e.getValue().getContact())) - .toList(); + final var sql = ( + """ + SELECT r._id, r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived + FROM %s r + WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s + """ + ).formatted(TABLE_RECIPIENT, SQL_IS_CONTACT); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + try (var result = Utils.executeQueryForStream(statement, + resultSet -> new Pair<>(getRecipientIdFromResultSet(resultSet), + getContactFromResultSet(resultSet)))) { + return result.toList(); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } public List getRecipients( boolean onlyContacts, Optional blocked, Set recipientIds, Optional name ) { - return recipients.values() - .stream() - .filter(r -> !onlyContacts || r.getContact() != null) - .filter(r -> blocked.isEmpty() || ( - blocked.get() == ( - r.getContact() != null && r.getContact().isBlocked() - ) - )) - .filter(r -> recipientIds.isEmpty() || (recipientIds.contains(r.getRecipientId()))) - .filter(r -> name.isEmpty() - || (r.getContact() != null && name.get().equals(r.getContact().getName())) - || (r.getProfile() != null && name.get().equals(r.getProfile().getDisplayName()))) - .toList(); + final var sqlWhere = new ArrayList(); + if (onlyContacts) { + sqlWhere.add("(" + SQL_IS_CONTACT + ")"); + } + if (blocked.isPresent()) { + sqlWhere.add("r.blocked = ?"); + } + if (!recipientIds.isEmpty()) { + final var recipientIdsCommaSeparated = recipientIds.stream() + .map(recipientId -> String.valueOf(recipientId.id())) + .collect(Collectors.joining(",")); + sqlWhere.add("r._id IN (" + recipientIdsCommaSeparated + ")"); + } + final var sql = ( + """ + SELECT r._id, + r.number, r.uuid, + r.profile_key, r.profile_key_credential, + r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived, + r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities + FROM %s r + WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s + """ + ).formatted(TABLE_RECIPIENT, sqlWhere.size() == 0 ? "TRUE" : String.join(" AND ", sqlWhere)); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + if (blocked.isPresent()) { + statement.setBoolean(1, blocked.get()); + } + try (var result = Utils.executeQueryForStream(statement, this::getRecipientFromResultSet)) { + return result.filter(r -> name.isEmpty() || ( + r.getContact() != null && name.get().equals(r.getContact().getName()) + ) || (r.getProfile() != null && name.get().equals(r.getProfile().getDisplayName()))).toList(); + } + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } @Override public void deleteContact(RecipientId recipientId) { - synchronized (recipients) { - final var recipient = recipients.get(recipientId); - storeRecipientLocked(recipientId, Recipient.newBuilder(recipient).withContact(null).build()); - } + storeContact(recipientId, null); } public void deleteRecipientData(RecipientId recipientId) { - synchronized (recipients) { - logger.debug("Deleting recipient data for {}", recipientId); - final var recipient = recipients.get(recipientId); - recipient.getAddress() - .uuid() - .ifPresent(uuid -> storeRecipientLocked(recipientId, - Recipient.newBuilder() - .withRecipientId(recipientId) - .withAddress(new RecipientAddress(uuid)) - .build())); + logger.debug("Deleting recipient data for {}", recipientId); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + storeContact(connection, recipientId, null); + storeProfile(connection, recipientId, null); + storeProfileKey(connection, recipientId, null, false); + storeExpiringProfileKeyCredential(connection, recipientId, null); + deleteRecipient(connection, recipientId); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); } } @Override public Profile getProfile(final RecipientId recipientId) { - final var recipient = getRecipient(recipientId); - return recipient == null ? null : recipient.getProfile(); + try (final var connection = database.getConnection()) { + return getProfile(connection, recipientId); + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } @Override public ProfileKey getProfileKey(final RecipientId recipientId) { - final var recipient = getRecipient(recipientId); - return recipient == null ? null : recipient.getProfileKey(); + try (final var connection = database.getConnection()) { + return getProfileKey(connection, recipientId); + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } @Override public ExpiringProfileKeyCredential getExpiringProfileKeyCredential(final RecipientId recipientId) { - final var recipient = getRecipient(recipientId); - return recipient == null ? null : recipient.getExpiringProfileKeyCredential(); + try (final var connection = database.getConnection()) { + return getExpiringProfileKeyCredential(connection, recipientId); + } catch (SQLException e) { + throw new RuntimeException("Failed read from recipient store", e); + } } @Override public void storeProfile(RecipientId recipientId, final Profile profile) { - synchronized (recipients) { - final var recipient = recipients.get(recipientId); - storeRecipientLocked(recipientId, Recipient.newBuilder(recipient).withProfile(profile).build()); + try (final var connection = database.getConnection()) { + storeProfile(connection, recipientId, profile); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); } } @Override public void storeSelfProfileKey(final RecipientId recipientId, final ProfileKey profileKey) { - storeProfileKey(recipientId, profileKey, false); + try (final var connection = database.getConnection()) { + storeProfileKey(connection, recipientId, profileKey, false); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } } @Override public void storeProfileKey(RecipientId recipientId, final ProfileKey profileKey) { - storeProfileKey(recipientId, profileKey, true); - } - - private void storeProfileKey(RecipientId recipientId, final ProfileKey profileKey, boolean resetProfile) { - synchronized (recipients) { - final var recipient = recipients.get(recipientId); - if (profileKey != null && profileKey.equals(recipient.getProfileKey()) && ( - recipient.getProfile() == null || ( - recipient.getProfile().getUnidentifiedAccessMode() != Profile.UnidentifiedAccessMode.UNKNOWN - && recipient.getProfile().getUnidentifiedAccessMode() - != Profile.UnidentifiedAccessMode.DISABLED - ) - )) { - return; - } - - final var builder = Recipient.newBuilder(recipient) - .withProfileKey(profileKey) - .withExpiringProfileKeyCredential(null); - if (resetProfile) { - builder.withProfile(recipient.getProfile() == null - ? null - : Profile.newBuilder(recipient.getProfile()).withLastUpdateTimestamp(0).build()); - } - final var newRecipient = builder.build(); - storeRecipientLocked(recipientId, newRecipient); + try (final var connection = database.getConnection()) { + storeProfileKey(connection, recipientId, profileKey, true); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); } } @Override public void storeExpiringProfileKeyCredential( - RecipientId recipientId, final ExpiringProfileKeyCredential expiringProfileKeyCredential + RecipientId recipientId, final ExpiringProfileKeyCredential profileKeyCredential ) { - synchronized (recipients) { - final var recipient = recipients.get(recipientId); - storeRecipientLocked(recipientId, - Recipient.newBuilder(recipient) - .withExpiringProfileKeyCredential(expiringProfileKeyCredential) - .build()); + try (final var connection = database.getConnection()) { + storeExpiringProfileKeyCredential(connection, recipientId, profileKeyCredential); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } + } + + void addLegacyRecipients(final Map recipients) { + logger.debug("Migrating legacy recipients to database"); + long start = System.nanoTime(); + final var sql = ( + """ + INSERT INTO %s (_id, number, uuid) + VALUES (?, ?, ?) + """ + ).formatted(TABLE_RECIPIENT); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + try (final var statement = connection.prepareStatement("DELETE FROM %s".formatted(TABLE_RECIPIENT))) { + statement.executeUpdate(); + } + try (final var statement = connection.prepareStatement(sql)) { + for (final var recipient : recipients.values()) { + statement.setLong(1, recipient.getRecipientId().id()); + statement.setString(2, recipient.getAddress().number().orElse(null)); + statement.setBytes(3, recipient.getAddress().uuid().map(UuidUtil::toByteArray).orElse(null)); + statement.executeUpdate(); + } + } + logger.debug("Initial inserts took {}ms", (System.nanoTime() - start) / 1000000); + + for (final var recipient : recipients.values()) { + if (recipient.getContact() != null) { + storeContact(connection, recipient.getRecipientId(), recipient.getContact()); + } + if (recipient.getProfile() != null) { + storeProfile(connection, recipient.getRecipientId(), recipient.getProfile()); + } + if (recipient.getProfileKey() != null) { + storeProfileKey(connection, recipient.getRecipientId(), recipient.getProfileKey(), false); + } + if (recipient.getExpiringProfileKeyCredential() != null) { + storeExpiringProfileKeyCredential(connection, + recipient.getRecipientId(), + recipient.getExpiringProfileKeyCredential()); + } + } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } + logger.debug("Complete recipients migration took {}ms", (System.nanoTime() - start) / 1000000); + } + + long getActualRecipientId(long recipientId) { + while (recipientsMerged.containsKey(recipientId)) { + final var newRecipientId = recipientsMerged.get(recipientId); + logger.debug("Using {} instead of {}, because recipients have been merged", newRecipientId, recipientId); + recipientId = newRecipientId; + } + return recipientId; + } + + private void storeContact( + final Connection connection, final RecipientId recipientId, final Contact contact + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET given_name = ?, family_name = ?, expiration_time = ?, profile_sharing = ?, color = ?, blocked = ?, archived = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setString(1, contact == null ? null : contact.getGivenName()); + statement.setString(2, contact == null ? null : contact.getFamilyName()); + statement.setInt(3, contact == null ? 0 : contact.getMessageExpirationTime()); + statement.setBoolean(4, contact != null && contact.isProfileSharingEnabled()); + statement.setString(5, contact == null ? null : contact.getColor()); + statement.setBoolean(6, contact != null && contact.isBlocked()); + statement.setBoolean(7, contact != null && contact.isArchived()); + statement.setLong(8, recipientId.id()); + statement.executeUpdate(); + } + } + + private void storeExpiringProfileKeyCredential( + final Connection connection, + final RecipientId recipientId, + final ExpiringProfileKeyCredential profileKeyCredential + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET profile_key_credential = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, profileKeyCredential == null ? null : profileKeyCredential.serialize()); + statement.setLong(2, recipientId.id()); + statement.executeUpdate(); } } - public boolean isEmpty() { - synchronized (recipients) { - return recipients.isEmpty(); + private void storeProfile( + final Connection connection, final RecipientId recipientId, final Profile profile + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET profile_last_update_timestamp = ?, profile_given_name = ?, profile_family_name = ?, profile_about = ?, profile_about_emoji = ?, profile_avatar_url_path = ?, profile_mobile_coin_address = ?, profile_unidentified_access_mode = ?, profile_capabilities = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, profile == null ? 0 : profile.getLastUpdateTimestamp()); + statement.setString(2, profile == null ? null : profile.getGivenName()); + statement.setString(3, profile == null ? null : profile.getFamilyName()); + statement.setString(4, profile == null ? null : profile.getAbout()); + statement.setString(5, profile == null ? null : profile.getAboutEmoji()); + statement.setString(6, profile == null ? null : profile.getAvatarUrlPath()); + statement.setBytes(7, profile == null ? null : profile.getMobileCoinAddress()); + statement.setString(8, profile == null ? null : profile.getUnidentifiedAccessMode().name()); + statement.setString(9, + profile == null + ? null + : profile.getCapabilities().stream().map(Enum::name).collect(Collectors.joining(","))); + statement.setLong(10, recipientId.id()); + statement.executeUpdate(); } } - private void addRecipients(final Map recipients) { - this.recipients.putAll(recipients); + private void storeProfileKey( + Connection connection, RecipientId recipientId, final ProfileKey profileKey, boolean resetProfile + ) throws SQLException { + if (profileKey != null) { + final var recipientProfileKey = getProfileKey(recipientId); + if (profileKey.equals(recipientProfileKey)) { + final var recipientProfile = getProfile(recipientId); + if (recipientProfile == null || ( + recipientProfile.getUnidentifiedAccessMode() != Profile.UnidentifiedAccessMode.UNKNOWN + && recipientProfile.getUnidentifiedAccessMode() + != Profile.UnidentifiedAccessMode.DISABLED + )) { + return; + } + } + } + + final var sql = ( + """ + UPDATE %s + SET profile_key = ?, profile_key_credential = NULL%s + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT, resetProfile ? ", profile_last_update_timestamp = 0" : ""); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, profileKey == null ? null : profileKey.serialize()); + statement.setLong(2, recipientId.id()); + statement.executeUpdate(); + } } /** @@ -411,259 +513,373 @@ public class RecipientStore implements RecipientResolver, RecipientTrustedResolv */ private RecipientId resolveRecipient(RecipientAddress address, boolean isHighTrust, boolean isSelf) { final Pair> pair; - synchronized (recipients) { - pair = resolveRecipientLocked(address, isHighTrust, isSelf); + synchronized (recipientsLock) { + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + pair = resolveRecipientLocked(connection, address, isHighTrust, isSelf); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } } if (pair.second().isPresent()) { recipientMergeHandler.mergeRecipients(pair.first(), pair.second().get()); + try (final var connection = database.getConnection()) { + deleteRecipient(connection, pair.second().get()); + } catch (SQLException e) { + throw new RuntimeException("Failed update recipient store", e); + } } return pair.first(); } private Pair> resolveRecipientLocked( - RecipientAddress address, boolean isHighTrust, boolean isSelf - ) { + Connection connection, RecipientAddress address, boolean isHighTrust, boolean isSelf + ) throws SQLException { if (isHighTrust && !isSelf) { if (selfAddressProvider.getSelfAddress().matches(address)) { isHighTrust = false; } } final var byNumber = address.number().isEmpty() - ? Optional.empty() - : findByNumberLocked(address.number().get()); + ? Optional.empty() + : findByNumber(connection, address.number().get()); final var byUuid = address.uuid().isEmpty() - ? Optional.empty() - : findByUuidLocked(address.uuid().get()); + ? Optional.empty() + : findByUuid(connection, address.uuid().get()); if (byNumber.isEmpty() && byUuid.isEmpty()) { logger.debug("Got new recipient, both uuid and number are unknown"); if (isHighTrust || address.uuid().isEmpty() || address.number().isEmpty()) { - return new Pair<>(addNewRecipientLocked(address), Optional.empty()); + return new Pair<>(addNewRecipient(connection, address), Optional.empty()); } - return new Pair<>(addNewRecipientLocked(new RecipientAddress(address.uuid().get())), Optional.empty()); + return new Pair<>(addNewRecipient(connection, new RecipientAddress(address.uuid().get())), + Optional.empty()); } if (!isHighTrust || address.uuid().isEmpty() || address.number().isEmpty() || byNumber.equals(byUuid)) { - return new Pair<>(byUuid.or(() -> byNumber).map(Recipient::getRecipientId).get(), Optional.empty()); + return new Pair<>(byUuid.or(() -> byNumber).map(RecipientWithAddress::id).get(), Optional.empty()); } if (byNumber.isEmpty()) { - logger.debug("Got recipient {} existing with uuid, updating with high trust number", - byUuid.get().getRecipientId()); - updateRecipientAddressLocked(byUuid.get().getRecipientId(), address); - return new Pair<>(byUuid.get().getRecipientId(), Optional.empty()); + logger.debug("Got recipient {} existing with uuid, updating with high trust number", byUuid.get().id()); + updateRecipientAddress(connection, byUuid.get().id(), address); + return new Pair<>(byUuid.get().id(), Optional.empty()); } final var byNumberRecipient = byNumber.get(); if (byUuid.isEmpty()) { - if (byNumberRecipient.getAddress().uuid().isPresent()) { + if (byNumberRecipient.address().uuid().isPresent()) { logger.debug( "Got recipient {} existing with number, but different uuid, so stripping its number and adding new recipient", - byNumberRecipient.getRecipientId()); + byNumberRecipient.id()); - updateRecipientAddressLocked(byNumberRecipient.getRecipientId(), - new RecipientAddress(byNumberRecipient.getAddress().uuid().get())); - return new Pair<>(addNewRecipientLocked(address), Optional.empty()); + updateRecipientAddress(connection, + byNumberRecipient.id(), + new RecipientAddress(byNumberRecipient.address().uuid().get())); + return new Pair<>(addNewRecipient(connection, address), Optional.empty()); } logger.debug("Got recipient {} existing with number and no uuid, updating with high trust uuid", - byNumberRecipient.getRecipientId()); - updateRecipientAddressLocked(byNumberRecipient.getRecipientId(), address); - return new Pair<>(byNumberRecipient.getRecipientId(), Optional.empty()); + byNumberRecipient.id()); + updateRecipientAddress(connection, byNumberRecipient.id(), address); + return new Pair<>(byNumberRecipient.id(), Optional.empty()); } final var byUuidRecipient = byUuid.get(); - if (byNumberRecipient.getAddress().uuid().isPresent()) { + if (byNumberRecipient.address().uuid().isPresent()) { logger.debug( "Got separate recipients for high trust number {} and uuid {}, recipient for number has different uuid, so stripping its number", - byNumberRecipient.getRecipientId(), - byUuidRecipient.getRecipientId()); - - updateRecipientAddressLocked(byNumberRecipient.getRecipientId(), - new RecipientAddress(byNumberRecipient.getAddress().uuid().get())); - updateRecipientAddressLocked(byUuidRecipient.getRecipientId(), address); - return new Pair<>(byUuidRecipient.getRecipientId(), Optional.empty()); + byNumberRecipient.id(), + byUuidRecipient.id()); + + updateRecipientAddress(connection, + byNumberRecipient.id(), + new RecipientAddress(byNumberRecipient.address().uuid().get())); + updateRecipientAddress(connection, byUuidRecipient.id(), address); + return new Pair<>(byUuidRecipient.id(), Optional.empty()); } logger.debug("Got separate recipients for high trust number {} and uuid {}, need to merge them", - byNumberRecipient.getRecipientId(), - byUuidRecipient.getRecipientId()); - updateRecipientAddressLocked(byUuidRecipient.getRecipientId(), address); + byNumberRecipient.id(), + byUuidRecipient.id()); // Create a fixed RecipientId that won't update its id after merge - final var toBeMergedRecipientId = new RecipientId(byNumberRecipient.getRecipientId().id(), null); - mergeRecipientsLocked(byUuidRecipient.getRecipientId(), toBeMergedRecipientId); - return new Pair<>(byUuidRecipient.getRecipientId(), Optional.of(toBeMergedRecipientId)); + final var toBeMergedRecipientId = new RecipientId(byNumberRecipient.id().id(), null); + mergeRecipientsLocked(connection, byUuidRecipient.id(), toBeMergedRecipientId); + removeRecipientAddress(connection, toBeMergedRecipientId); + updateRecipientAddress(connection, byUuidRecipient.id(), address); + return new Pair<>(byUuidRecipient.id(), Optional.of(toBeMergedRecipientId)); + } + + private RecipientId addNewRecipient( + final Connection connection, final RecipientAddress address + ) throws SQLException { + final var sql = ( + """ + INSERT INTO %s (number, uuid) + VALUES (?, ?) + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setString(1, address.number().orElse(null)); + statement.setBytes(2, address.uuid().map(UuidUtil::toByteArray).orElse(null)); + statement.executeUpdate(); + final var generatedKeys = statement.getGeneratedKeys(); + if (generatedKeys.next()) { + final var recipientId = new RecipientId(generatedKeys.getLong(1), this); + logger.debug("Added new recipient {} with address {}", recipientId, address); + return recipientId; + } else { + throw new RuntimeException("Failed to add new recipient to database"); + } + } } - private RecipientId addNewRecipientLocked(final RecipientAddress address) { - final var nextRecipientId = nextIdLocked(); - logger.debug("Adding new recipient {} with address {}", nextRecipientId, address); - storeRecipientLocked(nextRecipientId, new Recipient(nextRecipientId, address, null, null, null, null)); - return nextRecipientId; + private void removeRecipientAddress(Connection connection, RecipientId recipientId) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET number = NULL, uuid = NULL + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.executeUpdate(); + } } - private void updateRecipientAddressLocked(RecipientId recipientId, final RecipientAddress address) { - final var recipient = recipients.get(recipientId); - storeRecipientLocked(recipientId, Recipient.newBuilder(recipient).withAddress(address).build()); + private void updateRecipientAddress( + Connection connection, RecipientId recipientId, final RecipientAddress address + ) throws SQLException { + final var sql = ( + """ + UPDATE %s + SET number = ?, uuid = ? + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setString(1, address.number().orElse(null)); + statement.setBytes(2, address.uuid().map(UuidUtil::toByteArray).orElse(null)); + statement.setLong(3, recipientId.id()); + statement.executeUpdate(); + } } - long getActualRecipientId(long recipientId) { - while (recipientsMerged.containsKey(recipientId)) { - final var newRecipientId = recipientsMerged.get(recipientId); - logger.debug("Using {} instead of {}, because recipients have been merged", newRecipientId, recipientId); - recipientId = newRecipientId; + private void deleteRecipient(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = ( + """ + DELETE FROM %s + WHERE _id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.executeUpdate(); } - return recipientId; } - private void storeRecipientLocked(final RecipientId recipientId, final Recipient recipient) { - final var existingRecipient = recipients.get(recipientId); - if (existingRecipient == null || !existingRecipient.equals(recipient)) { - recipients.put(recipientId, recipient); - saveLocked(); - } - } - - private void mergeRecipientsLocked(RecipientId recipientId, RecipientId toBeMergedRecipientId) { - final var recipient = recipients.get(recipientId); - final var toBeMergedRecipient = recipients.get(toBeMergedRecipientId); - recipients.put(recipientId, - new Recipient(recipientId, - recipient.getAddress(), - recipient.getContact() != null ? recipient.getContact() : toBeMergedRecipient.getContact(), - recipient.getProfileKey() != null - ? recipient.getProfileKey() - : toBeMergedRecipient.getProfileKey(), - recipient.getExpiringProfileKeyCredential() != null - ? recipient.getExpiringProfileKeyCredential() - : toBeMergedRecipient.getExpiringProfileKeyCredential(), - recipient.getProfile() != null ? recipient.getProfile() : toBeMergedRecipient.getProfile())); - recipients.remove(toBeMergedRecipientId); + private void mergeRecipientsLocked( + Connection connection, RecipientId recipientId, RecipientId toBeMergedRecipientId + ) throws SQLException { + final var contact = getContact(connection, recipientId); + if (contact == null) { + final var toBeMergedContact = getContact(connection, toBeMergedRecipientId); + storeContact(connection, recipientId, toBeMergedContact); + } + + final var profileKey = getProfileKey(connection, recipientId); + if (profileKey == null) { + final var toBeMergedProfileKey = getProfileKey(connection, toBeMergedRecipientId); + storeProfileKey(connection, recipientId, toBeMergedProfileKey, false); + } + + final var profileKeyCredential = getExpiringProfileKeyCredential(connection, recipientId); + if (profileKeyCredential == null) { + final var toBeMergedProfileKeyCredential = getExpiringProfileKeyCredential(connection, toBeMergedRecipientId); + storeExpiringProfileKeyCredential(connection, recipientId, toBeMergedProfileKeyCredential); + } + + final var profile = getProfile(connection, recipientId); + if (profile == null) { + final var toBeMergedProfile = getProfile(connection, toBeMergedRecipientId); + storeProfile(connection, recipientId, toBeMergedProfile); + } + recipientsMerged.put(toBeMergedRecipientId.id(), recipientId.id()); - saveLocked(); - } - - private Optional findByNumberLocked(final String number) { - return recipients.entrySet() - .stream() - .filter(entry -> entry.getValue().getAddress().number().isPresent() && number.equals(entry.getValue() - .getAddress() - .number() - .get())) - .findFirst() - .map(Map.Entry::getValue); - } - - private Optional findByUuidLocked(final UUID uuid) { - return recipients.entrySet() - .stream() - .filter(entry -> entry.getValue().getAddress().uuid().isPresent() && uuid.equals(entry.getValue() - .getAddress() - .uuid() - .get())) - .findFirst() - .map(Map.Entry::getValue); - } - - private RecipientId nextIdLocked() { - return new RecipientId(++this.lastId, this); - } - - private void saveLocked() { - if (isBulkUpdating) { - return; - } - final var base64 = Base64.getEncoder(); - var storage = new Storage(recipients.entrySet().stream().map(pair -> { - final var recipient = pair.getValue(); - final var recipientContact = recipient.getContact(); - final var contact = recipientContact == null - ? null - : new Storage.Recipient.Contact(recipientContact.getGivenName(), - recipientContact.getFamilyName(), - recipientContact.getColor(), - recipientContact.getMessageExpirationTime(), - recipientContact.isBlocked(), - recipientContact.isArchived(), - recipientContact.isProfileSharingEnabled()); - final var recipientProfile = recipient.getProfile(); - final var profile = recipientProfile == null - ? null - : new Storage.Recipient.Profile(recipientProfile.getLastUpdateTimestamp(), - recipientProfile.getGivenName(), - recipientProfile.getFamilyName(), - recipientProfile.getAbout(), - recipientProfile.getAboutEmoji(), - recipientProfile.getAvatarUrlPath(), - recipientProfile.getMobileCoinAddress() == null - ? null - : base64.encodeToString(recipientProfile.getMobileCoinAddress()), - recipientProfile.getUnidentifiedAccessMode().name(), - recipientProfile.getCapabilities().stream().map(Enum::name).collect(Collectors.toSet())); - return new Storage.Recipient(pair.getKey().id(), - recipient.getAddress().number().orElse(null), - recipient.getAddress().uuid().map(UUID::toString).orElse(null), - recipient.getProfileKey() == null - ? null - : base64.encodeToString(recipient.getProfileKey().serialize()), - recipient.getExpiringProfileKeyCredential() == null - ? null - : base64.encodeToString(recipient.getExpiringProfileKeyCredential().serialize()), - contact, - profile); - }).toList(), lastId); - - // Write to memory first to prevent corrupting the file in case of serialization errors - try (var inMemoryOutput = new ByteArrayOutputStream()) { - objectMapper.writeValue(inMemoryOutput, storage); - - var input = new ByteArrayInputStream(inMemoryOutput.toByteArray()); - try (var outputStream = new FileOutputStream(file)) { - input.transferTo(outputStream); - } - } catch (Exception e) { - logger.error("Error saving recipient store file: {}", e.getMessage()); - } - } - - private record Storage(List recipients, long lastId) { - - private record Recipient( - long id, - String number, - String uuid, - String profileKey, - String expiringProfileKeyCredential, - Storage.Recipient.Contact contact, - Storage.Recipient.Profile profile - ) { - - private record Contact( - String name, - String familyName, - String color, - int messageExpirationTime, - boolean blocked, - boolean archived, - boolean profileSharingEnabled - ) {} - - private record Profile( - long lastUpdateTimestamp, - String givenName, - String familyName, - String about, - String aboutEmoji, - String avatarUrlPath, - String mobileCoinAddress, - String unidentifiedAccessMode, - Set capabilities - ) {} + } + + private Optional findByNumber( + final Connection connection, final String number + ) throws SQLException { + final var sql = """ + SELECT r._id, r.number, r.uuid + FROM %s r + WHERE r.number = ? + """.formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setString(1, number); + return Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet); + } + } + + private Optional findByUuid( + final Connection connection, final UUID uuid + ) throws SQLException { + final var sql = """ + SELECT r._id, r.number, r.uuid + FROM %s r + WHERE r.uuid = ? + """.formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, UuidUtil.toByteArray(uuid)); + return Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet); + } + } + + private Contact getContact(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = ( + """ + SELECT r.given_name, r.family_name, r.expiration_time, r.profile_sharing, r.color, r.blocked, r.archived + FROM %s r + WHERE r._id = ? AND (%s) + """ + ).formatted(TABLE_RECIPIENT, SQL_IS_CONTACT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQueryForOptional(statement, this::getContactFromResultSet).orElse(null); + } + } + + private ProfileKey getProfileKey(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = ( + """ + SELECT r.profile_key + FROM %s r + WHERE r._id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQueryForOptional(statement, this::getProfileKeyFromResultSet).orElse(null); + } + } + + private ExpiringProfileKeyCredential getExpiringProfileKeyCredential( + final Connection connection, final RecipientId recipientId + ) throws SQLException { + final var sql = ( + """ + SELECT r.profile_key_credential + FROM %s r + WHERE r._id = ? + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQueryForOptional(statement, this::getExpiringProfileKeyCredentialFromResultSet) + .orElse(null); + } + } + + private Profile getProfile(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = ( + """ + SELECT r.profile_last_update_timestamp, r.profile_given_name, r.profile_family_name, r.profile_about, r.profile_about_emoji, r.profile_avatar_url_path, r.profile_mobile_coin_address, r.profile_unidentified_access_mode, r.profile_capabilities + FROM %s r + WHERE r._id = ? AND r.profile_capabilities IS NOT NULL + """ + ).formatted(TABLE_RECIPIENT); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + return Utils.executeQueryForOptional(statement, this::getProfileFromResultSet).orElse(null); + } + } + + private RecipientAddress getRecipientAddressFromResultSet(ResultSet resultSet) throws SQLException { + final var uuid = Optional.ofNullable(resultSet.getBytes("uuid")).map(UuidUtil::parseOrNull); + final var number = Optional.ofNullable(resultSet.getString("number")); + return new RecipientAddress(uuid, number); + } + + private RecipientId getRecipientIdFromResultSet(ResultSet resultSet) throws SQLException { + return new RecipientId(resultSet.getLong("_id"), this); + } + + private RecipientWithAddress getRecipientWithAddressFromResultSet(final ResultSet resultSet) throws SQLException { + return new RecipientWithAddress(getRecipientIdFromResultSet(resultSet), + getRecipientAddressFromResultSet(resultSet)); + } + + private Recipient getRecipientFromResultSet(final ResultSet resultSet) throws SQLException { + return new Recipient(getRecipientIdFromResultSet(resultSet), + getRecipientAddressFromResultSet(resultSet), + getContactFromResultSet(resultSet), + getProfileKeyFromResultSet(resultSet), + getExpiringProfileKeyCredentialFromResultSet(resultSet), + getProfileFromResultSet(resultSet)); + } + + private Contact getContactFromResultSet(ResultSet resultSet) throws SQLException { + return new Contact(resultSet.getString("given_name"), + resultSet.getString("family_name"), + resultSet.getString("color"), + resultSet.getInt("expiration_time"), + resultSet.getBoolean("blocked"), + resultSet.getBoolean("archived"), + resultSet.getBoolean("profile_sharing")); + } + + private Profile getProfileFromResultSet(ResultSet resultSet) throws SQLException { + final var profileCapabilities = resultSet.getString("profile_capabilities"); + final var profileUnidentifiedAccessMode = resultSet.getString("profile_unidentified_access_mode"); + return new Profile(resultSet.getLong("profile_last_update_timestamp"), + resultSet.getString("profile_given_name"), + resultSet.getString("profile_family_name"), + resultSet.getString("profile_about"), + resultSet.getString("profile_about_emoji"), + resultSet.getString("profile_avatar_url_path"), + resultSet.getBytes("profile_mobile_coin_address"), + profileUnidentifiedAccessMode == null + ? Profile.UnidentifiedAccessMode.UNKNOWN + : Profile.UnidentifiedAccessMode.valueOfOrUnknown(profileUnidentifiedAccessMode), + profileCapabilities == null + ? Set.of() + : Arrays.stream(profileCapabilities.split(",")) + .map(Profile.Capability::valueOfOrNull) + .filter(Objects::nonNull) + .collect(Collectors.toSet())); + } + + private ProfileKey getProfileKeyFromResultSet(ResultSet resultSet) throws SQLException { + final var profileKey = resultSet.getBytes("profile_key"); + + if (profileKey == null) { + return null; + } + try { + return new ProfileKey(profileKey); + } catch (InvalidInputException ignored) { + return null; + } + } + + private ExpiringProfileKeyCredential getExpiringProfileKeyCredentialFromResultSet(ResultSet resultSet) throws SQLException { + final var profileKeyCredential = resultSet.getBytes("profile_key_credential"); + + if (profileKeyCredential == null) { + return null; + } + try { + return new ExpiringProfileKeyCredential(profileKeyCredential); + } catch (Throwable ignored) { + return null; } } @@ -671,4 +887,6 @@ public class RecipientStore implements RecipientResolver, RecipientTrustedResolv void mergeRecipients(RecipientId recipientId, RecipientId toBeMergedRecipientId); } + + private record RecipientWithAddress(RecipientId id, RecipientAddress address) {} } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java index b53c5ac0..5dde88b2 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java @@ -3,6 +3,7 @@ package org.asamk.signal.manager.storage.sendLog; import org.asamk.signal.manager.groups.GroupId; import org.asamk.signal.manager.groups.GroupUtils; import org.asamk.signal.manager.storage.Database; +import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.recipients.RecipientId; import org.asamk.signal.manager.storage.recipients.RecipientResolver; import org.signal.libsignal.zkgroup.InvalidInputException; @@ -15,18 +16,11 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos; import java.io.IOException; import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.function.Consumer; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; public class MessageSendLogStore implements AutoCloseable { @@ -68,12 +62,13 @@ public class MessageSendLogStore implements AutoCloseable { } public static void createSql(Connection connection) throws SQLException { + // When modifying the CREATE statement here, also add a migration in AccountDatabase.java try (final var statement = connection.createStatement()) { statement.executeUpdate(""" CREATE TABLE message_send_log ( _id INTEGER PRIMARY KEY, content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE, - recipient_id INTEGER NOT NULL, + recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE, device_id INTEGER NOT NULL ); CREATE TABLE message_send_log_content ( @@ -106,7 +101,7 @@ public class MessageSendLogStore implements AutoCloseable { statement.setLong(1, recipientId.id()); statement.setInt(2, deviceId); statement.setLong(3, timestamp); - try (var result = executeQueryForStream(statement, resultSet -> { + try (var result = Utils.executeQueryForStream(statement, resultSet -> { final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")) .map(GroupId::unknownVersion); final SignalServiceProtos.Content content; @@ -389,32 +384,5 @@ public class MessageSendLogStore implements AutoCloseable { } } - private Stream executeQueryForStream( - PreparedStatement statement, ResultSetMapper mapper - ) throws SQLException { - final var resultSet = statement.executeQuery(); - - return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { - @Override - public boolean tryAdvance(final Consumer consumer) { - try { - if (!resultSet.next()) { - return false; - } - consumer.accept(mapper.apply(resultSet)); - return true; - } catch (SQLException e) { - logger.warn("Failed to read from database result", e); - throw new RuntimeException(e); - } - } - }, false); - } - - private interface ResultSetMapper { - - T apply(ResultSet resultSet) throws SQLException; - } - private record RecipientDevices(RecipientId recipientId, List deviceIds) {} }