]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
b53c5ac0040451bb9914566b53e0b396c35aa8e7
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.recipients.RecipientId;
7 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
8 import org.signal.libsignal.zkgroup.InvalidInputException;
9 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.whispersystems.signalservice.api.crypto.ContentHint;
13 import org.whispersystems.signalservice.api.messages.SendMessageResult;
14 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.PreparedStatement;
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import java.util.Spliterator;
26 import java.util.Spliterators;
27 import java.util.function.Consumer;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
30
31 public class MessageSendLogStore implements AutoCloseable {
32
33 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
34
35 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
36 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
37
38 private static final Duration LOG_DURATION = Duration.ofDays(1);
39
40 private final RecipientResolver recipientResolver;
41 private final Database database;
42 private final Thread cleanupThread;
43
44 public MessageSendLogStore(
45 final RecipientResolver recipientResolver, final Database database
46 ) {
47 this.recipientResolver = recipientResolver;
48 this.database = database;
49 this.cleanupThread = new Thread(() -> {
50 try {
51 final var interval = Duration.ofHours(1).toMillis();
52 while (!Thread.interrupted()) {
53 try (final var connection = database.getConnection()) {
54 deleteOutdatedEntries(connection);
55 } catch (SQLException e) {
56 logger.warn("Deleting outdated entries failed");
57 break;
58 }
59 Thread.sleep(interval);
60 }
61 } catch (InterruptedException e) {
62 logger.debug("Stopping msl cleanup thread");
63 }
64 });
65 cleanupThread.setName("msl-cleanup");
66 cleanupThread.setDaemon(true);
67 cleanupThread.start();
68 }
69
70 public static void createSql(Connection connection) throws SQLException {
71 try (final var statement = connection.createStatement()) {
72 statement.executeUpdate("""
73 CREATE TABLE message_send_log (
74 _id INTEGER PRIMARY KEY,
75 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
76 recipient_id INTEGER NOT NULL,
77 device_id INTEGER NOT NULL
78 );
79 CREATE TABLE message_send_log_content (
80 _id INTEGER PRIMARY KEY,
81 group_id BLOB,
82 timestamp INTEGER NOT NULL,
83 content BLOB NOT NULL,
84 content_hint INTEGER NOT NULL
85 );
86 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
87 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
88 CREATE INDEX msl_content_index ON message_send_log (content_id);
89 """);
90 }
91 }
92
93 public List<MessageSendLogEntry> findMessages(
94 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
95 ) {
96 final var sql = """
97 SELECT group_id, content, content_hint
98 FROM %s l
99 INNER JOIN %s lc ON l.content_id = lc._id
100 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
101 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
102 try (final var connection = database.getConnection()) {
103 deleteOutdatedEntries(connection);
104
105 try (final var statement = connection.prepareStatement(sql)) {
106 statement.setLong(1, recipientId.id());
107 statement.setInt(2, deviceId);
108 statement.setLong(3, timestamp);
109 try (var result = executeQueryForStream(statement, resultSet -> {
110 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
111 .map(GroupId::unknownVersion);
112 final SignalServiceProtos.Content content;
113 try {
114 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
115 } catch (IOException e) {
116 logger.warn("Failed to parse content from message send log", e);
117 return null;
118 }
119 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
120 final var urgent = true; // TODO
121 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
122 })) {
123 return result.filter(Objects::nonNull)
124 .filter(e -> !isSenderKey || e.groupId().isPresent())
125 .toList();
126 }
127 }
128 } catch (SQLException e) {
129 logger.warn("Failed read from message send log", e);
130 return List.of();
131 }
132 }
133
134 public long insertIfPossible(
135 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
136 ) {
137 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
138 if (recipientDevice == null) {
139 return -1;
140 }
141
142 return insert(List.of(recipientDevice),
143 sentTimestamp,
144 sendMessageResult.getSuccess().getContent().get(),
145 contentHint,
146 urgent);
147 }
148
149 public long insertIfPossible(
150 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
151 ) {
152 final var recipientDevices = sendMessageResults.stream()
153 .map(this::getRecipientDevices)
154 .filter(Objects::nonNull)
155 .toList();
156 if (recipientDevices.isEmpty()) {
157 return -1;
158 }
159
160 final var content = sendMessageResults.stream()
161 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
162 .map(r -> r.getSuccess().getContent().get())
163 .findFirst()
164 .get();
165
166 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
167 }
168
169 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
170 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
171 if (recipientDevice == null) {
172 return;
173 }
174
175 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
176 }
177
178 public void addRecipientToExistingEntryIfPossible(
179 final long contentId, final List<SendMessageResult> sendMessageResults
180 ) {
181 final var recipientDevices = sendMessageResults.stream()
182 .map(this::getRecipientDevices)
183 .filter(Objects::nonNull)
184 .toList();
185 if (recipientDevices.isEmpty()) {
186 return;
187 }
188
189 insertRecipientsForExistingContent(contentId, recipientDevices);
190 }
191
192 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
193 final var sql = """
194 DELETE FROM %s AS lc
195 WHERE lc.timestamp = ? AND lc.group_id = ?
196 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
197 try (final var connection = database.getConnection()) {
198 try (final var statement = connection.prepareStatement(sql)) {
199 statement.setLong(1, sentTimestamp);
200 statement.setBytes(2, groupId.serialize());
201 statement.executeUpdate();
202 }
203 } catch (SQLException e) {
204 logger.warn("Failed delete from message send log", e);
205 }
206 }
207
208 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
209 final var sql = """
210 DELETE FROM %s AS lc
211 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
212 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
213 try (final var connection = database.getConnection()) {
214 connection.setAutoCommit(false);
215 try (final var statement = connection.prepareStatement(sql)) {
216 statement.setLong(1, sentTimestamp);
217 statement.setLong(2, recipientId.id());
218 statement.executeUpdate();
219 }
220
221 deleteOrphanedLogContents(connection);
222 connection.commit();
223 } catch (SQLException e) {
224 logger.warn("Failed delete from message send log", e);
225 }
226 }
227
228 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
229 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
230 }
231
232 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
233 final var sql = """
234 DELETE FROM %s AS l
235 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
236 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
237 try (final var connection = database.getConnection()) {
238 connection.setAutoCommit(false);
239 try (final var statement = connection.prepareStatement(sql)) {
240 for (final var sentTimestamp : sentTimestamps) {
241 statement.setLong(1, sentTimestamp);
242 statement.setLong(2, recipientId.id());
243 statement.setInt(3, deviceId);
244 statement.executeUpdate();
245 }
246 }
247
248 deleteOrphanedLogContents(connection);
249 connection.commit();
250 } catch (SQLException e) {
251 logger.warn("Failed delete from message send log", e);
252 }
253 }
254
255 @Override
256 public void close() {
257 cleanupThread.interrupt();
258 try {
259 cleanupThread.join();
260 } catch (InterruptedException ignored) {
261 }
262 }
263
264 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
265 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
266 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
267 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
268 } else {
269 return null;
270 }
271 }
272
273 private long insert(
274 final List<RecipientDevices> recipientDevices,
275 final long sentTimestamp,
276 final SignalServiceProtos.Content content,
277 final ContentHint contentHint,
278 final boolean urgent
279 ) {
280 byte[] groupId = getGroupId(content);
281
282 // TODO store urgent
283 final var sql = """
284 INSERT INTO %s (timestamp, group_id, content, content_hint)
285 VALUES (?,?,?,?)
286 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
287 try (final var connection = database.getConnection()) {
288 connection.setAutoCommit(false);
289 final long contentId;
290 try (final var statement = connection.prepareStatement(sql)) {
291 statement.setLong(1, sentTimestamp);
292 statement.setBytes(2, groupId);
293 statement.setBytes(3, content.toByteArray());
294 statement.setInt(4, contentHint.getType());
295 statement.executeUpdate();
296 final var generatedKeys = statement.getGeneratedKeys();
297 if (generatedKeys.next()) {
298 contentId = generatedKeys.getLong(1);
299 } else {
300 contentId = -1;
301 }
302 }
303 if (contentId == -1) {
304 logger.warn("Failed to insert message send log content");
305 return -1;
306 }
307 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
308
309 connection.commit();
310 return contentId;
311 } catch (SQLException e) {
312 logger.warn("Failed to insert into message send log", e);
313 return -1;
314 }
315 }
316
317 private byte[] getGroupId(final SignalServiceProtos.Content content) {
318 try {
319 return !content.hasDataMessage()
320 ? null
321 : content.getDataMessage().hasGroup()
322 ? content.getDataMessage().getGroup().getId().toByteArray()
323 : content.getDataMessage().hasGroupV2()
324 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
325 .getGroupV2()
326 .getMasterKey()
327 .toByteArray())).serialize()
328 : null;
329 } catch (InvalidInputException e) {
330 logger.warn("Failed to parse groupId id from content");
331 return null;
332 }
333 }
334
335 private void insertRecipientsForExistingContent(
336 final long contentId, final List<RecipientDevices> recipientDevices
337 ) {
338 try (final var connection = database.getConnection()) {
339 connection.setAutoCommit(false);
340 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
341 connection.commit();
342 } catch (SQLException e) {
343 logger.warn("Failed to append recipients to message send log", e);
344 }
345 }
346
347 private void insertRecipientsForExistingContent(
348 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
349 ) throws SQLException {
350 final var sql = """
351 INSERT INTO %s (recipient_id, device_id, content_id)
352 VALUES (?,?,?)
353 """.formatted(TABLE_MESSAGE_SEND_LOG);
354 try (final var statement = connection.prepareStatement(sql)) {
355 for (final var recipientDevice : recipientDevices) {
356 for (final var deviceId : recipientDevice.deviceIds()) {
357 statement.setLong(1, recipientDevice.recipientId().id());
358 statement.setInt(2, deviceId);
359 statement.setLong(3, contentId);
360 statement.executeUpdate();
361 }
362 }
363 }
364 }
365
366 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
367 final var sql = """
368 DELETE FROM %s
369 WHERE timestamp < ?
370 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
371 try (final var statement = connection.prepareStatement(sql)) {
372 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
373 final var rowCount = statement.executeUpdate();
374 if (rowCount > 0) {
375 logger.debug("Removed {} outdated entries from the message send log", rowCount);
376 } else {
377 logger.trace("No outdated entries to be removed from message send log.");
378 }
379 }
380 }
381
382 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
383 final var sql = """
384 DELETE FROM %s
385 WHERE _id NOT IN (SELECT content_id FROM %s)
386 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
387 try (final var statement = connection.prepareStatement(sql)) {
388 statement.executeUpdate();
389 }
390 }
391
392 private <T> Stream<T> executeQueryForStream(
393 PreparedStatement statement, ResultSetMapper<T> mapper
394 ) throws SQLException {
395 final var resultSet = statement.executeQuery();
396
397 return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
398 @Override
399 public boolean tryAdvance(final Consumer<? super T> consumer) {
400 try {
401 if (!resultSet.next()) {
402 return false;
403 }
404 consumer.accept(mapper.apply(resultSet));
405 return true;
406 } catch (SQLException e) {
407 logger.warn("Failed to read from database result", e);
408 throw new RuntimeException(e);
409 }
410 }
411 }, false);
412 }
413
414 private interface ResultSetMapper<T> {
415
416 T apply(ResultSet resultSet) throws SQLException;
417 }
418
419 private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
420 }