]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
910da3cf464d7f95aa858665d092fb94e3c6fdb8
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.api.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.signal.libsignal.zkgroup.InvalidInputException;
8 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.whispersystems.signalservice.api.crypto.ContentHint;
12 import org.whispersystems.signalservice.api.messages.SendMessageResult;
13 import org.whispersystems.signalservice.api.push.ServiceId;
14 import org.whispersystems.signalservice.internal.push.Content;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.ResultSet;
19 import java.sql.SQLException;
20 import java.time.Duration;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24
25 public class MessageSendLogStore implements AutoCloseable {
26
27 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
28
29 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
30 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
31
32 private static final Duration LOG_DURATION = Duration.ofDays(1);
33
34 private final Database database;
35 private final Thread cleanupThread;
36 private final boolean sendLogDisabled;
37
38 public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) {
39 this.database = database;
40 this.sendLogDisabled = disableMessageSendLog;
41 this.cleanupThread = Thread.ofPlatform().name("msl-cleanup").daemon().start(() -> {
42 try {
43 final var interval = Duration.ofHours(1).toMillis();
44 while (!Thread.interrupted()) {
45 try (final var connection = database.getConnection()) {
46 deleteOutdatedEntries(connection);
47 } catch (SQLException e) {
48 logger.debug("MSL", e);
49 logger.warn("Deleting outdated entries failed");
50 break;
51 }
52 Thread.sleep(interval);
53 }
54 } catch (InterruptedException e) {
55 logger.debug("Stopping msl cleanup thread");
56 }
57 });
58 }
59
60 public static void createSql(Connection connection) throws SQLException {
61 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
62 try (final var statement = connection.createStatement()) {
63 statement.executeUpdate("""
64 CREATE TABLE message_send_log (
65 _id INTEGER PRIMARY KEY,
66 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
67 address TEXT NOT NULL,
68 device_id INTEGER NOT NULL
69 ) STRICT;
70 CREATE TABLE message_send_log_content (
71 _id INTEGER PRIMARY KEY,
72 group_id BLOB,
73 timestamp INTEGER NOT NULL,
74 content BLOB NOT NULL,
75 content_hint INTEGER NOT NULL,
76 urgent INTEGER NOT NULL
77 ) STRICT;
78 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
79 CREATE INDEX msl_recipient_index ON message_send_log (address, device_id, content_id);
80 CREATE INDEX msl_content_index ON message_send_log (content_id);
81 """);
82 }
83 }
84
85 public List<MessageSendLogEntry> findMessages(
86 final ServiceId serviceId, final int deviceId, final long timestamp, final boolean isSenderKey
87 ) {
88 final var sql = """
89 SELECT group_id, content, content_hint, urgent
90 FROM %s l
91 INNER JOIN %s lc ON l.content_id = lc._id
92 WHERE l.address = ? AND l.device_id = ? AND lc.timestamp = ?
93 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
94 try (final var connection = database.getConnection()) {
95 deleteOutdatedEntries(connection);
96
97 try (final var statement = connection.prepareStatement(sql)) {
98 statement.setString(1, serviceId.toString());
99 statement.setInt(2, deviceId);
100 statement.setLong(3, timestamp);
101 try (var result = Utils.executeQueryForStream(statement, this::getMessageSendLogEntryFromResultSet)) {
102 return result.filter(Objects::nonNull)
103 .filter(e -> !isSenderKey || e.groupId().isPresent())
104 .toList();
105 }
106 }
107 } catch (SQLException e) {
108 logger.warn("Failed read from message send log", e);
109 return List.of();
110 }
111 }
112
113 public long insertIfPossible(
114 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
115 ) {
116 if (sendLogDisabled) {
117 return -1;
118 }
119 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
120 if (recipientDevice == null) {
121 return -1;
122 }
123
124 return insert(List.of(recipientDevice),
125 sentTimestamp,
126 sendMessageResult.getSuccess().getContent().get(),
127 contentHint,
128 urgent);
129 }
130
131 public long insertIfPossible(
132 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
133 ) {
134 if (sendLogDisabled) {
135 return -1;
136 }
137 final var recipientDevices = sendMessageResults.stream()
138 .map(this::getRecipientDevices)
139 .filter(Objects::nonNull)
140 .toList();
141 if (recipientDevices.isEmpty()) {
142 return -1;
143 }
144
145 final var content = sendMessageResults.stream()
146 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
147 .map(r -> r.getSuccess().getContent().get())
148 .findFirst()
149 .get();
150
151 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
152 }
153
154 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
155 if (sendLogDisabled) {
156 return;
157 }
158 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
159 if (recipientDevice == null) {
160 return;
161 }
162
163 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
164 }
165
166 public void addRecipientToExistingEntryIfPossible(
167 final long contentId, final List<SendMessageResult> sendMessageResults
168 ) {
169 if (sendLogDisabled) {
170 return;
171 }
172 final var recipientDevices = sendMessageResults.stream()
173 .map(this::getRecipientDevices)
174 .filter(Objects::nonNull)
175 .toList();
176 if (recipientDevices.isEmpty()) {
177 return;
178 }
179
180 insertRecipientsForExistingContent(contentId, recipientDevices);
181 }
182
183 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
184 final var sql = """
185 DELETE FROM %s AS lc
186 WHERE lc.timestamp = ? AND lc.group_id = ?
187 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
188 try (final var connection = database.getConnection()) {
189 try (final var statement = connection.prepareStatement(sql)) {
190 statement.setLong(1, sentTimestamp);
191 statement.setBytes(2, groupId.serialize());
192 statement.executeUpdate();
193 }
194 } catch (SQLException e) {
195 logger.warn("Failed delete from message send log", e);
196 }
197 }
198
199 public void deleteEntryForRecipientNonGroup(long sentTimestamp, ServiceId serviceId) {
200 final var sql = """
201 DELETE FROM %s AS lc
202 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.address = ?)
203 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
204 try (final var connection = database.getConnection()) {
205 connection.setAutoCommit(false);
206 try (final var statement = connection.prepareStatement(sql)) {
207 statement.setLong(1, sentTimestamp);
208 statement.setString(2, serviceId.toString());
209 statement.executeUpdate();
210 }
211
212 deleteOrphanedLogContents(connection);
213 connection.commit();
214 } catch (SQLException e) {
215 logger.warn("Failed delete from message send log", e);
216 }
217 }
218
219 public void deleteEntryForRecipient(long sentTimestamp, ServiceId serviceId, int deviceId) {
220 deleteEntriesForRecipient(List.of(sentTimestamp), serviceId, deviceId);
221 }
222
223 public void deleteEntriesForRecipient(List<Long> sentTimestamps, ServiceId serviceId, int deviceId) {
224 final var sql = """
225 DELETE FROM %s AS l
226 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.address = ? AND l.device_id = ?
227 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
228 try (final var connection = database.getConnection()) {
229 connection.setAutoCommit(false);
230 try (final var statement = connection.prepareStatement(sql)) {
231 for (final var sentTimestamp : sentTimestamps) {
232 statement.setLong(1, sentTimestamp);
233 statement.setString(2, serviceId.toString());
234 statement.setInt(3, deviceId);
235 statement.executeUpdate();
236 }
237 }
238
239 deleteOrphanedLogContents(connection);
240 connection.commit();
241 } catch (SQLException e) {
242 logger.warn("Failed delete from message send log", e);
243 }
244 }
245
246 @Override
247 public void close() {
248 cleanupThread.interrupt();
249 try {
250 cleanupThread.join();
251 } catch (InterruptedException ignored) {
252 }
253 }
254
255 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
256 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
257 final var serviceId = sendMessageResult.getAddress().getServiceId();
258 return new RecipientDevices(serviceId, sendMessageResult.getSuccess().getDevices());
259 } else {
260 return null;
261 }
262 }
263
264 private long insert(
265 final List<RecipientDevices> recipientDevices,
266 final long sentTimestamp,
267 final Content content,
268 final ContentHint contentHint,
269 final boolean urgent
270 ) {
271 byte[] groupId = getGroupId(content);
272
273 final var sql = """
274 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
275 VALUES (?,?,?,?,?)
276 RETURNING _id
277 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
278 try (final var connection = database.getConnection()) {
279 connection.setAutoCommit(false);
280 final long contentId;
281 try (final var statement = connection.prepareStatement(sql)) {
282 statement.setLong(1, sentTimestamp);
283 statement.setBytes(2, groupId);
284 statement.setBytes(3, content.encode());
285 statement.setInt(4, contentHint.getType());
286 statement.setBoolean(5, urgent);
287 final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper);
288 if (generatedKey.isPresent()) {
289 contentId = generatedKey.get();
290 } else {
291 contentId = -1;
292 }
293 }
294 if (contentId == -1) {
295 logger.warn("Failed to insert message send log content");
296 return -1;
297 }
298 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
299
300 connection.commit();
301 return contentId;
302 } catch (SQLException e) {
303 logger.warn("Failed to insert into message send log", e);
304 return -1;
305 }
306 }
307
308 private byte[] getGroupId(final Content content) {
309 try {
310 return content.dataMessage == null
311 ? null
312 : content.dataMessage.group != null && content.dataMessage.group.id != null
313 ? content.dataMessage.group.id.toByteArray()
314 : content.dataMessage.groupV2 != null && content.dataMessage.groupV2.masterKey != null
315 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.dataMessage.groupV2.masterKey.toByteArray()))
316 .serialize()
317 : null;
318 } catch (InvalidInputException e) {
319 logger.warn("Failed to parse groupId id from content");
320 return null;
321 }
322 }
323
324 private void insertRecipientsForExistingContent(
325 final long contentId, final List<RecipientDevices> recipientDevices
326 ) {
327 try (final var connection = database.getConnection()) {
328 connection.setAutoCommit(false);
329 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
330 connection.commit();
331 } catch (SQLException e) {
332 logger.warn("Failed to append recipients to message send log", e);
333 }
334 }
335
336 private void insertRecipientsForExistingContent(
337 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
338 ) throws SQLException {
339 final var sql = """
340 INSERT INTO %s (address, device_id, content_id)
341 VALUES (?,?,?)
342 """.formatted(TABLE_MESSAGE_SEND_LOG);
343 try (final var statement = connection.prepareStatement(sql)) {
344 for (final var recipientDevice : recipientDevices) {
345 for (final var deviceId : recipientDevice.deviceIds()) {
346 statement.setString(1, recipientDevice.serviceId().toString());
347 statement.setInt(2, deviceId);
348 statement.setLong(3, contentId);
349 statement.executeUpdate();
350 }
351 }
352 }
353 }
354
355 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
356 final var sql = """
357 DELETE FROM %s
358 WHERE timestamp < ?
359 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
360 try (final var statement = connection.prepareStatement(sql)) {
361 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
362 final var rowCount = statement.executeUpdate();
363 if (rowCount > 0) {
364 logger.debug("Removed {} outdated entries from the message send log", rowCount);
365 } else {
366 logger.trace("No outdated entries to be removed from message send log.");
367 }
368 }
369 }
370
371 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
372 final var sql = """
373 DELETE FROM %s
374 WHERE _id NOT IN (SELECT content_id FROM %s)
375 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
376 try (final var statement = connection.prepareStatement(sql)) {
377 statement.executeUpdate();
378 }
379 }
380
381 private MessageSendLogEntry getMessageSendLogEntryFromResultSet(ResultSet resultSet) throws SQLException {
382 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")).map(GroupId::unknownVersion);
383 final Content content;
384 try {
385 content = Content.ADAPTER.decode(resultSet.getBinaryStream("content"));
386 } catch (IOException e) {
387 logger.warn("Failed to parse content from message send log", e);
388 return null;
389 }
390 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
391 final var urgent = resultSet.getBoolean("urgent");
392 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
393 }
394
395 private record RecipientDevices(ServiceId serviceId, List<Integer> deviceIds) {}
396 }