From 08dc65350f069de5f394c62f4dced7e9860f3256 Mon Sep 17 00:00:00 2001 From: AsamK Date: Fri, 10 Jun 2022 23:21:39 +0200 Subject: [PATCH] Move sender key store to database --- graalvm-config-dir/reflect-config.json | 14 +- .../manager/storage/AccountDatabase.java | 30 +- .../signal/manager/storage/SignalAccount.java | 18 +- .../LegacySenderKeyRecordStore.java | 101 ++++++ .../LegacySenderKeySharedStore.java | 56 +++ .../senderKeys/SenderKeyRecordStore.java | 329 +++++++++--------- .../senderKeys/SenderKeySharedStore.java | 316 +++++++++-------- .../storage/senderKeys/SenderKeyStore.java | 23 +- 8 files changed, 563 insertions(+), 324 deletions(-) create mode 100644 lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeyRecordStore.java create mode 100644 lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeySharedStore.java diff --git a/graalvm-config-dir/reflect-config.json b/graalvm-config-dir/reflect-config.json index 3bd9bf08..27c87951 100644 --- a/graalvm-config-dir/reflect-config.json +++ b/graalvm-config-dir/reflect-config.json @@ -1168,16 +1168,18 @@ ] }, { - "name":"org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore$Storage", + "name":"org.asamk.signal.manager.storage.senderKeys.LegacySenderKeySharedStore$Storage", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[{"name":"","parameterTypes":["java.util.List"] }] }, { - "name":"org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore$Storage$SharedSenderKey", + "name":"org.asamk.signal.manager.storage.senderKeys.LegacySenderKeySharedStore$Storage$SharedSenderKey", "allDeclaredFields":true, - "allDeclaredMethods":true, - "allDeclaredConstructors":true + "queryAllDeclaredMethods":true, + "queryAllDeclaredConstructors":true, + "methods":[{"name":"","parameterTypes":["long","int","java.lang.String"] }] }, { "name":"org.asamk.signal.manager.storage.stickerPacks.JsonStickerPack", diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java index 48b82ea2..26c2e366 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/AccountDatabase.java @@ -8,6 +8,8 @@ import org.asamk.signal.manager.storage.prekeys.PreKeyStore; import org.asamk.signal.manager.storage.prekeys.SignedPreKeyStore; import org.asamk.signal.manager.storage.recipients.RecipientStore; import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore; +import org.asamk.signal.manager.storage.senderKeys.SenderKeyRecordStore; +import org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore; import org.asamk.signal.manager.storage.sessions.SessionStore; import org.asamk.signal.manager.storage.stickers.StickerStore; import org.slf4j.Logger; @@ -20,7 +22,7 @@ import java.sql.SQLException; public class AccountDatabase extends Database { private final static Logger logger = LoggerFactory.getLogger(AccountDatabase.class); - private static final long DATABASE_VERSION = 7; + private static final long DATABASE_VERSION = 8; private AccountDatabase(final HikariDataSource dataSource) { super(logger, DATABASE_VERSION, dataSource); @@ -40,6 +42,8 @@ public class AccountDatabase extends Database { GroupStore.createSql(connection); SessionStore.createSql(connection); IdentityKeyStore.createSql(connection); + SenderKeyRecordStore.createSql(connection); + SenderKeySharedStore.createSql(connection); } @Override @@ -176,5 +180,29 @@ public class AccountDatabase extends Database { """); } } + if (oldVersion < 8) { + logger.debug("Updating database: Creating sender key tables"); + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE sender_key ( + _id INTEGER PRIMARY KEY, + recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE, + device_id INTEGER NOT NULL, + distribution_id BLOB NOT NULL, + record BLOB NOT NULL, + created_timestamp INTEGER NOT NULL, + UNIQUE(recipient_id, device_id, distribution_id) + ); + CREATE TABLE sender_key_shared ( + _id INTEGER PRIMARY KEY, + recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE, + device_id INTEGER NOT NULL, + distribution_id BLOB NOT NULL, + timestamp INTEGER NOT NULL, + UNIQUE(recipient_id, device_id, distribution_id) + ); + """); + } + } } } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java index e09cda3e..78601e07 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java @@ -38,6 +38,8 @@ import org.asamk.signal.manager.storage.recipients.RecipientResolver; import org.asamk.signal.manager.storage.recipients.RecipientStore; import org.asamk.signal.manager.storage.recipients.RecipientTrustedResolver; import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore; +import org.asamk.signal.manager.storage.senderKeys.LegacySenderKeyRecordStore; +import org.asamk.signal.manager.storage.senderKeys.LegacySenderKeySharedStore; import org.asamk.signal.manager.storage.senderKeys.SenderKeyStore; import org.asamk.signal.manager.storage.sessions.LegacySessionStore; import org.asamk.signal.manager.storage.sessions.SessionStore; @@ -668,6 +670,16 @@ public class SignalAccount implements Closeable { migratedLegacyConfig = loadLegacyStores(rootNode, legacySignalProtocolStore) || migratedLegacyConfig; + final var legacySenderKeysPath = getSenderKeysPath(dataPath, accountPath); + if (legacySenderKeysPath.exists()) { + LegacySenderKeyRecordStore.migrate(legacySenderKeysPath, getRecipientResolver(), getSenderKeyStore()); + migratedLegacyConfig = true; + } + final var legacySenderKeysSharedPath = getSharedSenderKeysFile(dataPath, accountPath); + if (legacySenderKeysSharedPath.exists()) { + LegacySenderKeySharedStore.migrate(legacySenderKeysSharedPath, getRecipientResolver(), getSenderKeyStore()); + migratedLegacyConfig = true; + } if (rootNode.hasNonNull("groupStore")) { final var groupStoreStorage = jsonProcessor.convertValue(rootNode.get("groupStore"), LegacyGroupStore.Storage.class); @@ -1196,10 +1208,10 @@ public class SignalAccount implements Closeable { public SenderKeyStore getSenderKeyStore() { return getOrCreate(() -> senderKeyStore, - () -> senderKeyStore = new SenderKeyStore(getSharedSenderKeysFile(dataPath, accountPath), - getSenderKeysPath(dataPath, accountPath), + () -> senderKeyStore = new SenderKeyStore(getAccountDatabase(), getRecipientAddressResolver(), - getRecipientResolver())); + getRecipientResolver(), + getRecipientIdCreator())); } public ConfigurationStore getConfigurationStore() { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeyRecordStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeyRecordStore.java new file mode 100644 index 00000000..ea823cb2 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeyRecordStore.java @@ -0,0 +1,101 @@ +package org.asamk.signal.manager.storage.senderKeys; + +import org.asamk.signal.manager.api.Pair; +import org.asamk.signal.manager.storage.recipients.RecipientResolver; +import org.signal.libsignal.protocol.InvalidMessageException; +import org.signal.libsignal.protocol.groups.state.SenderKeyRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.asamk.signal.manager.storage.senderKeys.SenderKeyRecordStore.Key; + +public class LegacySenderKeyRecordStore { + + private final static Logger logger = LoggerFactory.getLogger(LegacySenderKeyRecordStore.class); + + public static void migrate( + final File senderKeysPath, final RecipientResolver resolver, SenderKeyStore senderKeyStore + ) { + final var files = senderKeysPath.listFiles(); + if (files == null) { + return; + } + + final var senderKeys = parseFileNames(files, resolver).stream().map(key -> { + final var record = loadSenderKeyLocked(key, senderKeysPath); + if (record == null) { + return null; + } + return new Pair<>(key, record); + }).filter(Objects::nonNull).toList(); + + senderKeyStore.addLegacySenderKeys(senderKeys); + deleteAllSenderKeys(senderKeysPath); + } + + private static void deleteAllSenderKeys(File senderKeysPath) { + final var files = senderKeysPath.listFiles(); + if (files == null) { + return; + } + + for (var file : files) { + try { + Files.delete(file.toPath()); + } catch (IOException e) { + logger.error("Failed to delete sender key file {}: {}", file, e.getMessage()); + } + } + try { + Files.delete(senderKeysPath.toPath()); + } catch (IOException e) { + logger.error("Failed to delete sender keys directory {}: {}", senderKeysPath, e.getMessage()); + } + } + + final static Pattern senderKeyFileNamePattern = Pattern.compile("(\\d+)_(\\d+)_([\\da-z\\-]+)"); + + private static List parseFileNames(final File[] files, final RecipientResolver resolver) { + return Arrays.stream(files) + .map(f -> senderKeyFileNamePattern.matcher(f.getName())) + .filter(Matcher::matches) + .map(matcher -> { + final var recipientId = resolver.resolveRecipient(Long.parseLong(matcher.group(1))); + if (recipientId == null) { + return null; + } + return new Key(recipientId, Integer.parseInt(matcher.group(2)), UUID.fromString(matcher.group(3))); + }) + .filter(Objects::nonNull) + .toList(); + } + + private static File getSenderKeyFile(Key key, final File senderKeysPath) { + return new File(senderKeysPath, + key.recipientId().id() + "_" + key.deviceId() + "_" + key.distributionId().toString()); + } + + private static SenderKeyRecord loadSenderKeyLocked(final Key key, final File senderKeysPath) { + final var file = getSenderKeyFile(key, senderKeysPath); + if (!file.exists()) { + return null; + } + try (var inputStream = new FileInputStream(file)) { + return new SenderKeyRecord(inputStream.readAllBytes()); + } catch (IOException | InvalidMessageException e) { + logger.warn("Failed to load sender key, resetting sender key: {}", e.getMessage()); + return null; + } + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeySharedStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeySharedStore.java new file mode 100644 index 00000000..87c298f4 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/LegacySenderKeySharedStore.java @@ -0,0 +1,56 @@ +package org.asamk.signal.manager.storage.senderKeys; + +import org.asamk.signal.manager.storage.Utils; +import org.asamk.signal.manager.storage.recipients.RecipientResolver; +import org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore.SenderKeySharedEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.push.DistributionId; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class LegacySenderKeySharedStore { + + private final static Logger logger = LoggerFactory.getLogger(LegacySenderKeySharedStore.class); + + public static void migrate( + final File file, final RecipientResolver resolver, SenderKeyStore senderKeyStore + ) { + final var objectMapper = Utils.createStorageObjectMapper(); + try (var inputStream = new FileInputStream(file)) { + final var storage = objectMapper.readValue(inputStream, Storage.class); + final var sharedSenderKeys = new HashMap>(); + for (final var senderKey : storage.sharedSenderKeys) { + final var recipientId = resolver.resolveRecipient(senderKey.recipientId); + if (recipientId == null) { + continue; + } + final var entry = new SenderKeySharedEntry(recipientId, senderKey.deviceId); + final var distributionId = DistributionId.from(senderKey.distributionId); + var entries = sharedSenderKeys.get(distributionId); + if (entries == null) { + entries = new HashSet<>(); + } + entries.add(entry); + sharedSenderKeys.put(distributionId, entries); + } + + senderKeyStore.addLegacySenderKeysShared(sharedSenderKeys); + Files.delete(file.toPath()); + } catch (IOException e) { + logger.info("Failed to load shared sender key store, ignoring", e); + } + } + + private record Storage(List sharedSenderKeys) { + + private record SharedSenderKey(long recipientId, int deviceId, String distributionId) {} + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyRecordStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyRecordStore.java index 83302faf..9b612aac 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyRecordStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyRecordStore.java @@ -1,43 +1,53 @@ package org.asamk.signal.manager.storage.senderKeys; +import org.asamk.signal.manager.api.Pair; +import org.asamk.signal.manager.storage.Database; +import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.recipients.RecipientId; import org.asamk.signal.manager.storage.recipients.RecipientResolver; -import org.asamk.signal.manager.util.IOUtils; import org.signal.libsignal.protocol.InvalidMessageException; import org.signal.libsignal.protocol.SignalProtocolAddress; import org.signal.libsignal.protocol.groups.state.SenderKeyRecord; import org.signal.libsignal.protocol.groups.state.SenderKeyStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.util.UuidUtil; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class SenderKeyRecordStore implements SenderKeyStore { private final static Logger logger = LoggerFactory.getLogger(SenderKeyRecordStore.class); + private final static String TABLE_SENDER_KEY = "sender_key"; - private final Map cachedSenderKeys = new HashMap<>(); - - private final File senderKeysPath; - + private final Database database; private final RecipientResolver resolver; - public SenderKeyRecordStore( - final File senderKeysPath, final RecipientResolver resolver + public static void createSql(Connection connection) throws SQLException { + // When modifying the CREATE statement here, also add a migration in AccountDatabase.java + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE sender_key ( + _id INTEGER PRIMARY KEY, + recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE, + device_id INTEGER NOT NULL, + distribution_id BLOB NOT NULL, + record BLOB NOT NULL, + created_timestamp INTEGER NOT NULL, + UNIQUE(recipient_id, device_id, distribution_id) + ); + """); + } + } + + SenderKeyRecordStore( + final Database database, final RecipientResolver resolver ) { - this.senderKeysPath = senderKeysPath; + this.database = database; this.resolver = resolver; } @@ -45,8 +55,10 @@ public class SenderKeyRecordStore implements SenderKeyStore { public SenderKeyRecord loadSenderKey(final SignalProtocolAddress address, final UUID distributionId) { final var key = getKey(address, distributionId); - synchronized (cachedSenderKeys) { - return loadSenderKeyLocked(key); + try (final var connection = database.getConnection()) { + return loadSenderKey(connection, key); + } catch (SQLException e) { + throw new RuntimeException("Failed read from sender key store", e); } } @@ -56,86 +68,109 @@ public class SenderKeyRecordStore implements SenderKeyStore { ) { final var key = getKey(address, distributionId); - synchronized (cachedSenderKeys) { - storeSenderKeyLocked(key, record); + try (final var connection = database.getConnection()) { + storeSenderKey(connection, key, record); + } catch (SQLException e) { + throw new RuntimeException("Failed update sender key store", e); } } long getCreateTimeForKey(final RecipientId selfRecipientId, final int selfDeviceId, final UUID distributionId) { - final var key = getKey(selfRecipientId, selfDeviceId, distributionId); - final var senderKeyFile = getSenderKeyFile(key); - - if (!senderKeyFile.exists()) { - return -1; + final var sql = ( + """ + SELECT s.created_timestamp + FROM %s AS s + WHERE s.recipient_id = ? AND s.device_id = ? AND s.distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, selfRecipientId.id()); + statement.setInt(2, selfDeviceId); + statement.setBytes(3, UuidUtil.toByteArray(distributionId)); + return Utils.executeQueryForOptional(statement, res -> res.getLong("created_timestamp")).orElse(-1L); + } + } catch (SQLException e) { + throw new RuntimeException("Failed read from sender key store", e); } - - return IOUtils.getFileCreateTime(senderKeyFile); } void deleteSenderKey(final RecipientId recipientId, final UUID distributionId) { - synchronized (cachedSenderKeys) { - cachedSenderKeys.clear(); - final var keys = getKeysLocked(recipientId); - for (var key : keys) { - if (key.distributionId.equals(distributionId)) { - deleteSenderKeyLocked(key); - } + final var sql = ( + """ + DELETE FROM %s AS s + WHERE s.recipient_id = ? AND s.distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.setBytes(2, UuidUtil.toByteArray(distributionId)); + statement.executeUpdate(); } + } catch (SQLException e) { + throw new RuntimeException("Failed update sender key store", e); } } void deleteAll() { - synchronized (cachedSenderKeys) { - cachedSenderKeys.clear(); - final var files = senderKeysPath.listFiles((_file, s) -> senderKeyFileNamePattern.matcher(s).matches()); - if (files == null) { - return; - } - - for (final var file : files) { - try { - Files.delete(file.toPath()); - } catch (IOException e) { - logger.error("Failed to delete sender key file {}: {}", file, e.getMessage()); - } + final var sql = """ + DELETE FROM %s AS s + """.formatted(TABLE_SENDER_KEY); + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement(sql)) { + statement.executeUpdate(); } + } catch (SQLException e) { + throw new RuntimeException("Failed update sender key store", e); } } void deleteAllFor(final RecipientId recipientId) { - synchronized (cachedSenderKeys) { - cachedSenderKeys.clear(); - final var keys = getKeysLocked(recipientId); - for (var key : keys) { - deleteSenderKeyLocked(key); - } + try (final var connection = database.getConnection()) { + deleteAllFor(connection, recipientId); + } catch (SQLException e) { + throw new RuntimeException("Failed update sender key store", e); } } void mergeRecipients(RecipientId recipientId, RecipientId toBeMergedRecipientId) { - synchronized (cachedSenderKeys) { - final var keys = getKeysLocked(toBeMergedRecipientId); - final var otherHasSenderKeys = keys.size() > 0; - if (!otherHasSenderKeys) { - return; - } - - logger.debug("To be merged recipient had sender keys, re-assigning to the new recipient."); - for (var key : keys) { - final var toBeMergedSenderKey = loadSenderKeyLocked(key); - deleteSenderKeyLocked(key); - if (toBeMergedSenderKey == null) { - continue; + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + final var sql = """ + UPDATE OR IGNORE %s + SET recipient_id = ? + WHERE recipient_id = ? + """.formatted(TABLE_SENDER_KEY); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.setLong(2, toBeMergedRecipientId.id()); + final var rows = statement.executeUpdate(); + if (rows > 0) { + logger.debug("Reassigned {} sender keys of to be merged recipient.", rows); } + } + // Delete all conflicting sender keys now + deleteAllFor(connection, toBeMergedRecipientId); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update sender key store", e); + } + } - final var newKey = new Key(recipientId, key.deviceId(), key.distributionId); - final var senderKeyRecord = loadSenderKeyLocked(newKey); - if (senderKeyRecord != null) { - continue; - } - storeSenderKeyLocked(newKey, toBeMergedSenderKey); + void addLegacySenderKeys(final Collection> senderKeys) { + logger.debug("Migrating legacy sender keys to database"); + long start = System.nanoTime(); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + for (final var pair : senderKeys) { + storeSenderKey(connection, pair.first(), pair.second()); } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update sender keys store", e); } + logger.debug("Complete sender keys migration took {}ms", (System.nanoTime() - start) / 1000000); } /** @@ -145,106 +180,86 @@ public class SenderKeyRecordStore implements SenderKeyStore { return resolver.resolveRecipient(identifier); } - private Key getKey(final RecipientId recipientId, int deviceId, final UUID distributionId) { - return new Key(recipientId, deviceId, distributionId); - } - private Key getKey(final SignalProtocolAddress address, final UUID distributionId) { final var recipientId = resolveRecipient(address.getName()); return new Key(recipientId, address.getDeviceId(), distributionId); } - private List getKeysLocked(RecipientId recipientId) { - final var files = senderKeysPath.listFiles((_file, s) -> s.startsWith(recipientId.id() + "_")); - if (files == null) { - return List.of(); + private SenderKeyRecord loadSenderKey(final Connection connection, final Key key) throws SQLException { + final var sql = ( + """ + SELECT s.record + FROM %s AS s + WHERE s.recipient_id = ? AND s.device_id = ? AND s.distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, key.recipientId().id()); + statement.setInt(2, key.deviceId()); + statement.setBytes(3, UuidUtil.toByteArray(key.distributionId())); + return Utils.executeQueryForOptional(statement, this::getSenderKeyRecordFromResultSet).orElse(null); } - return parseFileNames(files); - } - - final Pattern senderKeyFileNamePattern = Pattern.compile("(\\d+)_(\\d+)_([\\da-z\\-]+)"); - - private List parseFileNames(final File[] files) { - return Arrays.stream(files) - .map(f -> senderKeyFileNamePattern.matcher(f.getName())) - .filter(Matcher::matches) - .map(matcher -> { - final var recipientId = resolver.resolveRecipient(Long.parseLong(matcher.group(1))); - if (recipientId == null) { - return null; - } - return new Key(recipientId, Integer.parseInt(matcher.group(2)), UUID.fromString(matcher.group(3))); - }) - .filter(Objects::nonNull) - .toList(); } - private File getSenderKeyFile(Key key) { - try { - IOUtils.createPrivateDirectories(senderKeysPath); - } catch (IOException e) { - throw new AssertionError("Failed to create sender keys path: " + e.getMessage(), e); - } - return new File(senderKeysPath, - key.recipientId().id() + "_" + key.deviceId() + "_" + key.distributionId.toString()); - } - - private SenderKeyRecord loadSenderKeyLocked(final Key key) { - { - final var senderKeyRecord = cachedSenderKeys.get(key); - if (senderKeyRecord != null) { - return senderKeyRecord; + private void storeSenderKey( + final Connection connection, final Key key, final SenderKeyRecord senderKeyRecord + ) throws SQLException { + final var sqlUpdate = """ + UPDATE %s + SET record = ? + WHERE recipient_id = ? AND device_id = ? and distribution_id = ? + """.formatted(TABLE_SENDER_KEY); + try (final var statement = connection.prepareStatement(sqlUpdate)) { + statement.setBytes(1, senderKeyRecord.serialize()); + statement.setLong(2, key.recipientId().id()); + statement.setLong(3, key.deviceId()); + statement.setBytes(4, UuidUtil.toByteArray(key.distributionId())); + final var rows = statement.executeUpdate(); + if (rows > 0) { + return; } } - final var file = getSenderKeyFile(key); - if (!file.exists()) { - return null; - } - try (var inputStream = new FileInputStream(file)) { - final var senderKeyRecord = new SenderKeyRecord(inputStream.readAllBytes()); - cachedSenderKeys.put(key, senderKeyRecord); - return senderKeyRecord; - } catch (IOException | InvalidMessageException e) { - logger.warn("Failed to load sender key, resetting sender key: {}", e.getMessage()); - return null; + // Record doesn't exist yet, creating a new one + final var sqlInsert = ( + """ + INSERT OR REPLACE INTO %s (recipient_id, device_id, distribution_id, record, created_timestamp) + VALUES (?, ?, ?, ?, ?) + """ + ).formatted(TABLE_SENDER_KEY); + try (final var statement = connection.prepareStatement(sqlInsert)) { + statement.setLong(1, key.recipientId().id()); + statement.setInt(2, key.deviceId()); + statement.setBytes(3, UuidUtil.toByteArray(key.distributionId())); + statement.setBytes(4, senderKeyRecord.serialize()); + statement.setLong(5, System.currentTimeMillis()); + statement.executeUpdate(); } } - private void storeSenderKeyLocked(final Key key, final SenderKeyRecord senderKeyRecord) { - cachedSenderKeys.put(key, senderKeyRecord); - - final var file = getSenderKeyFile(key); - try { - try (var outputStream = new FileOutputStream(file)) { - outputStream.write(senderKeyRecord.serialize()); - } - } catch (IOException e) { - logger.warn("Failed to store sender key, trying to delete file and retry: {}", e.getMessage()); - try { - Files.delete(file.toPath()); - try (var outputStream = new FileOutputStream(file)) { - outputStream.write(senderKeyRecord.serialize()); - } - } catch (IOException e2) { - logger.error("Failed to store sender key file {}: {}", file, e2.getMessage()); - } + private void deleteAllFor(final Connection connection, final RecipientId recipientId) throws SQLException { + final var sql = ( + """ + DELETE FROM %s AS s + WHERE s.recipient_id = ? + """ + ).formatted(TABLE_SENDER_KEY); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.executeUpdate(); } } - private void deleteSenderKeyLocked(final Key key) { - cachedSenderKeys.remove(key); - - final var file = getSenderKeyFile(key); - if (!file.exists()) { - return; - } + private SenderKeyRecord getSenderKeyRecordFromResultSet(ResultSet resultSet) throws SQLException { try { - Files.delete(file.toPath()); - } catch (IOException e) { - logger.error("Failed to delete sender key file {}: {}", file, e.getMessage()); + final var record = resultSet.getBytes("record"); + + return new SenderKeyRecord(record); + } catch (InvalidMessageException e) { + logger.warn("Failed to load sender key, resetting: {}", e.getMessage()); + return null; } } - private record Key(RecipientId recipientId, int deviceId, UUID distributionId) {} + record Key(RecipientId recipientId, int deviceId, UUID distributionId) {} } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java index f27f6139..2eb7c4bc 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java @@ -1,10 +1,10 @@ package org.asamk.signal.manager.storage.senderKeys; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.asamk.signal.manager.helper.RecipientAddressResolver; +import org.asamk.signal.manager.storage.Database; import org.asamk.signal.manager.storage.Utils; import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.storage.recipients.RecipientIdCreator; import org.asamk.signal.manager.storage.recipients.RecipientResolver; import org.signal.libsignal.protocol.SignalProtocolAddress; import org.slf4j.Logger; @@ -12,94 +12,70 @@ import org.slf4j.LoggerFactory; import org.whispersystems.signalservice.api.push.DistributionId; import org.whispersystems.signalservice.api.util.UuidUtil; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; public class SenderKeySharedStore { private final static Logger logger = LoggerFactory.getLogger(SenderKeySharedStore.class); + private final static String TABLE_SENDER_KEY_SHARED = "sender_key_shared"; - private final Map> sharedSenderKeys; - - private final ObjectMapper objectMapper; - private final File file; - + private final Database database; + private final RecipientIdCreator recipientIdCreator; private final RecipientResolver resolver; private final RecipientAddressResolver addressResolver; - public static SenderKeySharedStore load( - final File file, final RecipientAddressResolver addressResolver, final RecipientResolver resolver - ) { - final var objectMapper = Utils.createStorageObjectMapper(); - try (var inputStream = new FileInputStream(file)) { - final var storage = objectMapper.readValue(inputStream, Storage.class); - final var sharedSenderKeys = new HashMap>(); - for (final var senderKey : storage.sharedSenderKeys) { - final var recipientId = resolver.resolveRecipient(senderKey.recipientId); - if (recipientId == null) { - continue; - } - final var entry = new SenderKeySharedEntry(recipientId, senderKey.deviceId); - final var distributionId = UuidUtil.parseOrNull(senderKey.distributionId); - if (distributionId == null) { - logger.warn("Read invalid distribution id from storage {}, ignoring", senderKey.distributionId); - continue; - } - var entries = sharedSenderKeys.get(distributionId); - if (entries == null) { - entries = new HashSet<>(); - } - entries.add(entry); - sharedSenderKeys.put(distributionId, entries); - } - - return new SenderKeySharedStore(sharedSenderKeys, objectMapper, file, addressResolver, resolver); - } catch (FileNotFoundException e) { - logger.trace("Creating new shared sender key store."); - return new SenderKeySharedStore(new HashMap<>(), objectMapper, file, addressResolver, resolver); - } catch (IOException e) { - logger.warn("Failed to load shared sender key store", e); - throw new RuntimeException(e); + public static void createSql(Connection connection) throws SQLException { + // When modifying the CREATE statement here, also add a migration in AccountDatabase.java + try (final var statement = connection.createStatement()) { + statement.executeUpdate(""" + CREATE TABLE sender_key_shared ( + _id INTEGER PRIMARY KEY, + recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE, + device_id INTEGER NOT NULL, + distribution_id BLOB NOT NULL, + timestamp INTEGER NOT NULL, + UNIQUE(recipient_id, device_id, distribution_id) + ); + """); } } - private SenderKeySharedStore( - final Map> sharedSenderKeys, - final ObjectMapper objectMapper, - final File file, + SenderKeySharedStore( + final Database database, + final RecipientIdCreator recipientIdCreator, final RecipientAddressResolver addressResolver, final RecipientResolver resolver ) { - this.sharedSenderKeys = sharedSenderKeys; - this.objectMapper = objectMapper; - this.file = file; + this.database = database; + this.recipientIdCreator = recipientIdCreator; this.addressResolver = addressResolver; this.resolver = resolver; } public Set getSenderKeySharedWith(final DistributionId distributionId) { - synchronized (sharedSenderKeys) { - final var addresses = sharedSenderKeys.get(distributionId.asUuid()); - if (addresses == null) { - return Set.of(); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + SELECT s.recipient_id, s.device_id + FROM %s AS s + WHERE s.distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, UuidUtil.toByteArray(distributionId.asUuid())); + return Utils.executeQueryForStream(statement, this::getSenderKeySharedEntryFromResultSet) + .map(k -> new SignalProtocolAddress(addressResolver.resolveRecipientAddress(k.recipientId()) + .getIdentifier(), k.deviceId())) + .collect(Collectors.toSet()); } - return addresses.stream() - .map(k -> new SignalProtocolAddress(addressResolver.resolveRecipientAddress(k.recipientId()) - .getIdentifier(), k.deviceId())) - .collect(Collectors.toSet()); + } catch (SQLException e) { + throw new RuntimeException("Failed read from shared sender key store", e); } } @@ -107,135 +83,173 @@ public class SenderKeySharedStore { final DistributionId distributionId, final Collection addresses ) { final var newEntries = addresses.stream() - .map(a -> new SenderKeySharedEntry(resolveRecipient(a.getName()), a.getDeviceId())) + .map(a -> new SenderKeySharedEntry(resolver.resolveRecipient(a.getName()), a.getDeviceId())) .collect(Collectors.toSet()); - synchronized (sharedSenderKeys) { - final var previousEntries = sharedSenderKeys.getOrDefault(distributionId.asUuid(), Set.of()); - - sharedSenderKeys.put(distributionId.asUuid(), new HashSet<>() { - { - addAll(previousEntries); - addAll(newEntries); - } - }); - saveLocked(); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + markSenderKeysSharedWith(connection, distributionId, newEntries); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void clearSenderKeySharedWith(final Collection addresses) { final var entriesToDelete = addresses.stream() - .map(a -> new SenderKeySharedEntry(resolveRecipient(a.getName()), a.getDeviceId())) + .map(a -> new SenderKeySharedEntry(resolver.resolveRecipient(a.getName()), a.getDeviceId())) .collect(Collectors.toSet()); - synchronized (sharedSenderKeys) { - for (final var distributionId : sharedSenderKeys.keySet()) { - final var entries = sharedSenderKeys.getOrDefault(distributionId, Set.of()); - - sharedSenderKeys.put(distributionId, new HashSet<>(entries) { - { - removeAll(entriesToDelete); - } - }); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + final var sql = ( + """ + DELETE FROM %s AS s + WHERE recipient_id = ? AND device_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + for (final var entry : entriesToDelete) { + statement.setLong(1, entry.recipientId().id()); + statement.setInt(2, entry.deviceId()); + statement.executeUpdate(); + } } - saveLocked(); + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void deleteAll() { - synchronized (sharedSenderKeys) { - sharedSenderKeys.clear(); - saveLocked(); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + DELETE FROM %s AS s + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void deleteAllFor(final RecipientId recipientId) { - synchronized (sharedSenderKeys) { - for (final var distributionId : sharedSenderKeys.keySet()) { - final var entries = sharedSenderKeys.getOrDefault(distributionId, Set.of()); - - sharedSenderKeys.put(distributionId, new HashSet<>(entries) { - { - removeIf(e -> e.recipientId().equals(recipientId)); - } - }); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + DELETE FROM %s AS s + WHERE recipient_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.executeUpdate(); } - saveLocked(); + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void deleteSharedWith( final RecipientId recipientId, final int deviceId, final DistributionId distributionId ) { - synchronized (sharedSenderKeys) { - final var entries = sharedSenderKeys.getOrDefault(distributionId.asUuid(), Set.of()); - - sharedSenderKeys.put(distributionId.asUuid(), new HashSet<>(entries) { - { - remove(new SenderKeySharedEntry(recipientId, deviceId)); - } - }); - saveLocked(); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + DELETE FROM %s AS s + WHERE recipient_id = ? AND device_id = ? AND distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.setInt(2, deviceId); + statement.setBytes(3, UuidUtil.toByteArray(distributionId.asUuid())); + statement.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void deleteAllFor(final DistributionId distributionId) { - synchronized (sharedSenderKeys) { - if (sharedSenderKeys.remove(distributionId.asUuid()) != null) { - saveLocked(); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + DELETE FROM %s AS s + WHERE distribution_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.setBytes(1, UuidUtil.toByteArray(distributionId.asUuid())); + statement.executeUpdate(); } + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } public void mergeRecipients(RecipientId recipientId, RecipientId toBeMergedRecipientId) { - synchronized (sharedSenderKeys) { - for (final var distributionId : sharedSenderKeys.keySet()) { - final var entries = sharedSenderKeys.getOrDefault(distributionId, Set.of()); - - sharedSenderKeys.put(distributionId, - entries.stream() - .map(e -> e.recipientId.equals(toBeMergedRecipientId) ? new SenderKeySharedEntry( - recipientId, - e.deviceId()) : e) - .collect(Collectors.toSet())); + try (final var connection = database.getConnection()) { + final var sql = ( + """ + UPDATE OR REPLACE %s + SET recipient_id = ? + WHERE recipient_id = ? + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + statement.setLong(1, recipientId.id()); + statement.setLong(2, toBeMergedRecipientId.id()); + statement.executeUpdate(); } - saveLocked(); + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); } } - /** - * @param identifier can be either a serialized uuid or a e164 phone number - */ - private RecipientId resolveRecipient(String identifier) { - return resolver.resolveRecipient(identifier); + void addLegacySenderKeysShared(final Map> sharedSenderKeys) { + logger.debug("Migrating legacy sender keys shared to database"); + long start = System.nanoTime(); + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + for (final var entry : sharedSenderKeys.entrySet()) { + markSenderKeysSharedWith(connection, entry.getKey(), entry.getValue()); + } + connection.commit(); + } catch (SQLException e) { + throw new RuntimeException("Failed update shared sender key store", e); + } + logger.debug("Complete sender keys shared migration took {}ms", (System.nanoTime() - start) / 1000000); } - private void saveLocked() { - var storage = new Storage(sharedSenderKeys.entrySet().stream().flatMap(pair -> { - final var sharedWith = pair.getValue(); - return sharedWith.stream() - .map(entry -> new Storage.SharedSenderKey(entry.recipientId().id(), - entry.deviceId(), - pair.getKey().toString())); - }).toList()); - - // Write to memory first to prevent corrupting the file in case of serialization errors - try (var inMemoryOutput = new ByteArrayOutputStream()) { - objectMapper.writeValue(inMemoryOutput, storage); - - var input = new ByteArrayInputStream(inMemoryOutput.toByteArray()); - try (var outputStream = new FileOutputStream(file)) { - input.transferTo(outputStream); + private void markSenderKeysSharedWith( + final Connection connection, final DistributionId distributionId, final Set newEntries + ) throws SQLException { + final var sql = ( + """ + INSERT OR REPLACE INTO %s (recipient_id, device_id, distribution_id, timestamp) + VALUES (?, ?, ?, ?) + """ + ).formatted(TABLE_SENDER_KEY_SHARED); + try (final var statement = connection.prepareStatement(sql)) { + for (final var entry : newEntries) { + statement.setLong(1, entry.recipientId().id()); + statement.setInt(2, entry.deviceId()); + statement.setBytes(3, UuidUtil.toByteArray(distributionId.asUuid())); + statement.setLong(4, System.currentTimeMillis()); + statement.executeUpdate(); } - } catch (Exception e) { - logger.error("Error saving shared sender key store file: {}", e.getMessage()); } } - private record Storage(List sharedSenderKeys) { - - private record SharedSenderKey(long recipientId, int deviceId, String distributionId) {} + private SenderKeySharedEntry getSenderKeySharedEntryFromResultSet(ResultSet resultSet) throws SQLException { + final var recipientId = resultSet.getLong("recipient_id"); + final var deviceId = resultSet.getInt("device_id"); + return new SenderKeySharedEntry(recipientIdCreator.create(recipientId), deviceId); } - private record SenderKeySharedEntry(RecipientId recipientId, int deviceId) {} + record SenderKeySharedEntry(RecipientId recipientId, int deviceId) {} } diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java index a9504497..6a3da47e 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java @@ -1,15 +1,18 @@ package org.asamk.signal.manager.storage.senderKeys; +import org.asamk.signal.manager.api.Pair; import org.asamk.signal.manager.helper.RecipientAddressResolver; +import org.asamk.signal.manager.storage.Database; import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.storage.recipients.RecipientIdCreator; import org.asamk.signal.manager.storage.recipients.RecipientResolver; import org.signal.libsignal.protocol.SignalProtocolAddress; import org.signal.libsignal.protocol.groups.state.SenderKeyRecord; import org.whispersystems.signalservice.api.SignalServiceSenderKeyStore; import org.whispersystems.signalservice.api.push.DistributionId; -import java.io.File; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.UUID; @@ -19,13 +22,13 @@ public class SenderKeyStore implements SignalServiceSenderKeyStore { private final SenderKeySharedStore senderKeySharedStore; public SenderKeyStore( - final File file, - final File senderKeysPath, + final Database database, final RecipientAddressResolver addressResolver, - final RecipientResolver resolver + final RecipientResolver resolver, + final RecipientIdCreator recipientIdCreator ) { - this.senderKeyRecordStore = new SenderKeyRecordStore(senderKeysPath, resolver); - this.senderKeySharedStore = SenderKeySharedStore.load(file, addressResolver, resolver); + this.senderKeyRecordStore = new SenderKeyRecordStore(database, resolver); + this.senderKeySharedStore = new SenderKeySharedStore(database, recipientIdCreator, addressResolver, resolver); } @Override @@ -88,4 +91,12 @@ public class SenderKeyStore implements SignalServiceSenderKeyStore { senderKeySharedStore.mergeRecipients(recipientId, toBeMergedRecipientId); senderKeyRecordStore.mergeRecipients(recipientId, toBeMergedRecipientId); } + + void addLegacySenderKeys(final Collection> senderKeys) { + senderKeyRecordStore.addLegacySenderKeys(senderKeys); + } + + void addLegacySenderKeysShared(final Map> sharedSenderKeys) { + senderKeySharedStore.addLegacySenderKeysShared(sharedSenderKeys); + } } -- 2.50.1