1 package org
.asamk
.signal
.manager
.storage
.sendLog
;
3 import org
.asamk
.signal
.manager
.groups
.GroupId
;
4 import org
.asamk
.signal
.manager
.groups
.GroupUtils
;
5 import org
.asamk
.signal
.manager
.storage
.Database
;
6 import org
.asamk
.signal
.manager
.storage
.Utils
;
7 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientId
;
8 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientResolver
;
9 import org
.signal
.libsignal
.zkgroup
.InvalidInputException
;
10 import org
.signal
.libsignal
.zkgroup
.groups
.GroupMasterKey
;
11 import org
.slf4j
.Logger
;
12 import org
.slf4j
.LoggerFactory
;
13 import org
.whispersystems
.signalservice
.api
.crypto
.ContentHint
;
14 import org
.whispersystems
.signalservice
.api
.messages
.SendMessageResult
;
15 import org
.whispersystems
.signalservice
.internal
.push
.SignalServiceProtos
;
17 import java
.io
.IOException
;
18 import java
.sql
.Connection
;
19 import java
.sql
.ResultSet
;
20 import java
.sql
.SQLException
;
21 import java
.time
.Duration
;
22 import java
.util
.List
;
23 import java
.util
.Objects
;
24 import java
.util
.Optional
;
26 public class MessageSendLogStore
implements AutoCloseable
{
28 private static final Logger logger
= LoggerFactory
.getLogger(MessageSendLogStore
.class);
30 private static final String TABLE_MESSAGE_SEND_LOG
= "message_send_log";
31 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT
= "message_send_log_content";
33 private static final Duration LOG_DURATION
= Duration
.ofDays(1);
35 private final RecipientResolver recipientResolver
;
36 private final Database database
;
37 private final Thread cleanupThread
;
/**
 * Creates the store and starts the {@code msl-cleanup} daemon thread.
 *
 * <p>The thread deletes outdated entries once per hour. It stops when it is
 * interrupted (see {@link #close()}) or when a cleanup run fails with an
 * {@link SQLException}.
 *
 * @param recipientResolver resolves addresses of send results to recipient ids
 * @param database          provides the connections used for all queries
 */
public MessageSendLogStore(
        final RecipientResolver recipientResolver, final Database database
) {
    this.recipientResolver = recipientResolver;
    this.database = database;
    this.cleanupThread = new Thread(() -> {
        try {
            final var interval = Duration.ofHours(1).toMillis();
            while (!Thread.interrupted()) {
                try (final var connection = database.getConnection()) {
                    deleteOutdatedEntries(connection);
                } catch (SQLException e) {
                    // Log the cause as well; without it the failure cannot be diagnosed.
                    logger.warn("Deleting outdated entries failed", e);
                    // A failing database ends the periodic cleanup.
                    break;
                }
                Thread.sleep(interval);
            }
        } catch (InterruptedException e) {
            // Interruption during sleep is the regular shutdown signal.
            logger.debug("Stopping msl cleanup thread");
        }
    });
    cleanupThread.setName("msl-cleanup");
    cleanupThread.setDaemon(true);
    cleanupThread.start();
}
65 public static void createSql(Connection connection
) throws SQLException
{
66 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
67 try (final var statement
= connection
.createStatement()) {
68 statement
.executeUpdate("""
69 CREATE TABLE message_send_log (
70 _id INTEGER PRIMARY KEY,
71 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
72 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
73 device_id INTEGER NOT NULL
75 CREATE TABLE message_send_log_content (
76 _id INTEGER PRIMARY KEY,
78 timestamp INTEGER NOT NULL,
79 content BLOB NOT NULL,
80 content_hint INTEGER NOT NULL,
81 urgent BOOLEAN NOT NULL
83 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
84 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
85 CREATE INDEX msl_content_index ON message_send_log (content_id);
90 public List
<MessageSendLogEntry
> findMessages(
91 final RecipientId recipientId
, final int deviceId
, final long timestamp
, final boolean isSenderKey
94 SELECT group_id, content, content_hint
96 INNER JOIN %s lc ON l.content_id = lc._id
97 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
98 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
99 try (final var connection
= database
.getConnection()) {
100 deleteOutdatedEntries(connection
);
102 try (final var statement
= connection
.prepareStatement(sql
)) {
103 statement
.setLong(1, recipientId
.id());
104 statement
.setInt(2, deviceId
);
105 statement
.setLong(3, timestamp
);
106 try (var result
= Utils
.executeQueryForStream(statement
, this::getMessageSendLogEntryFromResultSet
)) {
107 return result
.filter(Objects
::nonNull
)
108 .filter(e
-> !isSenderKey
|| e
.groupId().isPresent())
112 } catch (SQLException e
) {
113 logger
.warn("Failed read from message send log", e
);
118 public long insertIfPossible(
119 long sentTimestamp
, SendMessageResult sendMessageResult
, ContentHint contentHint
, boolean urgent
121 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
122 if (recipientDevice
== null) {
126 return insert(List
.of(recipientDevice
),
128 sendMessageResult
.getSuccess().getContent().get(),
133 public long insertIfPossible(
134 long sentTimestamp
, List
<SendMessageResult
> sendMessageResults
, ContentHint contentHint
, boolean urgent
136 final var recipientDevices
= sendMessageResults
.stream()
137 .map(this::getRecipientDevices
)
138 .filter(Objects
::nonNull
)
140 if (recipientDevices
.isEmpty()) {
144 final var content
= sendMessageResults
.stream()
145 .filter(r
-> r
.isSuccess() && r
.getSuccess().getContent().isPresent())
146 .map(r
-> r
.getSuccess().getContent().get())
150 return insert(recipientDevices
, sentTimestamp
, content
, contentHint
, urgent
);
153 public void addRecipientToExistingEntryIfPossible(final long contentId
, final SendMessageResult sendMessageResult
) {
154 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
155 if (recipientDevice
== null) {
159 insertRecipientsForExistingContent(contentId
, List
.of(recipientDevice
));
162 public void addRecipientToExistingEntryIfPossible(
163 final long contentId
, final List
<SendMessageResult
> sendMessageResults
165 final var recipientDevices
= sendMessageResults
.stream()
166 .map(this::getRecipientDevices
)
167 .filter(Objects
::nonNull
)
169 if (recipientDevices
.isEmpty()) {
173 insertRecipientsForExistingContent(contentId
, recipientDevices
);
176 public void deleteEntryForGroup(long sentTimestamp
, GroupId groupId
) {
179 WHERE lc.timestamp = ? AND lc.group_id = ?
180 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
181 try (final var connection
= database
.getConnection()) {
182 try (final var statement
= connection
.prepareStatement(sql
)) {
183 statement
.setLong(1, sentTimestamp
);
184 statement
.setBytes(2, groupId
.serialize());
185 statement
.executeUpdate();
187 } catch (SQLException e
) {
188 logger
.warn("Failed delete from message send log", e
);
192 public void deleteEntryForRecipientNonGroup(long sentTimestamp
, RecipientId recipientId
) {
195 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
196 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
197 try (final var connection
= database
.getConnection()) {
198 connection
.setAutoCommit(false);
199 try (final var statement
= connection
.prepareStatement(sql
)) {
200 statement
.setLong(1, sentTimestamp
);
201 statement
.setLong(2, recipientId
.id());
202 statement
.executeUpdate();
205 deleteOrphanedLogContents(connection
);
207 } catch (SQLException e
) {
208 logger
.warn("Failed delete from message send log", e
);
212 public void deleteEntryForRecipient(long sentTimestamp
, RecipientId recipientId
, int deviceId
) {
213 deleteEntriesForRecipient(List
.of(sentTimestamp
), recipientId
, deviceId
);
216 public void deleteEntriesForRecipient(List
<Long
> sentTimestamps
, RecipientId recipientId
, int deviceId
) {
219 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
220 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
221 try (final var connection
= database
.getConnection()) {
222 connection
.setAutoCommit(false);
223 try (final var statement
= connection
.prepareStatement(sql
)) {
224 for (final var sentTimestamp
: sentTimestamps
) {
225 statement
.setLong(1, sentTimestamp
);
226 statement
.setLong(2, recipientId
.id());
227 statement
.setInt(3, deviceId
);
228 statement
.executeUpdate();
232 deleteOrphanedLogContents(connection
);
234 } catch (SQLException e
) {
235 logger
.warn("Failed delete from message send log", e
);
240 public void close() {
241 cleanupThread
.interrupt();
243 cleanupThread
.join();
244 } catch (InterruptedException ignored
) {
248 private RecipientDevices
getRecipientDevices(final SendMessageResult sendMessageResult
) {
249 if (sendMessageResult
.isSuccess() && sendMessageResult
.getSuccess().getContent().isPresent()) {
250 final var recipientId
= recipientResolver
.resolveRecipient(sendMessageResult
.getAddress());
251 return new RecipientDevices(recipientId
, sendMessageResult
.getSuccess().getDevices());
258 final List
<RecipientDevices
> recipientDevices
,
259 final long sentTimestamp
,
260 final SignalServiceProtos
.Content content
,
261 final ContentHint contentHint
,
264 byte[] groupId
= getGroupId(content
);
267 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
269 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
270 try (final var connection
= database
.getConnection()) {
271 connection
.setAutoCommit(false);
272 final long contentId
;
273 try (final var statement
= connection
.prepareStatement(sql
)) {
274 statement
.setLong(1, sentTimestamp
);
275 statement
.setBytes(2, groupId
);
276 statement
.setBytes(3, content
.toByteArray());
277 statement
.setInt(4, contentHint
.getType());
278 statement
.setBoolean(5, urgent
);
279 statement
.executeUpdate();
280 final var generatedKeys
= statement
.getGeneratedKeys();
281 if (generatedKeys
.next()) {
282 contentId
= generatedKeys
.getLong(1);
287 if (contentId
== -1) {
288 logger
.warn("Failed to insert message send log content");
291 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
295 } catch (SQLException e
) {
296 logger
.warn("Failed to insert into message send log", e
);
301 private byte[] getGroupId(final SignalServiceProtos
.Content content
) {
303 return !content
.hasDataMessage()
305 : content
.getDataMessage().hasGroup()
306 ? content
.getDataMessage().getGroup().getId().toByteArray()
307 : content
.getDataMessage().hasGroupV2()
308 ? GroupUtils
.getGroupIdV2(new GroupMasterKey(content
.getDataMessage()
311 .toByteArray())).serialize()
313 } catch (InvalidInputException e
) {
314 logger
.warn("Failed to parse groupId id from content");
319 private void insertRecipientsForExistingContent(
320 final long contentId
, final List
<RecipientDevices
> recipientDevices
322 try (final var connection
= database
.getConnection()) {
323 connection
.setAutoCommit(false);
324 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
326 } catch (SQLException e
) {
327 logger
.warn("Failed to append recipients to message send log", e
);
331 private void insertRecipientsForExistingContent(
332 final long contentId
, final List
<RecipientDevices
> recipientDevices
, final Connection connection
333 ) throws SQLException
{
335 INSERT INTO %s (recipient_id, device_id, content_id)
337 """.formatted(TABLE_MESSAGE_SEND_LOG
);
338 try (final var statement
= connection
.prepareStatement(sql
)) {
339 for (final var recipientDevice
: recipientDevices
) {
340 for (final var deviceId
: recipientDevice
.deviceIds()) {
341 statement
.setLong(1, recipientDevice
.recipientId().id());
342 statement
.setInt(2, deviceId
);
343 statement
.setLong(3, contentId
);
344 statement
.executeUpdate();
350 private void deleteOutdatedEntries(final Connection connection
) throws SQLException
{
354 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
355 try (final var statement
= connection
.prepareStatement(sql
)) {
356 statement
.setLong(1, System
.currentTimeMillis() - LOG_DURATION
.toMillis());
357 final var rowCount
= statement
.executeUpdate();
359 logger
.debug("Removed {} outdated entries from the message send log", rowCount
);
361 logger
.trace("No outdated entries to be removed from message send log.");
366 private void deleteOrphanedLogContents(final Connection connection
) throws SQLException
{
369 WHERE _id NOT IN (SELECT content_id FROM %s)
370 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
371 try (final var statement
= connection
.prepareStatement(sql
)) {
372 statement
.executeUpdate();
376 private MessageSendLogEntry
getMessageSendLogEntryFromResultSet(ResultSet resultSet
) throws SQLException
{
377 final var groupId
= Optional
.ofNullable(resultSet
.getBytes("group_id")).map(GroupId
::unknownVersion
);
378 final SignalServiceProtos
.Content content
;
380 content
= SignalServiceProtos
.Content
.parseFrom(resultSet
.getBinaryStream("content"));
381 } catch (IOException e
) {
382 logger
.warn("Failed to parse content from message send log", e
);
385 final var contentHint
= ContentHint
.fromType(resultSet
.getInt("content_hint"));
386 final var urgent
= resultSet
.getBoolean("urgent");
387 return new MessageSendLogEntry(groupId
, content
, contentHint
, urgent
);
390 private record RecipientDevices(RecipientId recipientId
, List
<Integer
> deviceIds
) {}