nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
54632395e1eb0fced1f3aac2776d301cec01e087
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.asamk.signal.manager.storage.recipients.RecipientId;
8 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
9 import org.signal.libsignal.zkgroup.InvalidInputException;
10 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.crypto.ContentHint;
14 import org.whispersystems.signalservice.api.messages.SendMessageResult;
15 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
16
17 import java.io.IOException;
18 import java.sql.Connection;
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25
26 public class MessageSendLogStore implements AutoCloseable {
27
28 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
29
30 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
31 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
32
33 private static final Duration LOG_DURATION = Duration.ofDays(1);
34
35 private final RecipientResolver recipientResolver;
36 private final Database database;
37 private final Thread cleanupThread;
38
39 public MessageSendLogStore(
40 final RecipientResolver recipientResolver, final Database database
41 ) {
42 this.recipientResolver = recipientResolver;
43 this.database = database;
44 this.cleanupThread = new Thread(() -> {
45 try {
46 final var interval = Duration.ofHours(1).toMillis();
47 while (!Thread.interrupted()) {
48 try (final var connection = database.getConnection()) {
49 deleteOutdatedEntries(connection);
50 } catch (SQLException e) {
51 logger.warn("Deleting outdated entries failed");
52 break;
53 }
54 Thread.sleep(interval);
55 }
56 } catch (InterruptedException e) {
57 logger.debug("Stopping msl cleanup thread");
58 }
59 });
60 cleanupThread.setName("msl-cleanup");
61 cleanupThread.setDaemon(true);
62 cleanupThread.start();
63 }
64
65 public static void createSql(Connection connection) throws SQLException {
66 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
67 try (final var statement = connection.createStatement()) {
68 statement.executeUpdate("""
69 CREATE TABLE message_send_log (
70 _id INTEGER PRIMARY KEY,
71 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
72 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
73 device_id INTEGER NOT NULL
74 );
75 CREATE TABLE message_send_log_content (
76 _id INTEGER PRIMARY KEY,
77 group_id BLOB,
78 timestamp INTEGER NOT NULL,
79 content BLOB NOT NULL,
80 content_hint INTEGER NOT NULL,
81 urgent BOOLEAN NOT NULL
82 );
83 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
84 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
85 CREATE INDEX msl_content_index ON message_send_log (content_id);
86 """);
87 }
88 }
89
90 public List<MessageSendLogEntry> findMessages(
91 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
92 ) {
93 final var sql = """
94 SELECT group_id, content, content_hint
95 FROM %s l
96 INNER JOIN %s lc ON l.content_id = lc._id
97 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
98 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
99 try (final var connection = database.getConnection()) {
100 deleteOutdatedEntries(connection);
101
102 try (final var statement = connection.prepareStatement(sql)) {
103 statement.setLong(1, recipientId.id());
104 statement.setInt(2, deviceId);
105 statement.setLong(3, timestamp);
106 try (var result = Utils.executeQueryForStream(statement, this::getMessageSendLogEntryFromResultSet)) {
107 return result.filter(Objects::nonNull)
108 .filter(e -> !isSenderKey || e.groupId().isPresent())
109 .toList();
110 }
111 }
112 } catch (SQLException e) {
113 logger.warn("Failed read from message send log", e);
114 return List.of();
115 }
116 }
117
118 public long insertIfPossible(
119 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
120 ) {
121 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
122 if (recipientDevice == null) {
123 return -1;
124 }
125
126 return insert(List.of(recipientDevice),
127 sentTimestamp,
128 sendMessageResult.getSuccess().getContent().get(),
129 contentHint,
130 urgent);
131 }
132
133 public long insertIfPossible(
134 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
135 ) {
136 final var recipientDevices = sendMessageResults.stream()
137 .map(this::getRecipientDevices)
138 .filter(Objects::nonNull)
139 .toList();
140 if (recipientDevices.isEmpty()) {
141 return -1;
142 }
143
144 final var content = sendMessageResults.stream()
145 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
146 .map(r -> r.getSuccess().getContent().get())
147 .findFirst()
148 .get();
149
150 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
151 }
152
153 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
154 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
155 if (recipientDevice == null) {
156 return;
157 }
158
159 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
160 }
161
162 public void addRecipientToExistingEntryIfPossible(
163 final long contentId, final List<SendMessageResult> sendMessageResults
164 ) {
165 final var recipientDevices = sendMessageResults.stream()
166 .map(this::getRecipientDevices)
167 .filter(Objects::nonNull)
168 .toList();
169 if (recipientDevices.isEmpty()) {
170 return;
171 }
172
173 insertRecipientsForExistingContent(contentId, recipientDevices);
174 }
175
176 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
177 final var sql = """
178 DELETE FROM %s AS lc
179 WHERE lc.timestamp = ? AND lc.group_id = ?
180 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
181 try (final var connection = database.getConnection()) {
182 try (final var statement = connection.prepareStatement(sql)) {
183 statement.setLong(1, sentTimestamp);
184 statement.setBytes(2, groupId.serialize());
185 statement.executeUpdate();
186 }
187 } catch (SQLException e) {
188 logger.warn("Failed delete from message send log", e);
189 }
190 }
191
192 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
193 final var sql = """
194 DELETE FROM %s AS lc
195 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
196 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
197 try (final var connection = database.getConnection()) {
198 connection.setAutoCommit(false);
199 try (final var statement = connection.prepareStatement(sql)) {
200 statement.setLong(1, sentTimestamp);
201 statement.setLong(2, recipientId.id());
202 statement.executeUpdate();
203 }
204
205 deleteOrphanedLogContents(connection);
206 connection.commit();
207 } catch (SQLException e) {
208 logger.warn("Failed delete from message send log", e);
209 }
210 }
211
212 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
213 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
214 }
215
216 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
217 final var sql = """
218 DELETE FROM %s AS l
219 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
220 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
221 try (final var connection = database.getConnection()) {
222 connection.setAutoCommit(false);
223 try (final var statement = connection.prepareStatement(sql)) {
224 for (final var sentTimestamp : sentTimestamps) {
225 statement.setLong(1, sentTimestamp);
226 statement.setLong(2, recipientId.id());
227 statement.setInt(3, deviceId);
228 statement.executeUpdate();
229 }
230 }
231
232 deleteOrphanedLogContents(connection);
233 connection.commit();
234 } catch (SQLException e) {
235 logger.warn("Failed delete from message send log", e);
236 }
237 }
238
239 @Override
240 public void close() {
241 cleanupThread.interrupt();
242 try {
243 cleanupThread.join();
244 } catch (InterruptedException ignored) {
245 }
246 }
247
248 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
249 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
250 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
251 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
252 } else {
253 return null;
254 }
255 }
256
257 private long insert(
258 final List<RecipientDevices> recipientDevices,
259 final long sentTimestamp,
260 final SignalServiceProtos.Content content,
261 final ContentHint contentHint,
262 final boolean urgent
263 ) {
264 byte[] groupId = getGroupId(content);
265
266 final var sql = """
267 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
268 VALUES (?,?,?,?,?)
269 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
270 try (final var connection = database.getConnection()) {
271 connection.setAutoCommit(false);
272 final long contentId;
273 try (final var statement = connection.prepareStatement(sql)) {
274 statement.setLong(1, sentTimestamp);
275 statement.setBytes(2, groupId);
276 statement.setBytes(3, content.toByteArray());
277 statement.setInt(4, contentHint.getType());
278 statement.setBoolean(5, urgent);
279 statement.executeUpdate();
280 final var generatedKeys = statement.getGeneratedKeys();
281 if (generatedKeys.next()) {
282 contentId = generatedKeys.getLong(1);
283 } else {
284 contentId = -1;
285 }
286 }
287 if (contentId == -1) {
288 logger.warn("Failed to insert message send log content");
289 return -1;
290 }
291 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
292
293 connection.commit();
294 return contentId;
295 } catch (SQLException e) {
296 logger.warn("Failed to insert into message send log", e);
297 return -1;
298 }
299 }
300
301 private byte[] getGroupId(final SignalServiceProtos.Content content) {
302 try {
303 return !content.hasDataMessage()
304 ? null
305 : content.getDataMessage().hasGroup()
306 ? content.getDataMessage().getGroup().getId().toByteArray()
307 : content.getDataMessage().hasGroupV2()
308 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
309 .getGroupV2()
310 .getMasterKey()
311 .toByteArray())).serialize()
312 : null;
313 } catch (InvalidInputException e) {
314 logger.warn("Failed to parse groupId id from content");
315 return null;
316 }
317 }
318
319 private void insertRecipientsForExistingContent(
320 final long contentId, final List<RecipientDevices> recipientDevices
321 ) {
322 try (final var connection = database.getConnection()) {
323 connection.setAutoCommit(false);
324 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
325 connection.commit();
326 } catch (SQLException e) {
327 logger.warn("Failed to append recipients to message send log", e);
328 }
329 }
330
331 private void insertRecipientsForExistingContent(
332 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
333 ) throws SQLException {
334 final var sql = """
335 INSERT INTO %s (recipient_id, device_id, content_id)
336 VALUES (?,?,?)
337 """.formatted(TABLE_MESSAGE_SEND_LOG);
338 try (final var statement = connection.prepareStatement(sql)) {
339 for (final var recipientDevice : recipientDevices) {
340 for (final var deviceId : recipientDevice.deviceIds()) {
341 statement.setLong(1, recipientDevice.recipientId().id());
342 statement.setInt(2, deviceId);
343 statement.setLong(3, contentId);
344 statement.executeUpdate();
345 }
346 }
347 }
348 }
349
350 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
351 final var sql = """
352 DELETE FROM %s
353 WHERE timestamp < ?
354 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
355 try (final var statement = connection.prepareStatement(sql)) {
356 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
357 final var rowCount = statement.executeUpdate();
358 if (rowCount > 0) {
359 logger.debug("Removed {} outdated entries from the message send log", rowCount);
360 } else {
361 logger.trace("No outdated entries to be removed from message send log.");
362 }
363 }
364 }
365
366 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
367 final var sql = """
368 DELETE FROM %s
369 WHERE _id NOT IN (SELECT content_id FROM %s)
370 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
371 try (final var statement = connection.prepareStatement(sql)) {
372 statement.executeUpdate();
373 }
374 }
375
376 private MessageSendLogEntry getMessageSendLogEntryFromResultSet(ResultSet resultSet) throws SQLException {
377 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")).map(GroupId::unknownVersion);
378 final SignalServiceProtos.Content content;
379 try {
380 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
381 } catch (IOException e) {
382 logger.warn("Failed to parse content from message send log", e);
383 return null;
384 }
385 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
386 final var urgent = resultSet.getBoolean("urgent");
387 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
388 }
389
390 private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
391 }