]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
8a807fd56c7003c0e80ecb3bf7846866b26fc914
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.asamk.signal.manager.storage.recipients.RecipientId;
8 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
9 import org.signal.libsignal.zkgroup.InvalidInputException;
10 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.crypto.ContentHint;
14 import org.whispersystems.signalservice.api.messages.SendMessageResult;
15 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
16
17 import java.io.IOException;
18 import java.sql.Connection;
19 import java.sql.SQLException;
20 import java.time.Duration;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24
25 public class MessageSendLogStore implements AutoCloseable {
26
private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);

// Table with one row per (content, recipient, device) a logged message was sent to.
private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
// Table with the serialized message content that the send log rows reference.
private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";

// Content rows with a timestamp older than this are removed by deleteOutdatedEntries.
private static final Duration LOG_DURATION = Duration.ofDays(1);

// Resolves the address of a send result to a recipient id (see getRecipientDevices).
private final RecipientResolver recipientResolver;
// Source of the JDBC connections used by every query in this class.
private final Database database;
// Daemon thread started by the constructor; removes outdated entries once per hour.
private final Thread cleanupThread;
37
/**
 * Creates the store and starts a daemon thread named "msl-cleanup" that removes
 * outdated send log entries once per hour.
 * <p>
 * The cleanup thread ends when it is interrupted (see {@link #close()}) or when a
 * cleanup run fails with an {@link SQLException}.
 *
 * @param recipientResolver resolves message addresses to recipient ids
 * @param database          provides the connections used for all send log queries
 */
public MessageSendLogStore(
        final RecipientResolver recipientResolver, final Database database
) {
    this.recipientResolver = recipientResolver;
    this.database = database;
    this.cleanupThread = new Thread(() -> {
        try {
            final var interval = Duration.ofHours(1).toMillis();
            while (!Thread.interrupted()) {
                try (final var connection = database.getConnection()) {
                    deleteOutdatedEntries(connection);
                } catch (SQLException e) {
                    // Log the cause as well, like every other failure in this class does;
                    // without it the reason the cleanup thread stopped is lost.
                    logger.warn("Deleting outdated entries failed", e);
                    break;
                }
                Thread.sleep(interval);
            }
        } catch (InterruptedException e) {
            // Interruption is the regular shutdown signal sent by close().
            logger.debug("Stopping msl cleanup thread");
        }
    });
    cleanupThread.setName("msl-cleanup");
    cleanupThread.setDaemon(true);
    cleanupThread.start();
}
63
64 public static void createSql(Connection connection) throws SQLException {
65 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
66 try (final var statement = connection.createStatement()) {
67 statement.executeUpdate("""
68 CREATE TABLE message_send_log (
69 _id INTEGER PRIMARY KEY,
70 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
71 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
72 device_id INTEGER NOT NULL
73 );
74 CREATE TABLE message_send_log_content (
75 _id INTEGER PRIMARY KEY,
76 group_id BLOB,
77 timestamp INTEGER NOT NULL,
78 content BLOB NOT NULL,
79 content_hint INTEGER NOT NULL,
80 urgent BOOLEAN NOT NULL
81 );
82 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
83 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
84 CREATE INDEX msl_content_index ON message_send_log (content_id);
85 """);
86 }
87 }
88
89 public List<MessageSendLogEntry> findMessages(
90 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
91 ) {
92 final var sql = """
93 SELECT group_id, content, content_hint
94 FROM %s l
95 INNER JOIN %s lc ON l.content_id = lc._id
96 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
97 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
98 try (final var connection = database.getConnection()) {
99 deleteOutdatedEntries(connection);
100
101 try (final var statement = connection.prepareStatement(sql)) {
102 statement.setLong(1, recipientId.id());
103 statement.setInt(2, deviceId);
104 statement.setLong(3, timestamp);
105 try (var result = Utils.executeQueryForStream(statement, resultSet -> {
106 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
107 .map(GroupId::unknownVersion);
108 final SignalServiceProtos.Content content;
109 try {
110 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
111 } catch (IOException e) {
112 logger.warn("Failed to parse content from message send log", e);
113 return null;
114 }
115 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
116 final var urgent = resultSet.getBoolean("urgent");
117 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
118 })) {
119 return result.filter(Objects::nonNull)
120 .filter(e -> !isSenderKey || e.groupId().isPresent())
121 .toList();
122 }
123 }
124 } catch (SQLException e) {
125 logger.warn("Failed read from message send log", e);
126 return List.of();
127 }
128 }
129
130 public long insertIfPossible(
131 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
132 ) {
133 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
134 if (recipientDevice == null) {
135 return -1;
136 }
137
138 return insert(List.of(recipientDevice),
139 sentTimestamp,
140 sendMessageResult.getSuccess().getContent().get(),
141 contentHint,
142 urgent);
143 }
144
145 public long insertIfPossible(
146 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
147 ) {
148 final var recipientDevices = sendMessageResults.stream()
149 .map(this::getRecipientDevices)
150 .filter(Objects::nonNull)
151 .toList();
152 if (recipientDevices.isEmpty()) {
153 return -1;
154 }
155
156 final var content = sendMessageResults.stream()
157 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
158 .map(r -> r.getSuccess().getContent().get())
159 .findFirst()
160 .get();
161
162 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
163 }
164
165 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
166 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
167 if (recipientDevice == null) {
168 return;
169 }
170
171 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
172 }
173
174 public void addRecipientToExistingEntryIfPossible(
175 final long contentId, final List<SendMessageResult> sendMessageResults
176 ) {
177 final var recipientDevices = sendMessageResults.stream()
178 .map(this::getRecipientDevices)
179 .filter(Objects::nonNull)
180 .toList();
181 if (recipientDevices.isEmpty()) {
182 return;
183 }
184
185 insertRecipientsForExistingContent(contentId, recipientDevices);
186 }
187
188 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
189 final var sql = """
190 DELETE FROM %s AS lc
191 WHERE lc.timestamp = ? AND lc.group_id = ?
192 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
193 try (final var connection = database.getConnection()) {
194 try (final var statement = connection.prepareStatement(sql)) {
195 statement.setLong(1, sentTimestamp);
196 statement.setBytes(2, groupId.serialize());
197 statement.executeUpdate();
198 }
199 } catch (SQLException e) {
200 logger.warn("Failed delete from message send log", e);
201 }
202 }
203
204 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
205 final var sql = """
206 DELETE FROM %s AS lc
207 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
208 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
209 try (final var connection = database.getConnection()) {
210 connection.setAutoCommit(false);
211 try (final var statement = connection.prepareStatement(sql)) {
212 statement.setLong(1, sentTimestamp);
213 statement.setLong(2, recipientId.id());
214 statement.executeUpdate();
215 }
216
217 deleteOrphanedLogContents(connection);
218 connection.commit();
219 } catch (SQLException e) {
220 logger.warn("Failed delete from message send log", e);
221 }
222 }
223
224 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
225 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
226 }
227
228 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
229 final var sql = """
230 DELETE FROM %s AS l
231 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
232 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
233 try (final var connection = database.getConnection()) {
234 connection.setAutoCommit(false);
235 try (final var statement = connection.prepareStatement(sql)) {
236 for (final var sentTimestamp : sentTimestamps) {
237 statement.setLong(1, sentTimestamp);
238 statement.setLong(2, recipientId.id());
239 statement.setInt(3, deviceId);
240 statement.executeUpdate();
241 }
242 }
243
244 deleteOrphanedLogContents(connection);
245 connection.commit();
246 } catch (SQLException e) {
247 logger.warn("Failed delete from message send log", e);
248 }
249 }
250
251 @Override
252 public void close() {
253 cleanupThread.interrupt();
254 try {
255 cleanupThread.join();
256 } catch (InterruptedException ignored) {
257 }
258 }
259
260 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
261 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
262 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
263 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
264 } else {
265 return null;
266 }
267 }
268
269 private long insert(
270 final List<RecipientDevices> recipientDevices,
271 final long sentTimestamp,
272 final SignalServiceProtos.Content content,
273 final ContentHint contentHint,
274 final boolean urgent
275 ) {
276 byte[] groupId = getGroupId(content);
277
278 final var sql = """
279 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
280 VALUES (?,?,?,?,?)
281 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
282 try (final var connection = database.getConnection()) {
283 connection.setAutoCommit(false);
284 final long contentId;
285 try (final var statement = connection.prepareStatement(sql)) {
286 statement.setLong(1, sentTimestamp);
287 statement.setBytes(2, groupId);
288 statement.setBytes(3, content.toByteArray());
289 statement.setInt(4, contentHint.getType());
290 statement.setBoolean(5, urgent);
291 statement.executeUpdate();
292 final var generatedKeys = statement.getGeneratedKeys();
293 if (generatedKeys.next()) {
294 contentId = generatedKeys.getLong(1);
295 } else {
296 contentId = -1;
297 }
298 }
299 if (contentId == -1) {
300 logger.warn("Failed to insert message send log content");
301 return -1;
302 }
303 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
304
305 connection.commit();
306 return contentId;
307 } catch (SQLException e) {
308 logger.warn("Failed to insert into message send log", e);
309 return -1;
310 }
311 }
312
313 private byte[] getGroupId(final SignalServiceProtos.Content content) {
314 try {
315 return !content.hasDataMessage()
316 ? null
317 : content.getDataMessage().hasGroup()
318 ? content.getDataMessage().getGroup().getId().toByteArray()
319 : content.getDataMessage().hasGroupV2()
320 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
321 .getGroupV2()
322 .getMasterKey()
323 .toByteArray())).serialize()
324 : null;
325 } catch (InvalidInputException e) {
326 logger.warn("Failed to parse groupId id from content");
327 return null;
328 }
329 }
330
331 private void insertRecipientsForExistingContent(
332 final long contentId, final List<RecipientDevices> recipientDevices
333 ) {
334 try (final var connection = database.getConnection()) {
335 connection.setAutoCommit(false);
336 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
337 connection.commit();
338 } catch (SQLException e) {
339 logger.warn("Failed to append recipients to message send log", e);
340 }
341 }
342
343 private void insertRecipientsForExistingContent(
344 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
345 ) throws SQLException {
346 final var sql = """
347 INSERT INTO %s (recipient_id, device_id, content_id)
348 VALUES (?,?,?)
349 """.formatted(TABLE_MESSAGE_SEND_LOG);
350 try (final var statement = connection.prepareStatement(sql)) {
351 for (final var recipientDevice : recipientDevices) {
352 for (final var deviceId : recipientDevice.deviceIds()) {
353 statement.setLong(1, recipientDevice.recipientId().id());
354 statement.setInt(2, deviceId);
355 statement.setLong(3, contentId);
356 statement.executeUpdate();
357 }
358 }
359 }
360 }
361
362 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
363 final var sql = """
364 DELETE FROM %s
365 WHERE timestamp < ?
366 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
367 try (final var statement = connection.prepareStatement(sql)) {
368 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
369 final var rowCount = statement.executeUpdate();
370 if (rowCount > 0) {
371 logger.debug("Removed {} outdated entries from the message send log", rowCount);
372 } else {
373 logger.trace("No outdated entries to be removed from message send log.");
374 }
375 }
376 }
377
378 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
379 final var sql = """
380 DELETE FROM %s
381 WHERE _id NOT IN (SELECT content_id FROM %s)
382 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
383 try (final var statement = connection.prepareStatement(sql)) {
384 statement.executeUpdate();
385 }
386 }
387
/**
 * A recipient together with the ids of its devices that a message was sent to.
 *
 * @param recipientId the resolved recipient
 * @param deviceIds   device ids taken from the successful send result
 */
private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
389 }