1 package org
.asamk
.signal
.manager
.storage
.sendLog
;
3 import org
.asamk
.signal
.manager
.groups
.GroupId
;
4 import org
.asamk
.signal
.manager
.groups
.GroupUtils
;
5 import org
.asamk
.signal
.manager
.storage
.Database
;
6 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientId
;
7 import org
.asamk
.signal
.manager
.storage
.recipients
.RecipientResolver
;
8 import org
.signal
.libsignal
.zkgroup
.InvalidInputException
;
9 import org
.signal
.libsignal
.zkgroup
.groups
.GroupMasterKey
;
10 import org
.slf4j
.Logger
;
11 import org
.slf4j
.LoggerFactory
;
12 import org
.whispersystems
.signalservice
.api
.crypto
.ContentHint
;
13 import org
.whispersystems
.signalservice
.api
.messages
.SendMessageResult
;
14 import org
.whispersystems
.signalservice
.internal
.push
.SignalServiceProtos
;
16 import java
.io
.IOException
;
17 import java
.sql
.Connection
;
18 import java
.sql
.PreparedStatement
;
19 import java
.sql
.ResultSet
;
20 import java
.sql
.SQLException
;
21 import java
.time
.Duration
;
22 import java
.util
.List
;
23 import java
.util
.Objects
;
24 import java
.util
.Optional
;
25 import java
.util
.Spliterator
;
26 import java
.util
.Spliterators
;
27 import java
.util
.function
.Consumer
;
28 import java
.util
.stream
.Stream
;
29 import java
.util
.stream
.StreamSupport
;
31 public class MessageSendLogStore
implements AutoCloseable
{
/** Logger shared by all instances of this store. */
private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);

/** Table with one row per (content, recipient, device) combination. */
private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
/** Table holding the serialized message content together with its timestamp and content hint. */
private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";

/** Age after which content rows are considered outdated and get deleted. */
private static final Duration LOG_DURATION = Duration.ofDays(1);

/** Resolves the address of a send result to a recipient id. */
private final RecipientResolver recipientResolver;
/** Source of the JDBC connections used by every query in this class. */
private final Database database;
/** Background thread that periodically deletes outdated entries; interrupted in close(). */
private final Thread cleanupThread;
/**
 * Creates the store and immediately starts a daemon thread named "msl-cleanup".
 * <p>
 * The thread deletes outdated entries, sleeps for one hour and repeats until it is
 * interrupted (see {@link #close()}). If deleting fails with an {@link SQLException}
 * the failure is logged and the thread stops.
 *
 * @param recipientResolver used to map send results to recipient ids
 * @param database          provides the connections for all queries of this store
 */
public MessageSendLogStore(
        final RecipientResolver recipientResolver, final Database database
) {
    this.recipientResolver = recipientResolver;
    this.database = database;
    this.cleanupThread = new Thread(() -> {
        try {
            final var interval = Duration.ofHours(1).toMillis();
            while (!Thread.interrupted()) {
                try (final var connection = database.getConnection()) {
                    deleteOutdatedEntries(connection);
                } catch (SQLException e) {
                    // Pass the exception along so the cause of the failure shows up in the log.
                    logger.warn("Deleting outdated entries failed", e);
                    break;
                }
                Thread.sleep(interval);
            }
        } catch (InterruptedException e) {
            // Interruption is the regular shutdown signal, nothing to recover.
            logger.debug("Stopping msl cleanup thread");
        }
    });
    cleanupThread.setName("msl-cleanup");
    cleanupThread.setDaemon(true);
    cleanupThread.start();
}
70 public static void createSql(Connection connection
) throws SQLException
{
71 try (final var statement
= connection
.createStatement()) {
72 statement
.executeUpdate("""
73 CREATE TABLE message_send_log (
74 _id INTEGER PRIMARY KEY,
75 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
76 recipient_id INTEGER NOT NULL,
77 device_id INTEGER NOT NULL
79 CREATE TABLE message_send_log_content (
80 _id INTEGER PRIMARY KEY,
82 timestamp INTEGER NOT NULL,
83 content BLOB NOT NULL,
84 content_hint INTEGER NOT NULL
86 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
87 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
88 CREATE INDEX msl_content_index ON message_send_log (content_id);
93 public List
<MessageSendLogEntry
> findMessages(
94 final RecipientId recipientId
, final int deviceId
, final long timestamp
, final boolean isSenderKey
97 SELECT group_id, content, content_hint
99 INNER JOIN %s lc ON l.content_id = lc._id
100 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
101 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
102 try (final var connection
= database
.getConnection()) {
103 deleteOutdatedEntries(connection
);
105 try (final var statement
= connection
.prepareStatement(sql
)) {
106 statement
.setLong(1, recipientId
.id());
107 statement
.setInt(2, deviceId
);
108 statement
.setLong(3, timestamp
);
109 try (var result
= executeQueryForStream(statement
, resultSet
-> {
110 final var groupId
= Optional
.ofNullable(resultSet
.getBytes("group_id"))
111 .map(GroupId
::unknownVersion
);
112 final SignalServiceProtos
.Content content
;
114 content
= SignalServiceProtos
.Content
.parseFrom(resultSet
.getBinaryStream("content"));
115 } catch (IOException e
) {
116 logger
.warn("Failed to parse content from message send log", e
);
119 final var contentHint
= ContentHint
.fromType(resultSet
.getInt("content_hint"));
120 final var urgent
= true; // TODO
121 return new MessageSendLogEntry(groupId
, content
, contentHint
, urgent
);
123 return result
.filter(Objects
::nonNull
)
124 .filter(e
-> !isSenderKey
|| e
.groupId().isPresent())
128 } catch (SQLException e
) {
129 logger
.warn("Failed read from message send log", e
);
134 public long insertIfPossible(
135 long sentTimestamp
, SendMessageResult sendMessageResult
, ContentHint contentHint
, boolean urgent
137 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
138 if (recipientDevice
== null) {
142 return insert(List
.of(recipientDevice
),
144 sendMessageResult
.getSuccess().getContent().get(),
149 public long insertIfPossible(
150 long sentTimestamp
, List
<SendMessageResult
> sendMessageResults
, ContentHint contentHint
, boolean urgent
152 final var recipientDevices
= sendMessageResults
.stream()
153 .map(this::getRecipientDevices
)
154 .filter(Objects
::nonNull
)
156 if (recipientDevices
.isEmpty()) {
160 final var content
= sendMessageResults
.stream()
161 .filter(r
-> r
.isSuccess() && r
.getSuccess().getContent().isPresent())
162 .map(r
-> r
.getSuccess().getContent().get())
166 return insert(recipientDevices
, sentTimestamp
, content
, contentHint
, urgent
);
169 public void addRecipientToExistingEntryIfPossible(final long contentId
, final SendMessageResult sendMessageResult
) {
170 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
171 if (recipientDevice
== null) {
175 insertRecipientsForExistingContent(contentId
, List
.of(recipientDevice
));
178 public void addRecipientToExistingEntryIfPossible(
179 final long contentId
, final List
<SendMessageResult
> sendMessageResults
181 final var recipientDevices
= sendMessageResults
.stream()
182 .map(this::getRecipientDevices
)
183 .filter(Objects
::nonNull
)
185 if (recipientDevices
.isEmpty()) {
189 insertRecipientsForExistingContent(contentId
, recipientDevices
);
192 public void deleteEntryForGroup(long sentTimestamp
, GroupId groupId
) {
195 WHERE lc.timestamp = ? AND lc.group_id = ?
196 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
197 try (final var connection
= database
.getConnection()) {
198 try (final var statement
= connection
.prepareStatement(sql
)) {
199 statement
.setLong(1, sentTimestamp
);
200 statement
.setBytes(2, groupId
.serialize());
201 statement
.executeUpdate();
203 } catch (SQLException e
) {
204 logger
.warn("Failed delete from message send log", e
);
208 public void deleteEntryForRecipientNonGroup(long sentTimestamp
, RecipientId recipientId
) {
211 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
212 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
213 try (final var connection
= database
.getConnection()) {
214 connection
.setAutoCommit(false);
215 try (final var statement
= connection
.prepareStatement(sql
)) {
216 statement
.setLong(1, sentTimestamp
);
217 statement
.setLong(2, recipientId
.id());
218 statement
.executeUpdate();
221 deleteOrphanedLogContents(connection
);
223 } catch (SQLException e
) {
224 logger
.warn("Failed delete from message send log", e
);
228 public void deleteEntryForRecipient(long sentTimestamp
, RecipientId recipientId
, int deviceId
) {
229 deleteEntriesForRecipient(List
.of(sentTimestamp
), recipientId
, deviceId
);
232 public void deleteEntriesForRecipient(List
<Long
> sentTimestamps
, RecipientId recipientId
, int deviceId
) {
235 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
236 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
237 try (final var connection
= database
.getConnection()) {
238 connection
.setAutoCommit(false);
239 try (final var statement
= connection
.prepareStatement(sql
)) {
240 for (final var sentTimestamp
: sentTimestamps
) {
241 statement
.setLong(1, sentTimestamp
);
242 statement
.setLong(2, recipientId
.id());
243 statement
.setInt(3, deviceId
);
244 statement
.executeUpdate();
248 deleteOrphanedLogContents(connection
);
250 } catch (SQLException e
) {
251 logger
.warn("Failed delete from message send log", e
);
256 public void close() {
257 cleanupThread
.interrupt();
259 cleanupThread
.join();
260 } catch (InterruptedException ignored
) {
264 private RecipientDevices
getRecipientDevices(final SendMessageResult sendMessageResult
) {
265 if (sendMessageResult
.isSuccess() && sendMessageResult
.getSuccess().getContent().isPresent()) {
266 final var recipientId
= recipientResolver
.resolveRecipient(sendMessageResult
.getAddress());
267 return new RecipientDevices(recipientId
, sendMessageResult
.getSuccess().getDevices());
274 final List
<RecipientDevices
> recipientDevices
,
275 final long sentTimestamp
,
276 final SignalServiceProtos
.Content content
,
277 final ContentHint contentHint
,
280 byte[] groupId
= getGroupId(content
);
284 INSERT INTO %s (timestamp, group_id, content, content_hint)
286 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
287 try (final var connection
= database
.getConnection()) {
288 connection
.setAutoCommit(false);
289 final long contentId
;
290 try (final var statement
= connection
.prepareStatement(sql
)) {
291 statement
.setLong(1, sentTimestamp
);
292 statement
.setBytes(2, groupId
);
293 statement
.setBytes(3, content
.toByteArray());
294 statement
.setInt(4, contentHint
.getType());
295 statement
.executeUpdate();
296 final var generatedKeys
= statement
.getGeneratedKeys();
297 if (generatedKeys
.next()) {
298 contentId
= generatedKeys
.getLong(1);
303 if (contentId
== -1) {
304 logger
.warn("Failed to insert message send log content");
307 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
311 } catch (SQLException e
) {
312 logger
.warn("Failed to insert into message send log", e
);
317 private byte[] getGroupId(final SignalServiceProtos
.Content content
) {
319 return !content
.hasDataMessage()
321 : content
.getDataMessage().hasGroup()
322 ? content
.getDataMessage().getGroup().getId().toByteArray()
323 : content
.getDataMessage().hasGroupV2()
324 ? GroupUtils
.getGroupIdV2(new GroupMasterKey(content
.getDataMessage()
327 .toByteArray())).serialize()
329 } catch (InvalidInputException e
) {
330 logger
.warn("Failed to parse groupId id from content");
335 private void insertRecipientsForExistingContent(
336 final long contentId
, final List
<RecipientDevices
> recipientDevices
338 try (final var connection
= database
.getConnection()) {
339 connection
.setAutoCommit(false);
340 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
342 } catch (SQLException e
) {
343 logger
.warn("Failed to append recipients to message send log", e
);
347 private void insertRecipientsForExistingContent(
348 final long contentId
, final List
<RecipientDevices
> recipientDevices
, final Connection connection
349 ) throws SQLException
{
351 INSERT INTO %s (recipient_id, device_id, content_id)
353 """.formatted(TABLE_MESSAGE_SEND_LOG
);
354 try (final var statement
= connection
.prepareStatement(sql
)) {
355 for (final var recipientDevice
: recipientDevices
) {
356 for (final var deviceId
: recipientDevice
.deviceIds()) {
357 statement
.setLong(1, recipientDevice
.recipientId().id());
358 statement
.setInt(2, deviceId
);
359 statement
.setLong(3, contentId
);
360 statement
.executeUpdate();
366 private void deleteOutdatedEntries(final Connection connection
) throws SQLException
{
370 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
371 try (final var statement
= connection
.prepareStatement(sql
)) {
372 statement
.setLong(1, System
.currentTimeMillis() - LOG_DURATION
.toMillis());
373 final var rowCount
= statement
.executeUpdate();
375 logger
.debug("Removed {} outdated entries from the message send log", rowCount
);
377 logger
.trace("No outdated entries to be removed from message send log.");
382 private void deleteOrphanedLogContents(final Connection connection
) throws SQLException
{
385 WHERE _id NOT IN (SELECT content_id FROM %s)
386 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
387 try (final var statement
= connection
.prepareStatement(sql
)) {
388 statement
.executeUpdate();
392 private <T
> Stream
<T
> executeQueryForStream(
393 PreparedStatement statement
, ResultSetMapper
<T
> mapper
394 ) throws SQLException
{
395 final var resultSet
= statement
.executeQuery();
397 return StreamSupport
.stream(new Spliterators
.AbstractSpliterator
<>(Long
.MAX_VALUE
, Spliterator
.ORDERED
) {
399 public boolean tryAdvance(final Consumer
<?
super T
> consumer
) {
401 if (!resultSet
.next()) {
404 consumer
.accept(mapper
.apply(resultSet
));
406 } catch (SQLException e
) {
407 logger
.warn("Failed to read from database result", e
);
408 throw new RuntimeException(e
);
414 private interface ResultSetMapper
<T
> {
416 T
apply(ResultSet resultSet
) throws SQLException
;
/**
 * A recipient together with the ids of the devices a message was sent to.
 *
 * @param recipientId id of the recipient
 * @param deviceIds   ids of the recipient's devices
 */
private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}