]> nmode's Git Repositories - signal-cli/commitdiff
Implement MessageSendLog for resending after encryption error
authorAsamK <asamk@gmx.de>
Sun, 23 Jan 2022 19:50:23 +0000 (20:50 +0100)
committerAsamK <asamk@gmx.de>
Fri, 28 Jan 2022 21:55:51 +0000 (22:55 +0100)
16 files changed:
graalvm-config-dir/jni-config.json
graalvm-config-dir/proxy-config.json
graalvm-config-dir/reflect-config.json
graalvm-config-dir/resource-config.json
lib/build.gradle.kts
lib/src/main/java/org/asamk/signal/manager/Manager.java
lib/src/main/java/org/asamk/signal/manager/ManagerImpl.java
lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/helper/IncomingMessageHandler.java
lib/src/main/java/org/asamk/signal/manager/helper/SendHelper.java
lib/src/main/java/org/asamk/signal/manager/storage/Database.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java
lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java [new file with mode: 0644]
lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeySharedStore.java
lib/src/main/java/org/asamk/signal/manager/storage/senderKeys/SenderKeyStore.java

index 4865f74737b36fabf3f2cb84b7832e00411279dc..acbf7f3fdb9a6b6380d2d8cd6da2dda034c207a8 100644 (file)
   "name":"org.graalvm.jniutils.JNIExceptionWrapperEntryPoints",
   "methods":[{"name":"getClassName","parameterTypes":["java.lang.Class"] }]
 },
+{
+  "name":"org.sqlite.Collation"
+},
+{
+  "name":"org.sqlite.Function"
+},
+{
+  "name":"org.sqlite.Function$Aggregate"
+},
+{
+  "name":"org.sqlite.Function$Window"
+},
+{
+  "name":"org.sqlite.ProgressHandler"
+},
+{
+  "name":"org.sqlite.core.DB",
+  "methods":[{"name":"throwex","parameterTypes":["int"] }]
+},
+{
+  "name":"org.sqlite.core.DB$ProgressObserver"
+},
+{
+  "name":"org.sqlite.core.NativeDB",
+  "fields":[
+    {"name":"colldatalist"}, 
+    {"name":"pointer"}, 
+    {"name":"udfdatalist"}
+  ]
+},
 {
   "name":"org.whispersystems.libsignal.DuplicateMessageException",
   "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }]
   "name":"org.whispersystems.libsignal.IdentityKeyPair",
   "methods":[{"name":"serialize","parameterTypes":[] }]
 },
+{
+  "name":"org.whispersystems.libsignal.InvalidKeyException",
+  "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }]
+},
 {
   "name":"org.whispersystems.libsignal.InvalidMessageException",
   "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }]
index 426606183cfd479c178489c08052731d6f17df65..3110286d0529612fb004aafc273131c924361ead 100644 (file)
@@ -1,4 +1,7 @@
 [
+  {
+    "interfaces":["java.sql.Connection"]}
+  ,
   {
     "interfaces":["org.asamk.Signal"]}
   ,
index c735396935645f9ef9b534f2d843b217851fbd74..c5e61cee32929f5fcf978fce4b9a78d517d1d73d 100644 (file)
 {
   "name":"[J"
 },
+{
+  "name":"[Lcom.zaxxer.hikari.util.ConcurrentBag$IConcurrentBagEntry;"
+},
 {
   "name":"[Ljava.lang.String;"
 },
+{
+  "name":"[Ljava.sql.Statement;"
+},
 {
   "name":"[Lorg.whispersystems.signalservice.api.groupsv2.TemporalCredential;"
 },
   "name":"com.sun.crypto.provider.TlsPrfGenerator$V12",
   "methods":[{"name":"<init>","parameterTypes":[] }]
 },
+{
+  "name":"com.zaxxer.hikari.HikariConfig",
+  "allDeclaredFields":true,
+  "queryAllPublicMethods":true,
+  "methods":[
+    {"name":"getCatalog","parameterTypes":[] }, 
+    {"name":"getConnectionInitSql","parameterTypes":[] }, 
+    {"name":"getConnectionTestQuery","parameterTypes":[] }, 
+    {"name":"getConnectionTimeout","parameterTypes":[] }, 
+    {"name":"getDataSource","parameterTypes":[] }, 
+    {"name":"getDataSourceClassName","parameterTypes":[] }, 
+    {"name":"getDataSourceJNDI","parameterTypes":[] }, 
+    {"name":"getDataSourceProperties","parameterTypes":[] }, 
+    {"name":"getDriverClassName","parameterTypes":[] }, 
+    {"name":"getExceptionOverrideClassName","parameterTypes":[] }, 
+    {"name":"getHealthCheckProperties","parameterTypes":[] }, 
+    {"name":"getHealthCheckRegistry","parameterTypes":[] }, 
+    {"name":"getIdleTimeout","parameterTypes":[] }, 
+    {"name":"getInitializationFailTimeout","parameterTypes":[] }, 
+    {"name":"getJdbcUrl","parameterTypes":[] }, 
+    {"name":"getKeepaliveTime","parameterTypes":[] }, 
+    {"name":"getLeakDetectionThreshold","parameterTypes":[] }, 
+    {"name":"getMaxLifetime","parameterTypes":[] }, 
+    {"name":"getMaximumPoolSize","parameterTypes":[] }, 
+    {"name":"getMetricRegistry","parameterTypes":[] }, 
+    {"name":"getMetricsTrackerFactory","parameterTypes":[] }, 
+    {"name":"getMinimumIdle","parameterTypes":[] }, 
+    {"name":"getPassword","parameterTypes":[] }, 
+    {"name":"getPoolName","parameterTypes":[] }, 
+    {"name":"getScheduledExecutor","parameterTypes":[] }, 
+    {"name":"getSchema","parameterTypes":[] }, 
+    {"name":"getThreadFactory","parameterTypes":[] }, 
+    {"name":"getTransactionIsolation","parameterTypes":[] }, 
+    {"name":"getUsername","parameterTypes":[] }, 
+    {"name":"getValidationTimeout","parameterTypes":[] }, 
+    {"name":"isAllowPoolSuspension","parameterTypes":[] }, 
+    {"name":"isAutoCommit","parameterTypes":[] }, 
+    {"name":"isIsolateInternalQueries","parameterTypes":[] }, 
+    {"name":"isReadOnly","parameterTypes":[] }, 
+    {"name":"isRegisterMbeans","parameterTypes":[] }
+  ]
+},
 {
   "name":"int",
   "allDeclaredMethods":true,
 {
   "name":"org.signal.storageservice.protos.groups.GroupJoinInfo",
   "fields":[
-    {"name":"addFromInviteLink_"},
-    {"name":"avatar_"},
-    {"name":"description_"},
-    {"name":"memberCount_"},
-    {"name":"pendingAdminApproval_"},
-    {"name":"publicKey_"},
-    {"name":"revision_"},
+    {"name":"addFromInviteLink_"}, 
+    {"name":"avatar_"}, 
+    {"name":"description_"}, 
+    {"name":"memberCount_"}, 
+    {"name":"pendingAdminApproval_"}, 
+    {"name":"publicKey_"}, 
+    {"name":"revision_"}, 
     {"name":"title_"}
   ]
 },
 {
   "name":"org.signal.storageservice.protos.groups.local.DecryptedGroupJoinInfo",
   "fields":[
-    {"name":"addFromInviteLink_"},
-    {"name":"avatar_"},
-    {"name":"description_"},
-    {"name":"isAnnouncementGroup_"},
-    {"name":"memberCount_"},
-    {"name":"pendingAdminApproval_"},
-    {"name":"publicKey_"},
-    {"name":"revision_"},
+    {"name":"addFromInviteLink_"}, 
+    {"name":"avatar_"}, 
+    {"name":"description_"}, 
+    {"name":"isAnnouncementGroup_"}, 
+    {"name":"memberCount_"}, 
+    {"name":"pendingAdminApproval_"}, 
+    {"name":"publicKey_"}, 
+    {"name":"revision_"}, 
     {"name":"title_"}
   ]
 },
   "queryAllDeclaredMethods":true,
   "queryAllDeclaredConstructors":true
 },
+{
+  "name":"org.sqlite.JDBC"
+},
 {
   "name":"org.whispersystems.libsignal.state.IdentityKeyStore",
   "allDeclaredMethods":true
index 8a4863406108551613a56e29f46ec9c2660747d7..a52689aebfcf753c49bcbf18ca6dfe805bc7d04d 100644 (file)
@@ -1,6 +1,12 @@
 {
   "resources":{
   "includes":[
+    {
+      "pattern":"\\QMETA-INF/maven/org.xerial/sqlite-jdbc/pom.properties\\E"
+    }, 
+    {
+      "pattern":"\\QMETA-INF/services/java.sql.Driver\\E"
+    }, 
     {
       "pattern":"\\QMETA-INF/services/org.freedesktop.dbus.spi.transport.ITransportProvider\\E"
     }, 
     {
       "pattern":"\\Qorg/slf4j/impl/StaticLoggerBinder.class\\E"
     }, 
+    {
+      "pattern":"\\Qorg/sqlite/native/Linux/x86_64/libsqlitejdbc.so\\E"
+    }, 
+    {
+      "pattern":"\\Qsqlite-jdbc.properties\\E"
+    }, 
     {
       "pattern":"com/google/i18n/phonenumbers/data/.*"
     }
index 21dfb19a13dde71f09403a08e6b867467f868b9f..5b891b8bae6b378accf30fed4519fcbf5eca1628 100644 (file)
@@ -19,6 +19,8 @@ dependencies {
     implementation("com.google.protobuf", "protobuf-javalite", "3.11.4")
     implementation("org.bouncycastle", "bcprov-jdk15on", "1.70")
     implementation("org.slf4j", "slf4j-api", "1.7.32")
+    implementation("org.xerial", "sqlite-jdbc", "3.36.0.3")
+    implementation("com.zaxxer", "HikariCP", "5.0.1")
 }
 
 configurations {
index 2c677dae5bca99fe220b661dac5c0ba538dd5381..ed70bceebe760cfc31c468663ff1321632aa2e0f 100644 (file)
@@ -67,6 +67,7 @@ public interface Manager extends Closeable {
             throw new NotRegisteredException();
         }
 
+        account.initDatabase();
         final var serviceEnvironmentConfig = ServiceConfig.getServiceEnvironmentConfig(serviceEnvironment, userAgent);
 
         return new ManagerImpl(account, pathConfig, serviceEnvironmentConfig, userAgent);
index 0e100d722b08505cd78ff5105e3ddf335633c04e..7c8cff5d6297d11f185cd3e3283fdc8d9446f948 100644 (file)
@@ -571,6 +571,17 @@ public class ManagerImpl implements Manager {
     ) throws IOException, NotAGroupMemberException, GroupNotFoundException, GroupSendingNotAllowedException {
         var delete = new SignalServiceDataMessage.RemoteDelete(targetSentTimestamp);
         final var messageBuilder = SignalServiceDataMessage.newBuilder().withRemoteDelete(delete);
+        for (final var recipient : recipients) {
+            if (recipient instanceof RecipientIdentifier.Single r) {
+                try {
+                    final var recipientId = context.getRecipientHelper().resolveRecipient(r);
+                    account.getMessageSendLogStore().deleteEntryForRecipientNonGroup(targetSentTimestamp, recipientId);
+                } catch (UnregisteredRecipientException ignored) {
+                }
+            } else if (recipient instanceof RecipientIdentifier.Group r) {
+                account.getMessageSendLogStore().deleteEntryForGroup(targetSentTimestamp, r.groupId());
+            }
+        }
         return sendMessage(messageBuilder, recipients);
     }
 
diff --git a/lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java b/lib/src/main/java/org/asamk/signal/manager/actions/ResendMessageAction.java
new file mode 100644 (file)
index 0000000..9f399dd
--- /dev/null
@@ -0,0 +1,42 @@
+package org.asamk.signal.manager.actions;
+
+import org.asamk.signal.manager.helper.Context;
+import org.asamk.signal.manager.storage.recipients.RecipientId;
+import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry;
+
+import java.util.Objects;
+
+public class ResendMessageAction implements HandleAction {
+
+    private final RecipientId recipientId;
+    private final long timestamp;
+    private final MessageSendLogEntry messageSendLogEntry;
+
+    public ResendMessageAction(
+            final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry
+    ) {
+        this.recipientId = recipientId;
+        this.timestamp = timestamp;
+        this.messageSendLogEntry = messageSendLogEntry;
+    }
+
+    @Override
+    public void execute(Context context) throws Throwable {
+        context.getSendHelper().resendMessage(recipientId, timestamp, messageSendLogEntry);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final ResendMessageAction that = (ResendMessageAction) o;
+        return timestamp == that.timestamp
+                && recipientId.equals(that.recipientId)
+                && messageSendLogEntry.equals(that.messageSendLogEntry);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(recipientId, timestamp, messageSendLogEntry);
+    }
+}
index 251dfde38c3b2cefbe2eb38c0d9777bfade28aa2..eece81d36a43a8298a4f9ba11b93dfd3bd0cbe7c 100644 (file)
@@ -7,6 +7,7 @@ import org.asamk.signal.manager.UntrustedIdentityException;
 import org.asamk.signal.manager.actions.HandleAction;
 import org.asamk.signal.manager.actions.RefreshPreKeysAction;
 import org.asamk.signal.manager.actions.RenewSessionAction;
+import org.asamk.signal.manager.actions.ResendMessageAction;
 import org.asamk.signal.manager.actions.RetrieveProfileAction;
 import org.asamk.signal.manager.actions.RetrieveStorageDataAction;
 import org.asamk.signal.manager.actions.SendGroupInfoAction;
@@ -41,6 +42,7 @@ import org.signal.zkgroup.profiles.ProfileKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.whispersystems.libsignal.SignalProtocolAddress;
+import org.whispersystems.libsignal.protocol.DecryptionErrorMessage;
 import org.whispersystems.signalservice.api.messages.SignalServiceContent;
 import org.whispersystems.signalservice.api.messages.SignalServiceDataMessage;
 import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
@@ -165,6 +167,13 @@ public final class IncomingMessageHandler {
             // address/uuid is validated by unidentified sender certificate
             account.getRecipientStore().resolveRecipientTrusted(content.getSender());
         }
+        if (envelope.isReceipt()) {
+            final var senderPair = getSender(envelope, content);
+            final var sender = senderPair.first();
+            final var senderDeviceId = senderPair.second();
+            account.getMessageSendLogStore().deleteEntryForRecipient(envelope.getTimestamp(), sender, senderDeviceId);
+        }
+
         if (isMessageBlocked(envelope, content)) {
             logger.info("Ignoring a message from blocked user/group: {}", envelope.getTimestamp());
             return List.of();
@@ -198,6 +207,14 @@ public final class IncomingMessageHandler {
         final var sender = senderPair.first();
         final var senderDeviceId = senderPair.second();
 
+        if (content.getReceiptMessage().isPresent()) {
+            final var message = content.getReceiptMessage().get();
+            if (message.isDeliveryReceipt()) {
+                account.getMessageSendLogStore()
+                        .deleteEntriesForRecipient(message.getTimestamps(), sender, senderDeviceId);
+            }
+        }
+
         if (content.getSenderKeyDistributionMessage().isPresent()) {
             final var message = content.getSenderKeyDistributionMessage().get();
             final var protocolAddress = new SignalProtocolAddress(context.getRecipientHelper()
@@ -212,15 +229,10 @@ public final class IncomingMessageHandler {
         if (content.getDecryptionErrorMessage().isPresent()) {
             var message = content.getDecryptionErrorMessage().get();
             logger.debug("Received a decryption error message (resend request for {})", message.getTimestamp());
-            if (message.getRatchetKey().isPresent()) {
-                if (message.getDeviceId() == account.getDeviceId() && account.getSessionStore()
-                        .isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) {
-                    logger.debug("Renewing the session with sender");
-                    actions.add(new RenewSessionAction(sender));
-                }
+            if (message.getDeviceId() == account.getDeviceId()) {
+                handleDecryptionErrorMessage(actions, sender, senderDeviceId, message);
             } else {
-                logger.debug("Reset shared sender keys with this recipient");
-                account.getSenderKeyStore().deleteSharedWith(sender);
+                logger.debug("Request is for another one of our devices");
             }
         }
 
@@ -246,6 +258,54 @@ public final class IncomingMessageHandler {
         return actions;
     }
 
+    private void handleDecryptionErrorMessage(
+            final List<HandleAction> actions,
+            final RecipientId sender,
+            final int senderDeviceId,
+            final DecryptionErrorMessage message
+    ) {
+        final var logEntries = account.getMessageSendLogStore()
+                .findMessages(sender, senderDeviceId, message.getTimestamp(), !message.getRatchetKey().isPresent());
+
+        for (final var logEntry : logEntries) {
+            actions.add(new ResendMessageAction(sender, message.getTimestamp(), logEntry));
+        }
+
+        if (message.getRatchetKey().isPresent()) {
+            if (account.getSessionStore().isCurrentRatchetKey(sender, senderDeviceId, message.getRatchetKey().get())) {
+                if (logEntries.isEmpty()) {
+                    logger.debug("Renewing the session with sender");
+                    actions.add(new RenewSessionAction(sender));
+                } else {
+                    logger.trace("Archiving the session with sender, a resend message has already been queued");
+                    context.getAccount().getSessionStore().archiveSessions(sender);
+                }
+            }
+            return;
+        }
+
+        var found = false;
+        for (final var logEntry : logEntries) {
+            if (logEntry.groupId().isEmpty()) {
+                continue;
+            }
+            final var group = account.getGroupStore().getGroup(logEntry.groupId().get());
+            if (group == null) {
+                continue;
+            }
+            found = true;
+            logger.trace("Deleting shared sender key with {} ({}): {}",
+                    sender,
+                    senderDeviceId,
+                    group.getDistributionId());
+            account.getSenderKeyStore().deleteSharedWith(sender, senderDeviceId, group.getDistributionId());
+        }
+        if (!found) {
+            logger.debug("Reset all shared sender keys with this recipient, no related message found in send log");
+            account.getSenderKeyStore().deleteSharedWith(sender);
+        }
+    }
+
     private List<HandleAction> handleSyncMessage(
             final SignalServiceSyncMessage syncMessage, final RecipientId sender, final boolean ignoreAttachments
     ) {
index 4fa1aaeb275d0fcc1c6c1f322ffd81f9ac1d1289..aedd29a2c9290b3382bc51e2cd7efa1d505a3b69 100644 (file)
@@ -1,5 +1,7 @@
 package org.asamk.signal.manager.helper;
 
+import com.google.protobuf.ByteString;
+
 import org.asamk.signal.manager.SignalDependencies;
 import org.asamk.signal.manager.api.UnregisteredRecipientException;
 import org.asamk.signal.manager.groups.GroupId;
@@ -11,11 +13,13 @@ import org.asamk.signal.manager.storage.SignalAccount;
 import org.asamk.signal.manager.storage.groups.GroupInfo;
 import org.asamk.signal.manager.storage.recipients.Profile;
 import org.asamk.signal.manager.storage.recipients.RecipientId;
+import org.asamk.signal.manager.storage.sendLog.MessageSendLogEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.whispersystems.libsignal.InvalidKeyException;
 import org.whispersystems.libsignal.InvalidRegistrationIdException;
 import org.whispersystems.libsignal.NoSessionException;
+import org.whispersystems.libsignal.SignalProtocolAddress;
 import org.whispersystems.libsignal.protocol.DecryptionErrorMessage;
 import org.whispersystems.libsignal.util.guava.Optional;
 import org.whispersystems.signalservice.api.SignalServiceMessageSender;
@@ -45,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class SendHelper {
@@ -74,9 +79,7 @@ public class SendHelper {
         messageBuilder.withProfileKey(account.getProfileKey().serialize());
 
         final var message = messageBuilder.build();
-        final var result = sendMessage(message, recipientId);
-        handleSendMessageResult(result);
-        return result;
+        return sendMessage(message, recipientId);
     }
 
     /**
@@ -90,29 +93,6 @@ public class SendHelper {
         return sendAsGroupMessage(messageBuilder, g);
     }
 
-    private List<SendMessageResult> sendAsGroupMessage(
-            final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g
-    ) throws IOException, GroupSendingNotAllowedException {
-        GroupUtils.setGroupContext(messageBuilder, g);
-        messageBuilder.withExpiration(g.getMessageExpirationTimer());
-
-        final var message = messageBuilder.build();
-        final var recipients = g.getMembersWithout(account.getSelfRecipientId());
-
-        if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) {
-            if (message.getBody().isPresent()
-                    || message.getAttachments().isPresent()
-                    || message.getQuote().isPresent()
-                    || message.getPreviews().isPresent()
-                    || message.getMentions().isPresent()
-                    || message.getSticker().isPresent()) {
-                throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle());
-            }
-        }
-
-        return sendGroupMessage(message, recipients, g.getDistributionId());
-    }
-
     /**
      * Send a complete group message to the given recipients (should be current/old/new members)
      * This method should only be used for create/update/quit group messages.
@@ -122,31 +102,7 @@ public class SendHelper {
             final Set<RecipientId> recipientIds,
             final DistributionId distributionId
     ) throws IOException {
-        final var messageSender = dependencies.getMessageSender();
-        final var results = sendGroupMessageInternal((recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage(
-                        recipients,
-                        unidentifiedAccess,
-                        isRecipientUpdate,
-                        ContentHint.DEFAULT,
-                        message,
-                        SignalServiceMessageSender.LegacyGroupEvents.EMPTY,
-                        sendResult -> logger.trace("Partial message send result: {}", sendResult.isSuccess()),
-                        () -> false),
-                (distId, recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendGroupDataMessage(distId,
-                        recipients,
-                        unidentifiedAccess,
-                        isRecipientUpdate,
-                        ContentHint.DEFAULT,
-                        message,
-                        SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY),
-                recipientIds,
-                distributionId);
-
-        for (var r : results) {
-            handleSendMessageResult(r);
-        }
-
-        return results;
+        return sendGroupMessage(message, recipientIds, distributionId, ContentHint.IMPLICIT);
     }
 
     public SendMessageResult sendDeliveryReceipt(
@@ -162,10 +118,14 @@ public class SendHelper {
     public SendMessageResult sendReceiptMessage(
             final SignalServiceReceiptMessage receiptMessage, final RecipientId recipientId
     ) {
-        return handleSendMessage(recipientId,
+        final var messageSendLogStore = account.getMessageSendLogStore();
+        final var result = handleSendMessage(recipientId,
                 (messageSender, address, unidentifiedAccess) -> messageSender.sendReceipt(address,
                         unidentifiedAccess,
                         receiptMessage));
+        messageSendLogStore.insertIfPossible(receiptMessage.getWhen(), result, ContentHint.IMPLICIT);
+        handleSendMessageResult(result);
+        return result;
     }
 
     public SendMessageResult sendRetryReceipt(
@@ -175,15 +135,19 @@ public class SendHelper {
                 errorMessage.getTimestamp(),
                 recipientId,
                 errorMessage.getDeviceId());
-        return handleSendMessage(recipientId,
+        final var result = handleSendMessage(recipientId,
                 (messageSender, address, unidentifiedAccess) -> messageSender.sendRetryReceipt(address,
                         unidentifiedAccess,
                         groupId.transform(GroupId::serialize),
                         errorMessage));
+        handleSendMessageResult(result);
+        return result;
     }
 
     public SendMessageResult sendNullMessage(RecipientId recipientId) {
-        return handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage);
+        final var result = handleSendMessage(recipientId, SignalServiceMessageSender::sendNullMessage);
+        handleSendMessageResult(result);
+        return result;
     }
 
     public SendMessageResult sendSelfMessage(
@@ -225,10 +189,12 @@ public class SendHelper {
     public SendMessageResult sendTypingMessage(
             SignalServiceTypingMessage message, RecipientId recipientId
     ) {
-        return handleSendMessage(recipientId,
+        final var result = handleSendMessage(recipientId,
                 (messageSender, address, unidentifiedAccess) -> messageSender.sendTyping(address,
                         unidentifiedAccess,
                         message));
+        handleSendMessageResult(result);
+        return result;
     }
 
     public List<SendMessageResult> sendGroupTypingMessage(
@@ -244,6 +210,142 @@ public class SendHelper {
         return sendGroupTypingMessage(message, recipientIds, distributionId);
     }
 
+    public SendMessageResult resendMessage(
+            final RecipientId recipientId, final long timestamp, final MessageSendLogEntry messageSendLogEntry
+    ) {
+        if (messageSendLogEntry.groupId().isEmpty()) {
+            return handleSendMessage(recipientId,
+                    (messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address,
+                            unidentifiedAccess,
+                            timestamp,
+                            messageSendLogEntry.content(),
+                            messageSendLogEntry.contentHint(),
+                            Optional.absent()));
+        }
+
+        final var groupId = messageSendLogEntry.groupId().get();
+        final var group = account.getGroupStore().getGroup(groupId);
+
+        if (group == null) {
+            logger.debug("Could not find a matching group for the groupId {}! Skipping message send.",
+                    groupId.toBase64());
+            return null;
+        } else if (!group.getMembers().contains(recipientId)) {
+            logger.warn("The target user is no longer in the group {}! Skipping message send.", groupId.toBase64());
+            return null;
+        }
+
+        final var senderKeyDistributionMessage = dependencies.getMessageSender()
+                .getOrCreateNewGroupSession(group.getDistributionId());
+        final var distributionBytes = ByteString.copyFrom(senderKeyDistributionMessage.serialize());
+        final var contentToSend = messageSendLogEntry.content()
+                .toBuilder()
+                .setSenderKeyDistributionMessage(distributionBytes)
+                .build();
+
+        final var result = handleSendMessage(recipientId,
+                (messageSender, address, unidentifiedAccess) -> messageSender.resendContent(address,
+                        unidentifiedAccess,
+                        timestamp,
+                        contentToSend,
+                        messageSendLogEntry.contentHint(),
+                        Optional.of(group.getGroupId().serialize())));
+
+        if (result.isSuccess()) {
+            final var address = context.getRecipientHelper().resolveSignalServiceAddress(recipientId);
+            final var addresses = result.getSuccess()
+                    .getDevices()
+                    .stream()
+                    .map(device -> new SignalProtocolAddress(address.getIdentifier(), device))
+                    .collect(Collectors.toList());
+
+            account.getSenderKeyStore().markSenderKeySharedWith(group.getDistributionId(), addresses);
+        }
+
+        return result;
+    }
+
+    private List<SendMessageResult> sendAsGroupMessage(
+            final SignalServiceDataMessage.Builder messageBuilder, final GroupInfo g
+    ) throws IOException, GroupSendingNotAllowedException {
+        GroupUtils.setGroupContext(messageBuilder, g);
+        messageBuilder.withExpiration(g.getMessageExpirationTimer());
+
+        final var message = messageBuilder.build();
+        final var recipients = g.getMembersWithout(account.getSelfRecipientId());
+
+        if (g.isAnnouncementGroup() && !g.isAdmin(account.getSelfRecipientId())) {
+            if (message.getBody().isPresent()
+                    || message.getAttachments().isPresent()
+                    || message.getQuote().isPresent()
+                    || message.getPreviews().isPresent()
+                    || message.getMentions().isPresent()
+                    || message.getSticker().isPresent()) {
+                throw new GroupSendingNotAllowedException(g.getGroupId(), g.getTitle());
+            }
+        }
+
+        return sendGroupMessage(message, recipients, g.getDistributionId(), ContentHint.RESENDABLE);
+    }
+
+    private List<SendMessageResult> sendGroupMessage(
+            final SignalServiceDataMessage message,
+            final Set<RecipientId> recipientIds,
+            final DistributionId distributionId,
+            final ContentHint contentHint
+    ) throws IOException {
+        final var messageSender = dependencies.getMessageSender();
+        final var messageSendLogStore = account.getMessageSendLogStore();
+        final AtomicLong entryId = new AtomicLong(-1);
+
+        final LegacySenderHandler legacySender = (recipients, unidentifiedAccess, isRecipientUpdate) -> messageSender.sendDataMessage(
+                recipients,
+                unidentifiedAccess,
+                isRecipientUpdate,
+                contentHint,
+                message,
+                SignalServiceMessageSender.LegacyGroupEvents.EMPTY,
+                sendResult -> {
+                    logger.trace("Partial message send result: {}", sendResult.isSuccess());
+                    synchronized (entryId) {
+                        if (entryId.get() == -1) {
+                            final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(),
+                                    sendResult,
+                                    contentHint);
+                            entryId.set(newId);
+                        } else {
+                            messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), sendResult);
+                        }
+                    }
+                },
+                () -> false);
+        final SenderKeySenderHandler senderKeySender = (distId, recipients, unidentifiedAccess, isRecipientUpdate) -> {
+            final var res = messageSender.sendGroupDataMessage(distId,
+                    recipients,
+                    unidentifiedAccess,
+                    isRecipientUpdate,
+                    contentHint,
+                    message,
+                    SignalServiceMessageSender.SenderKeyGroupEvents.EMPTY);
+            synchronized (entryId) {
+                if (entryId.get() == -1) {
+                    final var newId = messageSendLogStore.insertIfPossible(message.getTimestamp(), res, contentHint);
+                    entryId.set(newId);
+                } else {
+                    messageSendLogStore.addRecipientToExistingEntryIfPossible(entryId.get(), res);
+                }
+            }
+            return res;
+        };
+        final var results = sendGroupMessageInternal(legacySender, senderKeySender, recipientIds, distributionId);
+
+        for (var r : results) {
+            handleSendMessageResult(r);
+        }
+
+        return results;
+    }
+
     private List<SendMessageResult> sendGroupTypingMessage(
             final SignalServiceTypingMessage message,
             final Set<RecipientId> recipientIds,
@@ -462,12 +564,16 @@ public class SendHelper {
     private SendMessageResult sendMessage(
             SignalServiceDataMessage message, RecipientId recipientId
     ) {
-        return handleSendMessage(recipientId,
+        final var messageSendLogStore = account.getMessageSendLogStore();
+        final var result = handleSendMessage(recipientId,
                 (messageSender, address, unidentifiedAccess) -> messageSender.sendDataMessage(address,
                         unidentifiedAccess,
-                        ContentHint.DEFAULT,
+                        ContentHint.RESENDABLE,
                         message,
                         SignalServiceMessageSender.IndividualSendEvents.EMPTY));
+        messageSendLogStore.insertIfPossible(message.getTimestamp(), result, ContentHint.RESENDABLE);
+        handleSendMessageResult(result);
+        return result;
     }
 
     private SendMessageResult handleSendMessage(RecipientId recipientId, SenderHandler s) {
diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/Database.java b/lib/src/main/java/org/asamk/signal/manager/storage/Database.java
new file mode 100644 (file)
index 0000000..1d69236
--- /dev/null
@@ -0,0 +1,94 @@
+package org.asamk.signal.manager.storage;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sqlite.SQLiteConfig;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class Database implements AutoCloseable {
+
+    private final static Logger logger = LoggerFactory.getLogger(SignalAccount.class);
+    private static final long DATABASE_VERSION = 1;
+
+    private final HikariDataSource dataSource;
+
+    private Database(final HikariDataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public static Database init(File databaseFile) throws SQLException {
+        HikariDataSource dataSource = null;
+
+        try {
+            dataSource = getHikariDataSource(databaseFile.getAbsolutePath());
+
+            try (final var connection = dataSource.getConnection()) {
+                final var userVersion = getUserVersion(connection);
+                logger.trace("Current database version: {} Program database version: {}",
+                        userVersion,
+                        DATABASE_VERSION);
+
+                if (userVersion > DATABASE_VERSION) {
+                    logger.error("Database has been updated by a newer signal-cli version");
+                    throw new SQLException("Database has been updated by a newer signal-cli version");
+                } else if (userVersion < DATABASE_VERSION) {
+                    if (userVersion < 1) {
+                        logger.debug("Updating database: Creating message send log tables");
+                        MessageSendLogStore.createSql(connection);
+                    }
+                    setUserVersion(connection, DATABASE_VERSION);
+                }
+
+                final var result = new Database(dataSource);
+                dataSource = null;
+                return result;
+            }
+        } finally {
+            if (dataSource != null) {
+                dataSource.close();
+            }
+        }
+    }
+
+    public Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    @Override
+    public void close() throws SQLException {
+        dataSource.close();
+    }
+
+    private static long getUserVersion(final Connection connection) throws SQLException {
+        try (final var statement = connection.createStatement()) {
+            final var resultSet = statement.executeQuery("PRAGMA user_version");
+            return resultSet.getLong(1);
+        }
+    }
+
+    private static void setUserVersion(final Connection connection, long userVersion) throws SQLException {
+        try (final var statement = connection.createStatement()) {
+            statement.executeUpdate("PRAGMA user_version = " + userVersion);
+        }
+    }
+
+    private static HikariDataSource getHikariDataSource(final String databaseFile) {
+        final var sqliteConfig = new SQLiteConfig();
+        sqliteConfig.setBusyTimeout(10_000);
+        sqliteConfig.setTransactionMode(SQLiteConfig.TransactionMode.IMMEDIATE);
+
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl("jdbc:sqlite:" + databaseFile);
+        config.setDataSourceProperties(sqliteConfig.toProperties());
+        config.setMinimumIdle(1);
+        config.setConnectionInitSql("PRAGMA foreign_keys=ON");
+        return new HikariDataSource(config);
+    }
+}
index f23aea03f3276717bc58caa8d8f2924edc0d55f2..862971c9f3ed8d20cf455d2a7a50490127e4514b 100644 (file)
@@ -27,6 +27,7 @@ import org.asamk.signal.manager.storage.recipients.Profile;
 import org.asamk.signal.manager.storage.recipients.RecipientAddress;
 import org.asamk.signal.manager.storage.recipients.RecipientId;
 import org.asamk.signal.manager.storage.recipients.RecipientStore;
+import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore;
 import org.asamk.signal.manager.storage.senderKeys.SenderKeyStore;
 import org.asamk.signal.manager.storage.sessions.SessionStore;
 import org.asamk.signal.manager.storage.stickers.StickerStore;
@@ -62,6 +63,7 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.security.SecureRandom;
+import java.sql.SQLException;
 import java.util.Base64;
 import java.util.Date;
 import java.util.HashSet;
@@ -120,6 +122,9 @@ public class SignalAccount implements Closeable {
     private ConfigurationStore.Storage configurationStoreStorage;
 
     private MessageCache messageCache;
+    private MessageSendLogStore messageSendLogStore;
+
+    private Database database;
 
     private SignalAccount(final FileChannel fileChannel, final FileLock lock) {
         this.fileChannel = fileChannel;
@@ -227,6 +232,10 @@ public class SignalAccount implements Closeable {
         return signalAccount;
     }
 
+    public void initDatabase() {
+        getDatabase();
+    }
+
     private void clearAllPreKeys() {
         this.preKeyIdOffset = new SecureRandom().nextInt(Medium.MAX_VALUE);
         this.nextSignedPreKeyId = new SecureRandom().nextInt(Medium.MAX_VALUE);
@@ -383,6 +392,10 @@ public class SignalAccount implements Closeable {
         return new File(getUserPath(dataPath, account), "recipients-store");
     }
 
+    private static File getDatabaseFile(File dataPath, String account) {
+        return new File(getUserPath(dataPath, account), "account.db");
+    }
+
     public static boolean userExists(File dataPath, String account) {
         if (account == null) {
             return false;
@@ -869,6 +882,21 @@ public class SignalAccount implements Closeable {
                 () -> messageCache = new MessageCache(getMessageCachePath(dataPath, account)));
     }
 
+    public Database getDatabase() {
+        return getOrCreate(() -> database, () -> {
+            try {
+                database = Database.init(getDatabaseFile(dataPath, account));
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    public MessageSendLogStore getMessageSendLogStore() {
+        return getOrCreate(() -> messageSendLogStore,
+                () -> messageSendLogStore = new MessageSendLogStore(getRecipientStore(), getDatabase()));
+    }
+
     public String getAccount() {
         return account;
     }
@@ -1050,6 +1078,16 @@ public class SignalAccount implements Closeable {
     @Override
     public void close() {
         synchronized (fileChannel) {
+            if (database != null) {
+                try {
+                    database.close();
+                } catch (SQLException e) {
+                    logger.warn("Failed to close account database: {}", e.getMessage(), e);
+                }
+            }
+            if (messageSendLogStore != null) {
+                messageSendLogStore.close();
+            }
             try {
                 try {
                     lock.close();
diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogEntry.java
new file mode 100644 (file)
index 0000000..31a4252
--- /dev/null
@@ -0,0 +1,11 @@
+package org.asamk.signal.manager.storage.sendLog;
+
+import org.asamk.signal.manager.groups.GroupId;
+import org.whispersystems.signalservice.api.crypto.ContentHint;
+import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
+
+import java.util.Optional;
+
+public record MessageSendLogEntry(
+        Optional<GroupId> groupId, SignalServiceProtos.Content content, ContentHint contentHint
+) {}
diff --git a/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java b/lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
new file mode 100644 (file)
index 0000000..795919f
--- /dev/null
@@ -0,0 +1,396 @@
+package org.asamk.signal.manager.storage.sendLog;
+
+import org.asamk.signal.manager.groups.GroupId;
+import org.asamk.signal.manager.groups.GroupUtils;
+import org.asamk.signal.manager.storage.Database;
+import org.asamk.signal.manager.storage.recipients.RecipientId;
+import org.asamk.signal.manager.storage.recipients.RecipientResolver;
+import org.signal.zkgroup.InvalidInputException;
+import org.signal.zkgroup.groups.GroupMasterKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.whispersystems.signalservice.api.crypto.ContentHint;
+import org.whispersystems.signalservice.api.messages.SendMessageResult;
+import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class MessageSendLogStore implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
+
+    private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
+    private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
+
+    private static final Duration LOG_DURATION = Duration.ofDays(1);
+
+    private final RecipientResolver recipientResolver;
+    private final Database database;
+    private final Thread cleanupThread;
+
+    public MessageSendLogStore(
+            final RecipientResolver recipientResolver, final Database database
+    ) {
+        this.recipientResolver = recipientResolver;
+        this.database = database;
+        this.cleanupThread = new Thread(() -> {
+            try {
+                final var interval = Duration.ofHours(1).toMillis();
+                while (true) {
+                    try (final var connection = database.getConnection()) {
+                        deleteOutdatedEntries(connection);
+                        Thread.sleep(interval);
+                    } catch (SQLException e) {
+                        logger.warn("Deleting outdated entries failed");
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                logger.debug("Stopping msl cleanup thread");
+            }
+        });
+        cleanupThread.setDaemon(true);
+        cleanupThread.start();
+    }
+
+    public static void createSql(Connection connection) throws SQLException {
+        try (final var statement = connection.createStatement()) {
+            statement.executeUpdate("""
+                    CREATE TABLE message_send_log (
+                      _id INTEGER PRIMARY KEY,
+                      content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
+                      recipient_id INTEGER NOT NULL,
+                      device_id INTEGER NOT NULL
+                    );
+                    CREATE TABLE message_send_log_content (
+                      _id INTEGER PRIMARY KEY,
+                      group_id BLOB,
+                      timestamp INTEGER NOT NULL,
+                      content BLOB NOT NULL,
+                      content_hint INTEGER NOT NULL
+                    );
+                    CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
+                    CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
+                    CREATE INDEX msl_content_index ON message_send_log (content_id);
+                    """);
+        }
+    }
+
+    public List<MessageSendLogEntry> findMessages(
+            final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
+    ) {
+        try (final var connection = database.getConnection()) {
+            deleteOutdatedEntries(connection);
+
+            try (final var statement = connection.prepareStatement(
+                    "SELECT group_id, content, content_hint FROM %s l INNER JOIN %s lc ON l.content_id = lc._id WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?".formatted(
+                            TABLE_MESSAGE_SEND_LOG,
+                            TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+                statement.setLong(1, recipientId.id());
+                statement.setInt(2, deviceId);
+                statement.setLong(3, timestamp);
+                try (var result = executeQueryForStream(statement, resultSet -> {
+                    final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
+                            .map(GroupId::unknownVersion);
+                    final SignalServiceProtos.Content content;
+                    try {
+                        content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
+                    } catch (IOException e) {
+                        logger.warn("Failed to parse content from message send log", e);
+                        return null;
+                    }
+                    final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
+                    return new MessageSendLogEntry(groupId, content, contentHint);
+                })) {
+                    return result.filter(Objects::nonNull)
+                            .filter(e -> !isSenderKey || e.groupId().isPresent())
+                            .toList();
+                }
+            }
+        } catch (SQLException e) {
+            logger.warn("Failed read from message send log", e);
+            return List.of();
+        }
+    }
+
+    public long insertIfPossible(
+            long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint
+    ) {
+        final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
+        if (recipientDevice == null) {
+            return -1;
+        }
+
+        return insert(List.of(recipientDevice),
+                sentTimestamp,
+                sendMessageResult.getSuccess().getContent().get(),
+                contentHint);
+    }
+
+    public long insertIfPossible(
+            long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint
+    ) {
+        final var recipientDevices = sendMessageResults.stream()
+                .map(this::getRecipientDevices)
+                .filter(Objects::nonNull)
+                .toList();
+        if (recipientDevices.isEmpty()) {
+            return -1;
+        }
+
+        final var content = sendMessageResults.stream()
+                .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
+                .map(r -> r.getSuccess().getContent().get())
+                .findFirst()
+                .get();
+
+        return insert(recipientDevices, sentTimestamp, content, contentHint);
+    }
+
+    public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
+        final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
+        if (recipientDevice == null) {
+            return;
+        }
+
+        insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
+    }
+
+    public void addRecipientToExistingEntryIfPossible(
+            final long contentId, final List<SendMessageResult> sendMessageResults
+    ) {
+        final var recipientDevices = sendMessageResults.stream()
+                .map(this::getRecipientDevices)
+                .filter(Objects::nonNull)
+                .toList();
+        if (recipientDevices.isEmpty()) {
+            return;
+        }
+
+        insertRecipientsForExistingContent(contentId, recipientDevices);
+    }
+
+    public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
+        try (final var connection = database.getConnection()) {
+            try (final var statement = connection.prepareStatement(
+                    "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id = ?".formatted(
+                            TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+                statement.setLong(1, sentTimestamp);
+                statement.setBytes(2, groupId.serialize());
+                statement.executeUpdate();
+            }
+        } catch (SQLException e) {
+            logger.warn("Failed delete from message send log", e);
+        }
+    }
+
+    public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            try (final var statement = connection.prepareStatement(
+                    "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)".formatted(
+                            TABLE_MESSAGE_SEND_LOG_CONTENT,
+                            TABLE_MESSAGE_SEND_LOG))) {
+                statement.setLong(1, sentTimestamp);
+                statement.setLong(2, recipientId.id());
+                statement.executeUpdate();
+            }
+
+            deleteOrphanedLogContents(connection);
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn("Failed delete from message send log", e);
+        }
+    }
+
+    public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
+        deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
+    }
+
+    public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            try (final var statement = connection.prepareStatement(
+                    "DELETE FROM %s AS l WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?".formatted(
+                            TABLE_MESSAGE_SEND_LOG,
+                            TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+                for (final var sentTimestamp : sentTimestamps) {
+                    statement.setLong(1, sentTimestamp);
+                    statement.setLong(2, recipientId.id());
+                    statement.setInt(3, deviceId);
+                    statement.executeUpdate();
+                }
+            }
+
+            deleteOrphanedLogContents(connection);
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn("Failed delete from message send log", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        cleanupThread.interrupt();
+        try {
+            cleanupThread.join();
+        } catch (InterruptedException ignored) {
+        }
+    }
+
+    private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
+        if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
+            final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
+            return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
+        } else {
+            return null;
+        }
+    }
+
+    private long insert(
+            final List<RecipientDevices> recipientDevices,
+            final long sentTimestamp,
+            final SignalServiceProtos.Content content,
+            final ContentHint contentHint
+    ) {
+        byte[] groupId = getGroupId(content);
+
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            final long contentId;
+            try (final var statement = connection.prepareStatement(
+                    "INSERT INTO %s (timestamp, group_id, content, content_hint) VALUES (?,?,?,?)".formatted(
+                            TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+                statement.setLong(1, sentTimestamp);
+                statement.setBytes(2, groupId);
+                statement.setBytes(3, content.toByteArray());
+                statement.setInt(4, contentHint.getType());
+                statement.executeUpdate();
+                final var generatedKeys = statement.getGeneratedKeys();
+                if (generatedKeys.next()) {
+                    contentId = generatedKeys.getLong(1);
+                } else {
+                    contentId = -1;
+                }
+            }
+            if (contentId == -1) {
+                logger.warn("Failed to insert message send log content");
+                return -1;
+            }
+            insertRecipientsForExistingContent(contentId, recipientDevices, connection);
+
+            connection.commit();
+            return contentId;
+        } catch (SQLException e) {
+            logger.warn("Failed to insert into message send log", e);
+            return -1;
+        }
+    }
+
+    private byte[] getGroupId(final SignalServiceProtos.Content content) {
+        try {
+            return !content.hasDataMessage()
+                    ? null
+                    : content.getDataMessage().hasGroup()
+                            ? content.getDataMessage().getGroup().getId().toByteArray()
+                            : content.getDataMessage().hasGroupV2()
+                                    ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
+                                    .getGroupV2()
+                                    .getMasterKey()
+                                    .toByteArray())).serialize()
+                                    : null;
+        } catch (InvalidInputException e) {
+            logger.warn("Failed to parse groupId id from content");
+            return null;
+        }
+    }
+
+    private void insertRecipientsForExistingContent(
+            final long contentId, final List<RecipientDevices> recipientDevices
+    ) {
+        try (final var connection = database.getConnection()) {
+            connection.setAutoCommit(false);
+            insertRecipientsForExistingContent(contentId, recipientDevices, connection);
+            connection.commit();
+        } catch (SQLException e) {
+            logger.warn("Failed to append recipients to message send log", e);
+        }
+    }
+
+    private void insertRecipientsForExistingContent(
+            final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
+    ) throws SQLException {
+        try (final var statement = connection.prepareStatement(
+                "INSERT INTO %s (recipient_id, device_id, content_id) VALUES (?,?,?)".formatted(TABLE_MESSAGE_SEND_LOG))) {
+            for (final var recipientDevice : recipientDevices) {
+                for (final var deviceId : recipientDevice.deviceIds()) {
+                    statement.setLong(1, recipientDevice.recipientId().id());
+                    statement.setInt(2, deviceId);
+                    statement.setLong(3, contentId);
+                    statement.executeUpdate();
+                }
+            }
+        }
+    }
+
+    private void deleteOutdatedEntries(final Connection connection) throws SQLException {
+        try (final var statement = connection.prepareStatement("DELETE FROM %s WHERE timestamp < ?".formatted(
+                TABLE_MESSAGE_SEND_LOG_CONTENT))) {
+            statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
+            final var rowCount = statement.executeUpdate();
+            if (rowCount > 0) {
+                logger.debug("Removed {} outdated entries from the message send log", rowCount);
+            }
+        }
+    }
+
+    private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
+        try (final var statement = connection.prepareStatement(
+                "DELETE FROM %s WHERE _id NOT IN (SELECT content_id FROM %s)".formatted(TABLE_MESSAGE_SEND_LOG_CONTENT,
+                        TABLE_MESSAGE_SEND_LOG))) {
+            statement.executeUpdate();
+        }
+    }
+
+    private <T> Stream<T> executeQueryForStream(
+            PreparedStatement statement, ResultSetMapper<T> mapper
+    ) throws SQLException {
+        final var resultSet = statement.executeQuery();
+
+        return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
+            @Override
+            public boolean tryAdvance(final Consumer<? super T> consumer) {
+                try {
+                    if (!resultSet.next()) {
+                        return false;
+                    }
+                    consumer.accept(mapper.apply(resultSet));
+                    return true;
+                } catch (SQLException e) {
+                    logger.warn("Failed to read from database result", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }, false);
+    }
+
+    private interface ResultSetMapper<T> {
+
+        T apply(ResultSet resultSet) throws SQLException;
+    }
+
+    private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
+}
index 9cc6ba429df6df7fbe23bc00a9948bd213bd419e..a5947ef21bd5c46863c9e2ea97eb0e32e74f23fe 100644 (file)
@@ -164,6 +164,21 @@ public class SenderKeySharedStore {
         }
     }
 
+    public void deleteSharedWith(
+            final RecipientId recipientId, final int deviceId, final DistributionId distributionId
+    ) {
+        synchronized (sharedSenderKeys) {
+            final var entries = sharedSenderKeys.getOrDefault(distributionId.asUuid(), Set.of());
+
+            sharedSenderKeys.put(distributionId.asUuid(), new HashSet<>(entries) {
+                {
+                    remove(new SenderKeySharedEntry(recipientId, deviceId));
+                }
+            });
+            saveLocked();
+        }
+    }
+
     public void deleteAllFor(final DistributionId distributionId) {
         synchronized (sharedSenderKeys) {
             if (sharedSenderKeys.remove(distributionId.asUuid()) != null) {
index 8674945cf4dc35648fbd0f4d5484bfc18e4a925d..5318b3f2d8cb0d5483829b0334016d2a447cb942 100644 (file)
@@ -71,6 +71,10 @@ public class SenderKeyStore implements SignalServiceSenderKeyStore {
         senderKeySharedStore.deleteAllFor(recipientId);
     }
 
+    public void deleteSharedWith(RecipientId recipientId, int deviceId, DistributionId distributionId) {
+        senderKeySharedStore.deleteSharedWith(recipientId, deviceId, distributionId);
+    }
+
     public void deleteOurKey(RecipientId selfRecipientId, DistributionId distributionId) {
         senderKeySharedStore.deleteAllFor(distributionId);
         senderKeyRecordStore.deleteSenderKey(selfRecipientId, distributionId.asUuid());