From: AsamK Date: Sun, 23 Jan 2022 19:50:23 +0000 (+0100) Subject: Implement MessageSendLog for resending after encryption error X-Git-Tag: v0.10.3~6 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/95cc0ae7fdaf0cc34742bce38bb456e02653db43 Implement MessageSendLog for resending after encryption error --- diff --git a/graalvm-config-dir/jni-config.json b/graalvm-config-dir/jni-config.json index 4865f747..acbf7f3f 100644 --- a/graalvm-config-dir/jni-config.json +++ b/graalvm-config-dir/jni-config.json @@ -62,6 +62,36 @@ "name":"org.graalvm.jniutils.JNIExceptionWrapperEntryPoints", "methods":[{"name":"getClassName","parameterTypes":["java.lang.Class"] }] }, +{ + "name":"org.sqlite.Collation" +}, +{ + "name":"org.sqlite.Function" +}, +{ + "name":"org.sqlite.Function$Aggregate" +}, +{ + "name":"org.sqlite.Function$Window" +}, +{ + "name":"org.sqlite.ProgressHandler" +}, +{ + "name":"org.sqlite.core.DB", + "methods":[{"name":"throwex","parameterTypes":["int"] }] +}, +{ + "name":"org.sqlite.core.DB$ProgressObserver" +}, +{ + "name":"org.sqlite.core.NativeDB", + "fields":[ + {"name":"colldatalist"}, + {"name":"pointer"}, + {"name":"udfdatalist"} + ] +}, { "name":"org.whispersystems.libsignal.DuplicateMessageException", "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }] @@ -77,6 +107,10 @@ "name":"org.whispersystems.libsignal.IdentityKeyPair", "methods":[{"name":"serialize","parameterTypes":[] }] }, +{ + "name":"org.whispersystems.libsignal.InvalidKeyException", + "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }] +}, { "name":"org.whispersystems.libsignal.InvalidMessageException", "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }] diff --git a/graalvm-config-dir/proxy-config.json b/graalvm-config-dir/proxy-config.json index 42660618..3110286d 100644 --- a/graalvm-config-dir/proxy-config.json +++ b/graalvm-config-dir/proxy-config.json @@ -1,4 +1,7 @@ [ + { + "interfaces":["java.sql.Connection"]} + , {
"interfaces":["org.asamk.Signal"]} , diff --git a/graalvm-config-dir/reflect-config.json b/graalvm-config-dir/reflect-config.json index c7353969..c5e61cee 100644 --- a/graalvm-config-dir/reflect-config.json +++ b/graalvm-config-dir/reflect-config.json @@ -15,9 +15,15 @@ { "name":"[J" }, +{ + "name":"[Lcom.zaxxer.hikari.util.ConcurrentBag$IConcurrentBagEntry;" +}, { "name":"[Ljava.lang.String;" }, +{ + "name":"[Ljava.sql.Statement;" +}, { "name":"[Lorg.whispersystems.signalservice.api.groupsv2.TemporalCredential;" }, @@ -118,6 +124,48 @@ "name":"com.sun.crypto.provider.TlsPrfGenerator$V12", "methods":[{"name":"<init>","parameterTypes":[] }] }, +{ + "name":"com.zaxxer.hikari.HikariConfig", + "allDeclaredFields":true, + "queryAllPublicMethods":true, + "methods":[ + {"name":"getCatalog","parameterTypes":[] }, + {"name":"getConnectionInitSql","parameterTypes":[] }, + {"name":"getConnectionTestQuery","parameterTypes":[] }, + {"name":"getConnectionTimeout","parameterTypes":[] }, + {"name":"getDataSource","parameterTypes":[] }, + {"name":"getDataSourceClassName","parameterTypes":[] }, + {"name":"getDataSourceJNDI","parameterTypes":[] }, + {"name":"getDataSourceProperties","parameterTypes":[] }, + {"name":"getDriverClassName","parameterTypes":[] }, + {"name":"getExceptionOverrideClassName","parameterTypes":[] }, + {"name":"getHealthCheckProperties","parameterTypes":[] }, + {"name":"getHealthCheckRegistry","parameterTypes":[] }, + {"name":"getIdleTimeout","parameterTypes":[] }, + {"name":"getInitializationFailTimeout","parameterTypes":[] }, + {"name":"getJdbcUrl","parameterTypes":[] }, + {"name":"getKeepaliveTime","parameterTypes":[] }, + {"name":"getLeakDetectionThreshold","parameterTypes":[] }, + {"name":"getMaxLifetime","parameterTypes":[] }, + {"name":"getMaximumPoolSize","parameterTypes":[] }, + {"name":"getMetricRegistry","parameterTypes":[] }, + {"name":"getMetricsTrackerFactory","parameterTypes":[] }, + {"name":"getMinimumIdle","parameterTypes":[] }, +
{"name":"getPassword","parameterTypes":[] }, + {"name":"getPoolName","parameterTypes":[] }, + {"name":"getScheduledExecutor","parameterTypes":[] }, + {"name":"getSchema","parameterTypes":[] }, + {"name":"getThreadFactory","parameterTypes":[] }, + {"name":"getTransactionIsolation","parameterTypes":[] }, + {"name":"getUsername","parameterTypes":[] }, + {"name":"getValidationTimeout","parameterTypes":[] }, + {"name":"isAllowPoolSuspension","parameterTypes":[] }, + {"name":"isAutoCommit","parameterTypes":[] }, + {"name":"isIsolateInternalQueries","parameterTypes":[] }, + {"name":"isReadOnly","parameterTypes":[] }, + {"name":"isRegisterMbeans","parameterTypes":[] } + ] +}, { "name":"int", "allDeclaredMethods":true, @@ -1607,13 +1655,13 @@ { "name":"org.signal.storageservice.protos.groups.GroupJoinInfo", "fields":[ - {"name":"addFromInviteLink_"}, - {"name":"avatar_"}, - {"name":"description_"}, - {"name":"memberCount_"}, - {"name":"pendingAdminApproval_"}, - {"name":"publicKey_"}, - {"name":"revision_"}, + {"name":"addFromInviteLink_"}, + {"name":"avatar_"}, + {"name":"description_"}, + {"name":"memberCount_"}, + {"name":"pendingAdminApproval_"}, + {"name":"publicKey_"}, + {"name":"revision_"}, {"name":"title_"} ] }, @@ -1696,14 +1744,14 @@ { "name":"org.signal.storageservice.protos.groups.local.DecryptedGroupJoinInfo", "fields":[ - {"name":"addFromInviteLink_"}, - {"name":"avatar_"}, - {"name":"description_"}, - {"name":"isAnnouncementGroup_"}, - {"name":"memberCount_"}, - {"name":"pendingAdminApproval_"}, - {"name":"publicKey_"}, - {"name":"revision_"}, + {"name":"addFromInviteLink_"}, + {"name":"avatar_"}, + {"name":"description_"}, + {"name":"isAnnouncementGroup_"}, + {"name":"memberCount_"}, + {"name":"pendingAdminApproval_"}, + {"name":"publicKey_"}, + {"name":"revision_"}, {"name":"title_"} ] }, @@ -1773,6 +1821,9 @@ "queryAllDeclaredMethods":true, "queryAllDeclaredConstructors":true }, +{ + "name":"org.sqlite.JDBC" +}, { 
"name":"org.whispersystems.libsignal.state.IdentityKeyStore", "allDeclaredMethods":true diff --git a/graalvm-config-dir/resource-config.json b/graalvm-config-dir/resource-config.json index 8a486340..a52689ae 100644 --- a/graalvm-config-dir/resource-config.json +++ b/graalvm-config-dir/resource-config.json @@ -1,6 +1,12 @@ { "resources":{ "includes":[ + { + "pattern":"\\QMETA-INF/maven/org.xerial/sqlite-jdbc/pom.properties\\E" + }, + { + "pattern":"\\QMETA-INF/services/java.sql.Driver\\E" + }, { "pattern":"\\QMETA-INF/services/org.freedesktop.dbus.spi.transport.ITransportProvider\\E" }, @@ -187,6 +193,12 @@ { "pattern":"\\Qorg/slf4j/impl/StaticLoggerBinder.class\\E" }, + { + "pattern":"\\Qorg/sqlite/native/Linux/x86_64/libsqlitejdbc.so\\E" + }, + { + "pattern":"\\Qsqlite-jdbc.properties\\E" + }, { "pattern":"com/google/i18n/phonenumbers/data/.*" } diff --git a/lib/build.gradle.kts b/lib/build.gradle.kts index 21dfb19a..5b891b8b 100644 --- a/lib/build.gradle.kts +++ b/lib/build.gradle.kts @@ -19,6 +19,8 @@ dependencies { implementation("com.google.protobuf", "protobuf-javalite", "3.11.4") implementation("org.bouncycastle", "bcprov-jdk15on", "1.70") implementation("org.slf4j", "slf4j-api", "1.7.32") + implementation("org.xerial", "sqlite-jdbc", "3.36.0.3") + implementation("com.zaxxer", "HikariCP", "5.0.1") } configurations { diff --git a/lib/src/main/java/org/asamk/signal/manager/Manager.java b/lib/src/main/java/org/asamk/signal/manager/Manager.java index 2c677dae..ed70bcee 100644 --- a/lib/src/main/java/org/asamk/signal/manager/Manager.java +++ b/lib/src/main/java/org/asamk/signal/manager/Manager.java @@ -67,6 +67,7 @@ public interface Manager extends Closeable { throw new NotRegisteredException(); } + account.initDatabase(); final var serviceEnvironmentConfig = ServiceConfig.getServiceEnvironmentConfig(serviceEnvironment, userAgent); return new ManagerImpl(account, pathConfig, serviceEnvironmentConfig, userAgent); diff --git 
a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java index 0e100d72..7c8cff5d 100644 --- a/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java +++ b/lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java @@ -571,6 +571,17 @@ public class ManagerImpl implements Manager { ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException { var delete = new SignalServiceDataMessage.RemoteDelete(targetSentTimestamp); final var messageBuilder = SignalServiceDataMessage.newBuilder().withRemoteDelete(delete); + for (final var recipient : recipients) { + if (recipient instanceof RecipientIdentifier.Single r) { + try { + final var recipientId = context.getRecipientHelper().resolveRecipient(r); + account.getMessageSendLogStore().deleteEntryForRecipientNonGroup(targetSentTimestamp, recipientId); + } catch (UnregisteredRecipientException ignored) { + } + } else if (recipient instanceof RecipientIdentifier.Group r) { + account.getMessageSendLogStore().deleteEntryForGroup(targetSentTimestamp, r.groupId()); + } + } return sendMessage(messageBuilder, recipients); } diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java new file mode 100644 index 00000000..9f399dd8 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java @@ -0,0 +1,42 @@ +package org.asamk.signal.manager.actions; + +import org.asamk.signal.manager.helper.Context; +import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry; + +import java.util.Objects; + +public class ResendMessageAction implements HandleAction { + + private final RecipientId recipientId; + private final long timestamp; + private final MessageSendLogEntry messageSendLogEntry; + + public ResendMessageAction( + 
final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry + ) { + this.recipientId = recipientId; + this.timestamp = timestamp; + this.messageSendLogEntry = messageSendLogEntry; + } + + @Override + public void execute(Context context) throws Throwable { + context.getSendHelper().resendMessage(recipientId, timestamp, messageSendLogEntry); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ResendMessageAction that = (ResendMessageAction) o; + return timestamp == that.timestamp + && recipientId.equals(that.recipientId) + && messageSendLogEntry.equals(that.messageSendLogEntry); + } + + @Override + public int hashCode() { + return Objects.hash(recipientId, timestamp, messageSendLogEntry); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java b/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java index 251dfde3..eece81d3 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java @@ -7,6 +7,7 @@ import org.asamk.signal.manager.UntrustedIdentityException; import org.asamk.signal.manager.actions.HandleAction; import org.asamk.signal.manager.actions.RefreshPreKeysAction; import org.asamk.signal.manager.actions.RenewSessionAction; +import org.asamk.signal.manager.actions.ResendMessageAction; import org.asamk.signal.manager.actions.RetrieveProfileAction; import org.asamk.signal.manager.actions.RetrieveStorageDataAction; import org.asamk.signal.manager.actions.SendGroupInfoAction; @@ -41,6 +42,7 @@ import org.signal.zkgroup.profiles.ProfileKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.libsignal.SignalProtocolAddress; +import org.whispersystems.libsignal.protocol.DecryptionErrorMessage; import 
org.whispersystems.signalservice.api.messages.SignalServiceContent; import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage; import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; @@ -165,6 +167,13 @@ public final class IncomingMessageHandler { // address/uuid is validated by unidentified sender certificate account.getRecipientStore().resolveRecipientTrusted(content.getSender()); } + if (envelope.isReceipt()) { + final var senderPair = getSender(envelope, content); + final var sender = senderPair.first(); + final var senderDeviceId = senderPair.second(); + account.getMessageSendLogStore().deleteEntryForRecipient(envelope.getTimestamp(), sender, senderDeviceId); + } + if (isMessageBlocked(envelope, content)) { logger.info("Ignoring a message from blocked user/group: {}", envelope.getTimestamp()); return List.of(); @@ -198,6 +207,14 @@ public final class IncomingMessageHandler { final var sender = senderPair.first(); final var senderDeviceId = senderPair.second(); + if (content.getReceiptMessage().isPresent()) { + final var message = content.getReceiptMessage().get(); + if (message.isDeliveryReceipt()) { + account.getMessageSendLogStore() + .deleteEntriesForRecipient(message.getTimestamps(), sender, senderDeviceId); + } + } + if (content.getSenderKeyDistributionMessage().isPresent()) { final var message = content.getSenderKeyDistributionMessage().get(); final var protocolAddress = new SignalProtocolAddress(context.getRecipientHelper() @@ -212,15 +229,10 @@ public final class IncomingMessageHandler { if (content.getDecryptionErrorMessage().isPresent()) { var message = content.getDecryptionErrorMessage().get(); logger.debug("Received a decryption error message (resend request for {})", message.getTimestamp()); - if (message.getRatchetKey().isPresent()) { - if (message.getDeviceId() == account.getDeviceId() && account.getSessionStore() - .isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) { - 
logger.debug("Renewing the session with sender"); - actions.add(new RenewSessionAction(sender)); - } + if (message.getDeviceId() == account.getDeviceId()) { + handleDecryptionErrorMessage(actions, sender, senderDeviceId, message); } else { - logger.debug("Reset shared sender keys with this recipient"); - account.getSenderKeyStore().deleteSharedWith(sender); + logger.debug("Request is for another one of our devices"); } } @@ -246,6 +258,54 @@ public final class IncomingMessageHandler { return actions; } + private void handleDecryptionErrorMessage( + final List actions, + final RecipientId sender, + final int senderDeviceId, + final DecryptionErrorMessage message + ) { + final var logEntries = account.getMessageSendLogStore() + .findMessages(sender, senderDeviceId, message.getTimestamp(), !message.getRatchetKey().isPresent()); + + for (final var logEntry : logEntries) { + actions.add(new ResendMessageAction(sender, message.getTimestamp(), logEntry)); + } + + if (message.getRatchetKey().isPresent()) { + if (account.getSessionStore().isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) { + if (logEntries.isEmpty()) { + logger.debug("Renewing the session with sender"); + actions.add(new RenewSessionAction(sender)); + } else { + logger.trace("Archiving the session with sender, a resend message has already been queued"); + context.getAccount().getSessionStore().archiveSessions(sender); + } + } + return; + } + + var found = false; + for (final var logEntry : logEntries) { + if (logEntry.groupId().isEmpty()) { + continue; + } + final var group = account.getGroupStore().getGroup(logEntry.groupId().get()); + if (group == null) { + continue; + } + found = true; + logger.trace("Deleting shared sender key with {} ({}): {}", + sender, + senderDeviceId, + group.getDistributionId()); + account.getSenderKeyStore().deleteSharedWith(sender, senderDeviceId, group.getDistributionId()); + } + if (!found) { + logger.debug("Reset all shared sender keys with this 
recipient, no related message found in send log"); + account.getSenderKeyStore().deleteSharedWith(sender); + } + } + private List handleSyncMessage( final SignalServiceSyncMessage syncMessage, final RecipientId sender, final boolean ignoreAttachments ) { diff --git a/lib/src/main/java/org/asamk/signal/manager/helper/SendHelper.java b/lib/src/main/java/org/asamk/signal/manager/helper/SendHelper.java index 4fa1aaeb..aedd29a2 100644 --- a/lib/src/main/java/org/asamk/signal/manager/helper/SendHelper.java +++ b/lib/src/main/java/org/asamk/signal/manager/helper/SendHelper.java @@ -1,5 +1,7 @@ package org.asamk.signal.manager.helper; +import com.google.protobuf.ByteString; + import org.asamk.signal.manager.SignalDependencies; import org.asamk.signal.manager.api.UnregisteredRecipientException; import org.asamk.signal.manager.groups.GroupId; @@ -11,11 +13,13 @@ import org.asamk.signal.manager.storage.SignalAccount; import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.recipients.Profile; import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.libsignal.InvalidKeyException; import org.whispersystems.libsignal.InvalidRegistrationIdException; import org.whispersystems.libsignal.NoSessionException; +import org.whispersystems.libsignal.SignalProtocolAddress; import org.whispersystems.libsignal.protocol.DecryptionErrorMessage; import org.whispersystems.libsignal.util.guava.Optional; import org.whispersystems.signalservice.api.SignalServiceMessageSender; @@ -45,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; public class SendHelper { @@ -74,9 +79,7 @@ public class SendHelper { 
messageBuilder.withProfileKey(account.getProfileKey().serialize()); final var message = messageBuilder.build(); - final var result = sendMessage(message, recipientId); - handleSendMessageResult(result); - return result; + return sendMessage(message, recipientId); } /** @@ -90,29 +93,6 @@ public class SendHelper { return sendAsGroupMessage(messageBuilder, g); } - private List sendAsGroupMessage( - final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g - ) throws IOException, GroupSendingNotAllowedException { - GroupUtils.setGroupContext(messageBuilder, g); - messageBuilder.withExpiration(g.getMessageExpirationTimer()); - - final var message = messageBuilder.build(); - final var recipients = g.getMembersWithout(account.getSelfRecipientId()); - - if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) { - if (message.getBody().isPresent() - || message.getAttachments().isPresent() - || message.getQuote().isPresent() - || message.getPreviews().isPresent() - || message.getMentions().isPresent() - || message.getSticker().isPresent()) { - throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle()); - } - } - - return sendGroupMessage(message, recipients, g.getDistributionId()); - } - /** * Send a complete group message to the given recipients (should be current/old/new members) * This method should only be used for create/update/quit group messages. 
@@ -122,31 +102,7 @@ public class SendHelper { final Set recipientIds, final DistributionId distributionId ) throws IOException { - final var messageSender = dependencies.getMessageSender(); - final var results = sendGroupMessageInternal((recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage( - recipients, - unidentifiedAccess, - isRecipientUpdate, - ContentHint.DEFAULT, - message, - SignalServiceMessageSender.LegacyGroupEvents.EMPTY, - sendResult -> logger.trace("Partial message send result: {}", sendResult.isSuccess()), - () -> false), - (distId, recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendGroupDataMessage(distId, - recipients, - unidentifiedAccess, - isRecipientUpdate, - ContentHint.DEFAULT, - message, - SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY), - recipientIds, - distributionId); - - for (var r : results) { - handleSendMessageResult(r); - } - - return results; + return sendGroupMessage(message, recipientIds, distributionId, ContentHint.IMPLICIT); } public SendMessageResult sendDeliveryReceipt( @@ -162,10 +118,14 @@ public class SendHelper { public SendMessageResult sendReceiptMessage( final SignalServiceReceiptMessage receiptMessage, final RecipientId recipientId ) { - return handleSendMessage(recipientId, + final var messageSendLogStore = account.getMessageSendLogStore(); + final var result = handleSendMessage(recipientId, (messageSender, address, unidentifiedAccess) -> messageSender.sendReceipt(address, unidentifiedAccess, receiptMessage)); + messageSendLogStore.insertIfPossible(receiptMessage.getWhen(), result, ContentHint.IMPLICIT); + handleSendMessageResult(result); + return result; } public SendMessageResult sendRetryReceipt( @@ -175,15 +135,19 @@ public class SendHelper { errorMessage.getTimestamp(), recipientId, errorMessage.getDeviceId()); - return handleSendMessage(recipientId, + final var result = handleSendMessage(recipientId, (messageSender, address, unidentifiedAccess) -> 
messageSender.sendRetryReceipt(address, unidentifiedAccess, groupId.transform(GroupId::serialize), errorMessage)); + handleSendMessageResult(result); + return result; } public SendMessageResult sendNullMessage(RecipientId recipientId) { - return handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage); + final var result = handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage); + handleSendMessageResult(result); + return result; } public SendMessageResult sendSelfMessage( @@ -225,10 +189,12 @@ public class SendHelper { public SendMessageResult sendTypingMessage( SignalServiceTypingMessage message, RecipientId recipientId ) { - return handleSendMessage(recipientId, + final var result = handleSendMessage(recipientId, (messageSender, address, unidentifiedAccess) -> messageSender.sendTyping(address, unidentifiedAccess, message)); + handleSendMessageResult(result); + return result; } public List sendGroupTypingMessage( @@ -244,6 +210,142 @@ public class SendHelper { return sendGroupTypingMessage(message, recipientIds, distributionId); } + public SendMessageResult resendMessage( + final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry + ) { + if (messageSendLogEntry.groupId().isEmpty()) { + return handleSendMessage(recipientId, + (messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address, + unidentifiedAccess, + timestamp, + messageSendLogEntry.content(), + messageSendLogEntry.contentHint(), + Optional.absent())); + } + + final var groupId = messageSendLogEntry.groupId().get(); + final var group = account.getGroupStore().getGroup(groupId); + + if (group == null) { + logger.debug("Could not find a matching group for the groupId {}! Skipping message send.", + groupId.toBase64()); + return null; + } else if (!group.getMembers().contains(recipientId)) { + logger.warn("The target user is no longer in the group {}! 
Skipping message send.", groupId.toBase64()); + return null; + } + + final var senderKeyDistributionMessage = dependencies.getMessageSender() + .getOrCreateNewGroupSession(group.getDistributionId()); + final var distributionBytes = ByteString.copyFrom(senderKeyDistributionMessage.serialize()); + final var contentToSend = messageSendLogEntry.content() + .toBuilder() + .setSenderKeyDistributionMessage(distributionBytes) + .build(); + + final var result = handleSendMessage(recipientId, + (messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address, + unidentifiedAccess, + timestamp, + contentToSend, + messageSendLogEntry.contentHint(), + Optional.of(group.getGroupId().serialize()))); + + if (result.isSuccess()) { + final var address = context.getRecipientHelper().resolveSignalServiceAddress(recipientId); + final var addresses = result.getSuccess() + .getDevices() + .stream() + .map(device -> new SignalProtocolAddress(address.getIdentifier(), device)) + .collect(Collectors.toList()); + + account.getSenderKeyStore().markSenderKeySharedWith(group.getDistributionId(), addresses); + } + + return result; + } + + private List sendAsGroupMessage( + final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g + ) throws IOException, GroupSendingNotAllowedException { + GroupUtils.setGroupContext(messageBuilder, g); + messageBuilder.withExpiration(g.getMessageExpirationTimer()); + + final var message = messageBuilder.build(); + final var recipients = g.getMembersWithout(account.getSelfRecipientId()); + + if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) { + if (message.getBody().isPresent() + || message.getAttachments().isPresent() + || message.getQuote().isPresent() + || message.getPreviews().isPresent() + || message.getMentions().isPresent() + || message.getSticker().isPresent()) { + throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle()); + } + } + + return sendGroupMessage(message, recipients, 
g.getDistributionId(), ContentHint.RESENDABLE); + } + + private List sendGroupMessage( + final SignalServiceDataMessage message, + final Set recipientIds, + final DistributionId distributionId, + final ContentHint contentHint + ) throws IOException { + final var messageSender = dependencies.getMessageSender(); + final var messageSendLogStore = account.getMessageSendLogStore(); + final AtomicLong entryId = new AtomicLong(-1); + + final LegacySenderHandler legacySender = (recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage( + recipients, + unidentifiedAccess, + isRecipientUpdate, + contentHint, + message, + SignalServiceMessageSender.LegacyGroupEvents.EMPTY, + sendResult -> { + logger.trace("Partial message send result: {}", sendResult.isSuccess()); + synchronized (entryId) { + if (entryId.get() == -1) { + final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(), + sendResult, + contentHint); + entryId.set(newId); + } else { + messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), sendResult); + } + } + }, + () -> false); + final SenderKeySenderHandler senderKeySender = (distId, recipients, unidentifiedAccess, isRecipientUpdate) -> { + final var res = messageSender.sendGroupDataMessage(distId, + recipients, + unidentifiedAccess, + isRecipientUpdate, + contentHint, + message, + SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY); + synchronized (entryId) { + if (entryId.get() == -1) { + final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(), res, contentHint); + entryId.set(newId); + } else { + messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), res); + } + } + return res; + }; + final var results = sendGroupMessageInternal(legacySender, senderKeySender, recipientIds, distributionId); + + for (var r : results) { + handleSendMessageResult(r); + } + + return results; + } + private List sendGroupTypingMessage( final SignalServiceTypingMessage message, 
final Set recipientIds, @@ -462,12 +564,16 @@ public class SendHelper { private SendMessageResult sendMessage( SignalServiceDataMessage message, RecipientId recipientId ) { - return handleSendMessage(recipientId, + final var messageSendLogStore = account.getMessageSendLogStore(); + final var result = handleSendMessage(recipientId, (messageSender, address, unidentifiedAccess) -> messageSender.sendDataMessage(address, unidentifiedAccess, - ContentHint.DEFAULT, + ContentHint.RESENDABLE, message, SignalServiceMessageSender.IndividualSendEvents.EMPTY)); + messageSendLogStore.insertIfPossible(message.getTimestamp(), result, ContentHint.RESENDABLE); + handleSendMessageResult(result); + return result; } private SendMessageResult handleSendMessage(RecipientId recipientId, SenderHandler s) { diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/Database.java b/lib/src/main/java/org/asamk/signal/manager/storage/Database.java new file mode 100644 index 00000000..1d69236b --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/Database.java @@ -0,0 +1,94 @@ +package org.asamk.signal.manager.storage; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; + +import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sqlite.SQLiteConfig; + +import java.io.File; +import java.sql.Connection; +import java.sql.SQLException; + +public class Database implements AutoCloseable { + + private final static Logger logger = LoggerFactory.getLogger(Database.class); + private static final long DATABASE_VERSION = 1; + + private final HikariDataSource dataSource; + + private Database(final HikariDataSource dataSource) { + this.dataSource = dataSource; + } + + public static Database init(File databaseFile) throws SQLException { + HikariDataSource dataSource = null; + + try { + dataSource = getHikariDataSource(databaseFile.getAbsolutePath()); + + try (final
var connection = dataSource.getConnection()) {
+                final var userVersion = getUserVersion(connection);
+                logger.trace("Current database version: {} Program database version: {}",
+                        userVersion,
+                        DATABASE_VERSION);
+
+                if (userVersion > DATABASE_VERSION) {
+                    logger.error("Database has been updated by a newer signal-cli version");
+                    throw new SQLException("Database has been updated by a newer signal-cli version");
+                } else if (userVersion < DATABASE_VERSION) {
+                    if (userVersion < 1) {
+                        logger.debug("Updating database: Creating message send log tables");
+                        MessageSendLogStore.createSql(connection);
+                    }
+                    setUserVersion(connection, DATABASE_VERSION);
+                }
+
+                final var result = new Database(dataSource);
+                // Ownership moved to the result; stop the finally block from closing the pool.
+                dataSource = null;
+                return result;
+            }
+        } finally {
+            if (dataSource != null) {
+                dataSource.close();
+            }
+        }
+    }
+
+    /**
+     * Returns a connection obtained from the underlying data source.
+     * The caller is responsible for closing it.
+     *
+     * @throws SQLException if no connection can be obtained
+     */
+    public Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    /** Closes the underlying data source. */
+    @Override
+    public void close() throws SQLException {
+        dataSource.close();
+    }
+
+    /**
+     * Reads {@code PRAGMA user_version} from the given connection.
+     *
+     * @throws SQLException if the query fails or yields no row
+     */
+    private static long getUserVersion(final Connection connection) throws SQLException {
+        try (final var statement = connection.createStatement();
+             final var resultSet = statement.executeQuery("PRAGMA user_version")) {
+            // A JDBC cursor starts before the first row; advance before reading a column.
+            if (!resultSet.next()) {
+                throw new SQLException("PRAGMA user_version returned no row");
+            }
+            return resultSet.getLong(1);
+        }
+    }
+
+    /** Stores {@code userVersion} via {@code PRAGMA user_version}. */
+    private static void setUserVersion(final Connection connection, long userVersion) throws SQLException {
+        try (final var statement = connection.createStatement()) {
+            // userVersion is a primitive long, so the concatenation cannot inject SQL.
+            statement.executeUpdate("PRAGMA user_version = " + userVersion);
+        }
+    }
+
+    /**
+     * Builds the Hikari data source for the given SQLite file: busy timeout 10_000,
+     * IMMEDIATE transaction mode, at least one idle connection.
+     */
+    private static HikariDataSource getHikariDataSource(final String databaseFile) {
+        final var sqliteConfig = new SQLiteConfig();
+        sqliteConfig.setBusyTimeout(10_000);
+        sqliteConfig.setTransactionMode(SQLiteConfig.TransactionMode.IMMEDIATE);
+
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl("jdbc:sqlite:" + databaseFile);
+        config.setDataSourceProperties(sqliteConfig.toProperties());
+        config.setMinimumIdle(1);
+        config.setConnectionInitSql("PRAGMA
foreign_keys=ON"); + return new HikariDataSource(config); + } +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java index f23aea03..862971c9 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java @@ -27,6 +27,7 @@ import org.asamk.signal.manager.storage.recipients.Profile; import org.asamk.signal.manager.storage.recipients.RecipientAddress; import org.asamk.signal.manager.storage.recipients.RecipientId; import org.asamk.signal.manager.storage.recipients.RecipientStore; +import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore; import org.asamk.signal.manager.storage.senderKeys.SenderKeyStore; import org.asamk.signal.manager.storage.sessions.SessionStore; import org.asamk.signal.manager.storage.stickers.StickerStore; @@ -62,6 +63,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.security.SecureRandom; +import java.sql.SQLException; import java.util.Base64; import java.util.Date; import java.util.HashSet; @@ -120,6 +122,9 @@ public class SignalAccount implements Closeable { private ConfigurationStore.Storage configurationStoreStorage; private MessageCache messageCache; + private MessageSendLogStore messageSendLogStore; + + private Database database; private SignalAccount(final FileChannel fileChannel, final FileLock lock) { this.fileChannel = fileChannel; @@ -227,6 +232,10 @@ public class SignalAccount implements Closeable { return signalAccount; } + public void initDatabase() { + getDatabase(); + } + private void clearAllPreKeys() { this.preKeyIdOffset = new SecureRandom().nextInt(Medium.MAX_VALUE); this.nextSignedPreKeyId = new SecureRandom().nextInt(Medium.MAX_VALUE); @@ -383,6 +392,10 @@ public class SignalAccount implements Closeable { return new 
File(getUserPath(dataPath, account), "recipients-store");
     }
 
+    private static File getDatabaseFile(File dataPath, String account) {
+        return new File(getUserPath(dataPath, account), "account.db");
+    }
+
     public static boolean userExists(File dataPath, String account) {
         if (account == null) {
             return false;
@@ -869,6 +882,21 @@ public class SignalAccount implements Closeable {
                 () -> messageCache = new MessageCache(getMessageCachePath(dataPath, account)));
     }
 
+    public Database getDatabase() {
+        return getOrCreate(() -> database, () -> {
+            try {
+                database = Database.init(getDatabaseFile(dataPath, account));
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    public MessageSendLogStore getMessageSendLogStore() {
+        return getOrCreate(() -> messageSendLogStore,
+                () -> messageSendLogStore = new MessageSendLogStore(getRecipientStore(), getDatabase()));
+    }
+
     public String getAccount() {
         return account;
     }
@@ -1050,6 +1078,16 @@ public class SignalAccount implements Closeable {
     @Override
     public void close() {
         synchronized (fileChannel) {
+            // Stop the send log's cleanup thread first: it borrows connections from the
+            // database, so the pool must outlive it.
+            if (messageSendLogStore != null) {
+                messageSendLogStore.close();
+            }
+            if (database != null) {
+                try {
+                    database.close();
+                } catch (SQLException e) {
+                    logger.warn("Failed to close account database: {}", e.getMessage(), e);
+                }
+            }
             try {
                 try {
                     lock.close();
diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java
new file mode 100644
index 00000000..31a4252a
--- /dev/null
+++ b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java
@@ -0,0 +1,11 @@
+package org.asamk.signal.manager.storage.sendLog;
+
+import org.asamk.signal.manager.groups.GroupId;
+import org.whispersystems.signalservice.api.crypto.ContentHint;
+import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
+
+import java.util.Optional;
+
+public record MessageSendLogEntry(
+
Optional groupId, SignalServiceProtos.Content content, ContentHint contentHint +) {} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java new file mode 100644 index 00000000..795919f6 --- /dev/null +++ b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java @@ -0,0 +1,396 @@ +package org.asamk.signal.manager.storage.sendLog; + +import org.asamk.signal.manager.groups.GroupId; +import org.asamk.signal.manager.groups.GroupUtils; +import org.asamk.signal.manager.storage.Database; +import org.asamk.signal.manager.storage.recipients.RecipientId; +import org.asamk.signal.manager.storage.recipients.RecipientResolver; +import org.signal.zkgroup.InvalidInputException; +import org.signal.zkgroup.groups.GroupMasterKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.crypto.ContentHint; +import org.whispersystems.signalservice.api.messages.SendMessageResult; +import org.whispersystems.signalservice.internal.push.SignalServiceProtos; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class MessageSendLogStore implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class); + + private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log"; + private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content"; + + private static final Duration LOG_DURATION = Duration.ofDays(1); + + private final 
RecipientResolver recipientResolver;
+    private final Database database;
+    private final Thread cleanupThread;
+
+    /**
+     * Creates the store and starts a daemon thread that purges outdated entries once per hour.
+     * The thread ends when it is interrupted or when a purge fails with an SQL error.
+     *
+     * @param recipientResolver used to resolve the recipient of a send result
+     * @param database          source of the JDBC connections used by every query of this store
+     */
+    public MessageSendLogStore(
+            final RecipientResolver recipientResolver, final Database database
+    ) {
+        this.recipientResolver = recipientResolver;
+        this.database = database;
+        this.cleanupThread = new Thread(() -> {
+            try {
+                final var interval = Duration.ofHours(1).toMillis();
+                while (true) {
+                    try (final var connection = database.getConnection()) {
+                        deleteOutdatedEntries(connection);
+                    } catch (SQLException e) {
+                        logger.warn("Deleting outdated entries failed", e);
+                        break;
+                    }
+                    // Sleep only after the connection has been released, so the pool does not
+                    // lose a connection to an idle thread for the whole interval.
+                    Thread.sleep(interval);
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Stopping msl cleanup thread");
+            }
+        });
+        cleanupThread.setDaemon(true);
+        cleanupThread.start();
+    }
+
+    /**
+     * Creates the {@code message_send_log} and {@code message_send_log_content} tables
+     * together with their indexes.
+     *
+     * @param connection connection the DDL is executed on
+     * @throws SQLException if a statement fails
+     */
+    public static void createSql(Connection connection) throws SQLException {
+        try (final var statement = connection.createStatement()) {
+            statement.executeUpdate("""
+                    CREATE TABLE message_send_log (
+                      _id INTEGER PRIMARY KEY,
+                      content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
+                      recipient_id INTEGER NOT NULL,
+                      device_id INTEGER NOT NULL
+                    );
+                    CREATE TABLE message_send_log_content (
+                      _id INTEGER PRIMARY KEY,
+                      group_id BLOB,
+                      timestamp INTEGER NOT NULL,
+                      content BLOB NOT NULL,
+                      content_hint INTEGER NOT NULL
+                    );
+                    CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
+                    CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
+                    CREATE INDEX msl_content_index ON message_send_log (content_id);
+                    """);
+        }
+    }
+
+    public List findMessages(
+            final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
+    ) {
+        try (final var connection = database.getConnection()) {
+            deleteOutdatedEntries(connection);
+
+            try (final var statement = connection.prepareStatement(
+                    "SELECT group_id, content, content_hint FROM %s l INNER JOIN %s lc ON l.content_id = lc._id WHERE
l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?".formatted( + TABLE_MESSAGE_SEND_LOG, + TABLE_MESSAGE_SEND_LOG_CONTENT))) { + statement.setLong(1, recipientId.id()); + statement.setInt(2, deviceId); + statement.setLong(3, timestamp); + try (var result = executeQueryForStream(statement, resultSet -> { + final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")) + .map(GroupId::unknownVersion); + final SignalServiceProtos.Content content; + try { + content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content")); + } catch (IOException e) { + logger.warn("Failed to parse content from message send log", e); + return null; + } + final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint")); + return new MessageSendLogEntry(groupId, content, contentHint); + })) { + return result.filter(Objects::nonNull) + .filter(e -> !isSenderKey || e.groupId().isPresent()) + .toList(); + } + } + } catch (SQLException e) { + logger.warn("Failed read from message send log", e); + return List.of(); + } + } + + public long insertIfPossible( + long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint + ) { + final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult); + if (recipientDevice == null) { + return -1; + } + + return insert(List.of(recipientDevice), + sentTimestamp, + sendMessageResult.getSuccess().getContent().get(), + contentHint); + } + + public long insertIfPossible( + long sentTimestamp, List sendMessageResults, ContentHint contentHint + ) { + final var recipientDevices = sendMessageResults.stream() + .map(this::getRecipientDevices) + .filter(Objects::nonNull) + .toList(); + if (recipientDevices.isEmpty()) { + return -1; + } + + final var content = sendMessageResults.stream() + .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent()) + .map(r -> r.getSuccess().getContent().get()) + .findFirst() + .get(); + + return insert(recipientDevices, sentTimestamp, 
content, contentHint); + } + + public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) { + final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult); + if (recipientDevice == null) { + return; + } + + insertRecipientsForExistingContent(contentId, List.of(recipientDevice)); + } + + public void addRecipientToExistingEntryIfPossible( + final long contentId, final List sendMessageResults + ) { + final var recipientDevices = sendMessageResults.stream() + .map(this::getRecipientDevices) + .filter(Objects::nonNull) + .toList(); + if (recipientDevices.isEmpty()) { + return; + } + + insertRecipientsForExistingContent(contentId, recipientDevices); + } + + public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) { + try (final var connection = database.getConnection()) { + try (final var statement = connection.prepareStatement( + "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id = ?".formatted( + TABLE_MESSAGE_SEND_LOG_CONTENT))) { + statement.setLong(1, sentTimestamp); + statement.setBytes(2, groupId.serialize()); + statement.executeUpdate(); + } + } catch (SQLException e) { + logger.warn("Failed delete from message send log", e); + } + } + + public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) { + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + try (final var statement = connection.prepareStatement( + "DELETE FROM %s AS lc WHERE lc.timestamp = ? 
AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)".formatted(
+                            TABLE_MESSAGE_SEND_LOG_CONTENT,
+                            TABLE_MESSAGE_SEND_LOG))) {
+                statement.setLong(1, sentTimestamp);
+                statement.setLong(2, recipientId.id());
+                statement.executeUpdate();
+            }
+
+            deleteOrphanedLogContents(connection);
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn("Failed delete from message send log", e);
+        }
+    }
+
+    /** Single-timestamp convenience wrapper around {@code deleteEntriesForRecipient}. */
+    public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
+        deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
+    }
+
+    /**
+     * Removes, for every given timestamp, the send log rows of one recipient device whose
+     * content carries that timestamp, then drops contents that no longer have any recipient.
+     * Everything runs in one transaction; SQL failures are logged and not propagated.
+     *
+     * @param sentTimestamps timestamps of the messages to forget
+     * @param recipientId    recipient whose rows are removed
+     * @param deviceId       device of that recipient
+     */
+    public void deleteEntriesForRecipient(List sentTimestamps, RecipientId recipientId, int deviceId) {
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            try (final var statement = connection.prepareStatement(
+                    "DELETE FROM %s AS l WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?".formatted(
+                            TABLE_MESSAGE_SEND_LOG,
+                            TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+                for (final var sentTimestamp : sentTimestamps) {
+                    statement.setLong(1, sentTimestamp);
+                    statement.setLong(2, recipientId.id());
+                    statement.setInt(3, deviceId);
+                    statement.executeUpdate();
+                }
+            }
+
+            deleteOrphanedLogContents(connection);
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn("Failed delete from message send log", e);
+        }
+    }
+
+    /** Interrupts the cleanup thread and waits for it to finish. */
+    @Override
+    public void close() {
+        cleanupThread.interrupt();
+        try {
+            cleanupThread.join();
+        } catch (InterruptedException ignored) {
+            // Closing was itself interrupted: stop waiting, but keep the interrupt visible
+            // to the caller instead of silently clearing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
+        if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
+            final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
+            return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
+        } else {
+            return
null; + } + } + + private long insert( + final List recipientDevices, + final long sentTimestamp, + final SignalServiceProtos.Content content, + final ContentHint contentHint + ) { + byte[] groupId = getGroupId(content); + + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + final long contentId; + try (final var statement = connection.prepareStatement( + "INSERT INTO %s (timestamp, group_id, content, content_hint) VALUES (?,?,?,?)".formatted( + TABLE_MESSAGE_SEND_LOG_CONTENT))) { + statement.setLong(1, sentTimestamp); + statement.setBytes(2, groupId); + statement.setBytes(3, content.toByteArray()); + statement.setInt(4, contentHint.getType()); + statement.executeUpdate(); + final var generatedKeys = statement.getGeneratedKeys(); + if (generatedKeys.next()) { + contentId = generatedKeys.getLong(1); + } else { + contentId = -1; + } + } + if (contentId == -1) { + logger.warn("Failed to insert message send log content"); + return -1; + } + insertRecipientsForExistingContent(contentId, recipientDevices, connection); + + connection.commit(); + return contentId; + } catch (SQLException e) { + logger.warn("Failed to insert into message send log", e); + return -1; + } + } + + private byte[] getGroupId(final SignalServiceProtos.Content content) { + try { + return !content.hasDataMessage() + ? null + : content.getDataMessage().hasGroup() + ? content.getDataMessage().getGroup().getId().toByteArray() + : content.getDataMessage().hasGroupV2() + ? 
GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage() + .getGroupV2() + .getMasterKey() + .toByteArray())).serialize() + : null; + } catch (InvalidInputException e) { + logger.warn("Failed to parse groupId id from content"); + return null; + } + } + + private void insertRecipientsForExistingContent( + final long contentId, final List recipientDevices + ) { + try (final var connection = database.getConnection()) { + connection.setAutoCommit(false); + insertRecipientsForExistingContent(contentId, recipientDevices, connection); + connection.commit(); + } catch (SQLException e) { + logger.warn("Failed to append recipients to message send log", e); + } + } + + private void insertRecipientsForExistingContent( + final long contentId, final List recipientDevices, final Connection connection + ) throws SQLException { + try (final var statement = connection.prepareStatement( + "INSERT INTO %s (recipient_id, device_id, content_id) VALUES (?,?,?)".formatted(TABLE_MESSAGE_SEND_LOG))) { + for (final var recipientDevice : recipientDevices) { + for (final var deviceId : recipientDevice.deviceIds()) { + statement.setLong(1, recipientDevice.recipientId().id()); + statement.setInt(2, deviceId); + statement.setLong(3, contentId); + statement.executeUpdate(); + } + } + } + } + + private void deleteOutdatedEntries(final Connection connection) throws SQLException { + try (final var statement = connection.prepareStatement("DELETE FROM %s WHERE timestamp < ?".formatted( + TABLE_MESSAGE_SEND_LOG_CONTENT))) { + statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis()); + final var rowCount = statement.executeUpdate(); + if (rowCount > 0) { + logger.debug("Removed {} outdated entries from the message send log", rowCount); + } + } + } + + private void deleteOrphanedLogContents(final Connection connection) throws SQLException { + try (final var statement = connection.prepareStatement( + "DELETE FROM %s WHERE _id NOT IN (SELECT content_id FROM 
%s)".formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, + TABLE_MESSAGE_SEND_LOG))) { + statement.executeUpdate(); + } + } + + private Stream executeQueryForStream( + PreparedStatement statement, ResultSetMapper mapper + ) throws SQLException { + final var resultSet = statement.executeQuery(); + + return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { + @Override + public boolean tryAdvance(final Consumer consumer) { + try { + if (!resultSet.next()) { + return false; + } + consumer.accept(mapper.apply(resultSet)); + return true; + } catch (SQLException e) { + logger.warn("Failed to read from database result", e); + throw new RuntimeException(e); + } + } + }, false); + } + + private interface ResultSetMapper { + + T apply(ResultSet resultSet) throws SQLException; + } + + private record RecipientDevices(RecipientId recipientId, List deviceIds) {} +} diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java index 9cc6ba42..a5947ef2 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java @@ -164,6 +164,21 @@ public class SenderKeySharedStore { } } + public void deleteSharedWith( + final RecipientId recipientId, final int deviceId, final DistributionId distributionId + ) { + synchronized (sharedSenderKeys) { + final var entries = sharedSenderKeys.getOrDefault(distributionId.asUuid(), Set.of()); + + sharedSenderKeys.put(distributionId.asUuid(), new HashSet<>(entries) { + { + remove(new SenderKeySharedEntry(recipientId, deviceId)); + } + }); + saveLocked(); + } + } + public void deleteAllFor(final DistributionId distributionId) { synchronized (sharedSenderKeys) { if (sharedSenderKeys.remove(distributionId.asUuid()) != null) { diff --git 
a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java index 8674945c..5318b3f2 100644 --- a/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java +++ b/lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java @@ -71,6 +71,10 @@ public class SenderKeyStore implements SignalServiceSenderKeyStore { senderKeySharedStore.deleteAllFor(recipientId); } + public void deleteSharedWith(RecipientId recipientId, int deviceId, DistributionId distributionId) { + senderKeySharedStore.deleteSharedWith(recipientId, deviceId, distributionId); + } + public void deleteOurKey(RecipientId selfRecipientId, DistributionId distributionId) { senderKeySharedStore.deleteAllFor(distributionId); senderKeyRecordStore.deleteSenderKey(selfRecipientId, distributionId.asUuid());