]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
5dde88b2db91747b0d10544341ef6e4cc69afed4
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.asamk.signal.manager.storage.recipients.RecipientId;
8 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
9 import org.signal.libsignal.zkgroup.InvalidInputException;
10 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
11 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory;
13 import org.whispersystems.signalservice.api.crypto.ContentHint;
14 import org.whispersystems.signalservice.api.messages.SendMessageResult;
15 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
16
17 import java.io.IOException;
18 import java.sql.Connection;
19 import java.sql.SQLException;
20 import java.time.Duration;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24
25 public class MessageSendLogStore implements AutoCloseable {
26
27 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
28
29 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
30 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
31
32 private static final Duration LOG_DURATION = Duration.ofDays(1);
33
34 private final RecipientResolver recipientResolver;
35 private final Database database;
36 private final Thread cleanupThread;
37
38 public MessageSendLogStore(
39 final RecipientResolver recipientResolver, final Database database
40 ) {
41 this.recipientResolver = recipientResolver;
42 this.database = database;
43 this.cleanupThread = new Thread(() -> {
44 try {
45 final var interval = Duration.ofHours(1).toMillis();
46 while (!Thread.interrupted()) {
47 try (final var connection = database.getConnection()) {
48 deleteOutdatedEntries(connection);
49 } catch (SQLException e) {
50 logger.warn("Deleting outdated entries failed");
51 break;
52 }
53 Thread.sleep(interval);
54 }
55 } catch (InterruptedException e) {
56 logger.debug("Stopping msl cleanup thread");
57 }
58 });
59 cleanupThread.setName("msl-cleanup");
60 cleanupThread.setDaemon(true);
61 cleanupThread.start();
62 }
63
64 public static void createSql(Connection connection) throws SQLException {
65 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
66 try (final var statement = connection.createStatement()) {
67 statement.executeUpdate("""
68 CREATE TABLE message_send_log (
69 _id INTEGER PRIMARY KEY,
70 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
71 recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
72 device_id INTEGER NOT NULL
73 );
74 CREATE TABLE message_send_log_content (
75 _id INTEGER PRIMARY KEY,
76 group_id BLOB,
77 timestamp INTEGER NOT NULL,
78 content BLOB NOT NULL,
79 content_hint INTEGER NOT NULL
80 );
81 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
82 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
83 CREATE INDEX msl_content_index ON message_send_log (content_id);
84 """);
85 }
86 }
87
88 public List<MessageSendLogEntry> findMessages(
89 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
90 ) {
91 final var sql = """
92 SELECT group_id, content, content_hint
93 FROM %s l
94 INNER JOIN %s lc ON l.content_id = lc._id
95 WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?
96 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
97 try (final var connection = database.getConnection()) {
98 deleteOutdatedEntries(connection);
99
100 try (final var statement = connection.prepareStatement(sql)) {
101 statement.setLong(1, recipientId.id());
102 statement.setInt(2, deviceId);
103 statement.setLong(3, timestamp);
104 try (var result = Utils.executeQueryForStream(statement, resultSet -> {
105 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
106 .map(GroupId::unknownVersion);
107 final SignalServiceProtos.Content content;
108 try {
109 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
110 } catch (IOException e) {
111 logger.warn("Failed to parse content from message send log", e);
112 return null;
113 }
114 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
115 final var urgent = true; // TODO
116 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
117 })) {
118 return result.filter(Objects::nonNull)
119 .filter(e -> !isSenderKey || e.groupId().isPresent())
120 .toList();
121 }
122 }
123 } catch (SQLException e) {
124 logger.warn("Failed read from message send log", e);
125 return List.of();
126 }
127 }
128
129 public long insertIfPossible(
130 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
131 ) {
132 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
133 if (recipientDevice == null) {
134 return -1;
135 }
136
137 return insert(List.of(recipientDevice),
138 sentTimestamp,
139 sendMessageResult.getSuccess().getContent().get(),
140 contentHint,
141 urgent);
142 }
143
144 public long insertIfPossible(
145 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
146 ) {
147 final var recipientDevices = sendMessageResults.stream()
148 .map(this::getRecipientDevices)
149 .filter(Objects::nonNull)
150 .toList();
151 if (recipientDevices.isEmpty()) {
152 return -1;
153 }
154
155 final var content = sendMessageResults.stream()
156 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
157 .map(r -> r.getSuccess().getContent().get())
158 .findFirst()
159 .get();
160
161 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
162 }
163
164 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
165 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
166 if (recipientDevice == null) {
167 return;
168 }
169
170 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
171 }
172
173 public void addRecipientToExistingEntryIfPossible(
174 final long contentId, final List<SendMessageResult> sendMessageResults
175 ) {
176 final var recipientDevices = sendMessageResults.stream()
177 .map(this::getRecipientDevices)
178 .filter(Objects::nonNull)
179 .toList();
180 if (recipientDevices.isEmpty()) {
181 return;
182 }
183
184 insertRecipientsForExistingContent(contentId, recipientDevices);
185 }
186
187 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
188 final var sql = """
189 DELETE FROM %s AS lc
190 WHERE lc.timestamp = ? AND lc.group_id = ?
191 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
192 try (final var connection = database.getConnection()) {
193 try (final var statement = connection.prepareStatement(sql)) {
194 statement.setLong(1, sentTimestamp);
195 statement.setBytes(2, groupId.serialize());
196 statement.executeUpdate();
197 }
198 } catch (SQLException e) {
199 logger.warn("Failed delete from message send log", e);
200 }
201 }
202
203 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
204 final var sql = """
205 DELETE FROM %s AS lc
206 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)
207 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
208 try (final var connection = database.getConnection()) {
209 connection.setAutoCommit(false);
210 try (final var statement = connection.prepareStatement(sql)) {
211 statement.setLong(1, sentTimestamp);
212 statement.setLong(2, recipientId.id());
213 statement.executeUpdate();
214 }
215
216 deleteOrphanedLogContents(connection);
217 connection.commit();
218 } catch (SQLException e) {
219 logger.warn("Failed delete from message send log", e);
220 }
221 }
222
223 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
224 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
225 }
226
227 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
228 final var sql = """
229 DELETE FROM %s AS l
230 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?
231 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
232 try (final var connection = database.getConnection()) {
233 connection.setAutoCommit(false);
234 try (final var statement = connection.prepareStatement(sql)) {
235 for (final var sentTimestamp : sentTimestamps) {
236 statement.setLong(1, sentTimestamp);
237 statement.setLong(2, recipientId.id());
238 statement.setInt(3, deviceId);
239 statement.executeUpdate();
240 }
241 }
242
243 deleteOrphanedLogContents(connection);
244 connection.commit();
245 } catch (SQLException e) {
246 logger.warn("Failed delete from message send log", e);
247 }
248 }
249
250 @Override
251 public void close() {
252 cleanupThread.interrupt();
253 try {
254 cleanupThread.join();
255 } catch (InterruptedException ignored) {
256 }
257 }
258
259 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
260 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
261 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
262 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
263 } else {
264 return null;
265 }
266 }
267
268 private long insert(
269 final List<RecipientDevices> recipientDevices,
270 final long sentTimestamp,
271 final SignalServiceProtos.Content content,
272 final ContentHint contentHint,
273 final boolean urgent
274 ) {
275 byte[] groupId = getGroupId(content);
276
277 // TODO store urgent
278 final var sql = """
279 INSERT INTO %s (timestamp, group_id, content, content_hint)
280 VALUES (?,?,?,?)
281 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
282 try (final var connection = database.getConnection()) {
283 connection.setAutoCommit(false);
284 final long contentId;
285 try (final var statement = connection.prepareStatement(sql)) {
286 statement.setLong(1, sentTimestamp);
287 statement.setBytes(2, groupId);
288 statement.setBytes(3, content.toByteArray());
289 statement.setInt(4, contentHint.getType());
290 statement.executeUpdate();
291 final var generatedKeys = statement.getGeneratedKeys();
292 if (generatedKeys.next()) {
293 contentId = generatedKeys.getLong(1);
294 } else {
295 contentId = -1;
296 }
297 }
298 if (contentId == -1) {
299 logger.warn("Failed to insert message send log content");
300 return -1;
301 }
302 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
303
304 connection.commit();
305 return contentId;
306 } catch (SQLException e) {
307 logger.warn("Failed to insert into message send log", e);
308 return -1;
309 }
310 }
311
312 private byte[] getGroupId(final SignalServiceProtos.Content content) {
313 try {
314 return !content.hasDataMessage()
315 ? null
316 : content.getDataMessage().hasGroup()
317 ? content.getDataMessage().getGroup().getId().toByteArray()
318 : content.getDataMessage().hasGroupV2()
319 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
320 .getGroupV2()
321 .getMasterKey()
322 .toByteArray())).serialize()
323 : null;
324 } catch (InvalidInputException e) {
325 logger.warn("Failed to parse groupId id from content");
326 return null;
327 }
328 }
329
330 private void insertRecipientsForExistingContent(
331 final long contentId, final List<RecipientDevices> recipientDevices
332 ) {
333 try (final var connection = database.getConnection()) {
334 connection.setAutoCommit(false);
335 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
336 connection.commit();
337 } catch (SQLException e) {
338 logger.warn("Failed to append recipients to message send log", e);
339 }
340 }
341
342 private void insertRecipientsForExistingContent(
343 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
344 ) throws SQLException {
345 final var sql = """
346 INSERT INTO %s (recipient_id, device_id, content_id)
347 VALUES (?,?,?)
348 """.formatted(TABLE_MESSAGE_SEND_LOG);
349 try (final var statement = connection.prepareStatement(sql)) {
350 for (final var recipientDevice : recipientDevices) {
351 for (final var deviceId : recipientDevice.deviceIds()) {
352 statement.setLong(1, recipientDevice.recipientId().id());
353 statement.setInt(2, deviceId);
354 statement.setLong(3, contentId);
355 statement.executeUpdate();
356 }
357 }
358 }
359 }
360
361 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
362 final var sql = """
363 DELETE FROM %s
364 WHERE timestamp < ?
365 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
366 try (final var statement = connection.prepareStatement(sql)) {
367 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
368 final var rowCount = statement.executeUpdate();
369 if (rowCount > 0) {
370 logger.debug("Removed {} outdated entries from the message send log", rowCount);
371 } else {
372 logger.trace("No outdated entries to be removed from message send log.");
373 }
374 }
375 }
376
377 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
378 final var sql = """
379 DELETE FROM %s
380 WHERE _id NOT IN (SELECT content_id FROM %s)
381 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
382 try (final var statement = connection.prepareStatement(sql)) {
383 statement.executeUpdate();
384 }
385 }
386
387 private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
388 }