1 package org
.asamk
.signal
.manager
.storage
.sendLog
;
3 import org
.asamk
.signal
.manager
.api
.GroupId
;
4 import org
.asamk
.signal
.manager
.groups
.GroupUtils
;
5 import org
.asamk
.signal
.manager
.storage
.Database
;
6 import org
.asamk
.signal
.manager
.storage
.Utils
;
7 import org
.signal
.libsignal
.zkgroup
.InvalidInputException
;
8 import org
.signal
.libsignal
.zkgroup
.groups
.GroupMasterKey
;
9 import org
.slf4j
.Logger
;
10 import org
.slf4j
.LoggerFactory
;
11 import org
.whispersystems
.signalservice
.api
.crypto
.ContentHint
;
12 import org
.whispersystems
.signalservice
.api
.messages
.SendMessageResult
;
13 import org
.whispersystems
.signalservice
.api
.push
.ServiceId
;
14 import org
.whispersystems
.signalservice
.internal
.push
.Content
;
16 import java
.io
.IOException
;
17 import java
.sql
.Connection
;
18 import java
.sql
.ResultSet
;
19 import java
.sql
.SQLException
;
20 import java
.time
.Duration
;
21 import java
.util
.List
;
22 import java
.util
.Objects
;
23 import java
.util
.Optional
;
25 public class MessageSendLogStore
implements AutoCloseable
{
27 private static final Logger logger
= LoggerFactory
.getLogger(MessageSendLogStore
.class);
29 private static final String TABLE_MESSAGE_SEND_LOG
= "message_send_log";
30 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT
= "message_send_log_content";
32 private static final Duration LOG_DURATION
= Duration
.ofDays(1);
34 private final Database database
;
35 private final Thread cleanupThread
;
36 private final boolean sendLogDisabled
;
38 public MessageSendLogStore(final Database database
, final boolean disableMessageSendLog
) {
39 this.database
= database
;
40 this.sendLogDisabled
= disableMessageSendLog
;
41 this.cleanupThread
= Thread
.ofPlatform().name("msl-cleanup").daemon().start(() -> {
43 final var interval
= Duration
.ofHours(1).toMillis();
44 while (!Thread
.interrupted()) {
45 try (final var connection
= database
.getConnection()) {
46 deleteOutdatedEntries(connection
);
47 } catch (SQLException e
) {
48 logger
.debug("MSL", e
);
49 logger
.warn("Deleting outdated entries failed");
52 Thread
.sleep(interval
);
54 } catch (InterruptedException e
) {
55 logger
.debug("Stopping msl cleanup thread");
60 public static void createSql(Connection connection
) throws SQLException
{
61 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
62 try (final var statement
= connection
.createStatement()) {
63 statement
.executeUpdate("""
64 CREATE TABLE message_send_log (
65 _id INTEGER PRIMARY KEY,
66 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
67 address TEXT NOT NULL,
68 device_id INTEGER NOT NULL
70 CREATE TABLE message_send_log_content (
71 _id INTEGER PRIMARY KEY,
73 timestamp INTEGER NOT NULL,
74 content BLOB NOT NULL,
75 content_hint INTEGER NOT NULL,
76 urgent INTEGER NOT NULL
78 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
79 CREATE INDEX msl_recipient_index ON message_send_log (address, device_id, content_id);
80 CREATE INDEX msl_content_index ON message_send_log (content_id);
85 public List
<MessageSendLogEntry
> findMessages(
86 final ServiceId serviceId
, final int deviceId
, final long timestamp
, final boolean isSenderKey
89 SELECT group_id, content, content_hint, urgent
91 INNER JOIN %s lc ON l.content_id = lc._id
92 WHERE l.address = ? AND l.device_id = ? AND lc.timestamp = ?
93 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
94 try (final var connection
= database
.getConnection()) {
95 deleteOutdatedEntries(connection
);
97 try (final var statement
= connection
.prepareStatement(sql
)) {
98 statement
.setString(1, serviceId
.toString());
99 statement
.setInt(2, deviceId
);
100 statement
.setLong(3, timestamp
);
101 try (var result
= Utils
.executeQueryForStream(statement
, this::getMessageSendLogEntryFromResultSet
)) {
102 return result
.filter(Objects
::nonNull
)
103 .filter(e
-> !isSenderKey
|| e
.groupId().isPresent())
107 } catch (SQLException e
) {
108 logger
.warn("Failed read from message send log", e
);
113 public long insertIfPossible(
114 long sentTimestamp
, SendMessageResult sendMessageResult
, ContentHint contentHint
, boolean urgent
116 if (sendLogDisabled
) {
119 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
120 if (recipientDevice
== null) {
124 return insert(List
.of(recipientDevice
),
126 sendMessageResult
.getSuccess().getContent().get(),
131 public long insertIfPossible(
132 long sentTimestamp
, List
<SendMessageResult
> sendMessageResults
, ContentHint contentHint
, boolean urgent
134 if (sendLogDisabled
) {
137 final var recipientDevices
= sendMessageResults
.stream()
138 .map(this::getRecipientDevices
)
139 .filter(Objects
::nonNull
)
141 if (recipientDevices
.isEmpty()) {
145 final var content
= sendMessageResults
.stream()
146 .filter(r
-> r
.isSuccess() && r
.getSuccess().getContent().isPresent())
147 .map(r
-> r
.getSuccess().getContent().get())
151 return insert(recipientDevices
, sentTimestamp
, content
, contentHint
, urgent
);
154 public void addRecipientToExistingEntryIfPossible(final long contentId
, final SendMessageResult sendMessageResult
) {
155 if (sendLogDisabled
) {
158 final RecipientDevices recipientDevice
= getRecipientDevices(sendMessageResult
);
159 if (recipientDevice
== null) {
163 insertRecipientsForExistingContent(contentId
, List
.of(recipientDevice
));
166 public void addRecipientToExistingEntryIfPossible(
167 final long contentId
, final List
<SendMessageResult
> sendMessageResults
169 if (sendLogDisabled
) {
172 final var recipientDevices
= sendMessageResults
.stream()
173 .map(this::getRecipientDevices
)
174 .filter(Objects
::nonNull
)
176 if (recipientDevices
.isEmpty()) {
180 insertRecipientsForExistingContent(contentId
, recipientDevices
);
183 public void deleteEntryForGroup(long sentTimestamp
, GroupId groupId
) {
186 WHERE lc.timestamp = ? AND lc.group_id = ?
187 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
188 try (final var connection
= database
.getConnection()) {
189 try (final var statement
= connection
.prepareStatement(sql
)) {
190 statement
.setLong(1, sentTimestamp
);
191 statement
.setBytes(2, groupId
.serialize());
192 statement
.executeUpdate();
194 } catch (SQLException e
) {
195 logger
.warn("Failed delete from message send log", e
);
199 public void deleteEntryForRecipientNonGroup(long sentTimestamp
, ServiceId serviceId
) {
202 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.address = ?)
203 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
204 try (final var connection
= database
.getConnection()) {
205 connection
.setAutoCommit(false);
206 try (final var statement
= connection
.prepareStatement(sql
)) {
207 statement
.setLong(1, sentTimestamp
);
208 statement
.setString(2, serviceId
.toString());
209 statement
.executeUpdate();
212 deleteOrphanedLogContents(connection
);
214 } catch (SQLException e
) {
215 logger
.warn("Failed delete from message send log", e
);
219 public void deleteEntryForRecipient(long sentTimestamp
, ServiceId serviceId
, int deviceId
) {
220 deleteEntriesForRecipient(List
.of(sentTimestamp
), serviceId
, deviceId
);
223 public void deleteEntriesForRecipient(List
<Long
> sentTimestamps
, ServiceId serviceId
, int deviceId
) {
226 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.address = ? AND l.device_id = ?
227 """.formatted(TABLE_MESSAGE_SEND_LOG
, TABLE_MESSAGE_SEND_LOG_CONTENT
);
228 try (final var connection
= database
.getConnection()) {
229 connection
.setAutoCommit(false);
230 try (final var statement
= connection
.prepareStatement(sql
)) {
231 for (final var sentTimestamp
: sentTimestamps
) {
232 statement
.setLong(1, sentTimestamp
);
233 statement
.setString(2, serviceId
.toString());
234 statement
.setInt(3, deviceId
);
235 statement
.executeUpdate();
239 deleteOrphanedLogContents(connection
);
241 } catch (SQLException e
) {
242 logger
.warn("Failed delete from message send log", e
);
247 public void close() {
248 cleanupThread
.interrupt();
250 cleanupThread
.join();
251 } catch (InterruptedException ignored
) {
255 private RecipientDevices
getRecipientDevices(final SendMessageResult sendMessageResult
) {
256 if (sendMessageResult
.isSuccess() && sendMessageResult
.getSuccess().getContent().isPresent()) {
257 final var serviceId
= sendMessageResult
.getAddress().getServiceId();
258 return new RecipientDevices(serviceId
, sendMessageResult
.getSuccess().getDevices());
265 final List
<RecipientDevices
> recipientDevices
,
266 final long sentTimestamp
,
267 final Content content
,
268 final ContentHint contentHint
,
271 byte[] groupId
= getGroupId(content
);
274 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
277 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
278 try (final var connection
= database
.getConnection()) {
279 connection
.setAutoCommit(false);
280 final long contentId
;
281 try (final var statement
= connection
.prepareStatement(sql
)) {
282 statement
.setLong(1, sentTimestamp
);
283 statement
.setBytes(2, groupId
);
284 statement
.setBytes(3, content
.encode());
285 statement
.setInt(4, contentHint
.getType());
286 statement
.setBoolean(5, urgent
);
287 final var generatedKey
= Utils
.executeQueryForOptional(statement
, Utils
::getIdMapper
);
288 if (generatedKey
.isPresent()) {
289 contentId
= generatedKey
.get();
294 if (contentId
== -1) {
295 logger
.warn("Failed to insert message send log content");
298 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
302 } catch (SQLException e
) {
303 logger
.warn("Failed to insert into message send log", e
);
308 private byte[] getGroupId(final Content content
) {
310 return content
.dataMessage
== null
312 : content
.dataMessage
.group
!= null && content
.dataMessage
.group
.id
!= null
313 ? content
.dataMessage
.group
.id
.toByteArray()
314 : content
.dataMessage
.groupV2
!= null && content
.dataMessage
.groupV2
.masterKey
!= null
315 ? GroupUtils
.getGroupIdV2(new GroupMasterKey(content
.dataMessage
.groupV2
.masterKey
.toByteArray()))
318 } catch (InvalidInputException e
) {
319 logger
.warn("Failed to parse groupId id from content");
324 private void insertRecipientsForExistingContent(
325 final long contentId
, final List
<RecipientDevices
> recipientDevices
327 try (final var connection
= database
.getConnection()) {
328 connection
.setAutoCommit(false);
329 insertRecipientsForExistingContent(contentId
, recipientDevices
, connection
);
331 } catch (SQLException e
) {
332 logger
.warn("Failed to append recipients to message send log", e
);
336 private void insertRecipientsForExistingContent(
337 final long contentId
, final List
<RecipientDevices
> recipientDevices
, final Connection connection
338 ) throws SQLException
{
340 INSERT INTO %s (address, device_id, content_id)
342 """.formatted(TABLE_MESSAGE_SEND_LOG
);
343 try (final var statement
= connection
.prepareStatement(sql
)) {
344 for (final var recipientDevice
: recipientDevices
) {
345 for (final var deviceId
: recipientDevice
.deviceIds()) {
346 statement
.setString(1, recipientDevice
.serviceId().toString());
347 statement
.setInt(2, deviceId
);
348 statement
.setLong(3, contentId
);
349 statement
.executeUpdate();
355 private void deleteOutdatedEntries(final Connection connection
) throws SQLException
{
359 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
);
360 try (final var statement
= connection
.prepareStatement(sql
)) {
361 statement
.setLong(1, System
.currentTimeMillis() - LOG_DURATION
.toMillis());
362 final var rowCount
= statement
.executeUpdate();
364 logger
.debug("Removed {} outdated entries from the message send log", rowCount
);
366 logger
.trace("No outdated entries to be removed from message send log.");
371 private void deleteOrphanedLogContents(final Connection connection
) throws SQLException
{
374 WHERE _id NOT IN (SELECT content_id FROM %s)
375 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT
, TABLE_MESSAGE_SEND_LOG
);
376 try (final var statement
= connection
.prepareStatement(sql
)) {
377 statement
.executeUpdate();
381 private MessageSendLogEntry
getMessageSendLogEntryFromResultSet(ResultSet resultSet
) throws SQLException
{
382 final var groupId
= Optional
.ofNullable(resultSet
.getBytes("group_id")).map(GroupId
::unknownVersion
);
383 final Content content
;
385 content
= Content
.ADAPTER
.decode(resultSet
.getBinaryStream("content"));
386 } catch (IOException e
) {
387 logger
.warn("Failed to parse content from message send log", e
);
390 final var contentHint
= ContentHint
.fromType(resultSet
.getInt("content_hint"));
391 final var urgent
= resultSet
.getBoolean("urgent");
392 return new MessageSendLogEntry(groupId
, content
, contentHint
, urgent
);
395 private record RecipientDevices(ServiceId serviceId
, List
<Integer
> deviceIds
) {}