nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
d0034d1b67b04e83e1b0836e1c8b4d8994e8fe4d
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.signal.libsignal.zkgroup.InvalidInputException;
8 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.whispersystems.signalservice.api.crypto.ContentHint;
12 import org.whispersystems.signalservice.api.messages.SendMessageResult;
13 import org.whispersystems.signalservice.api.push.ServiceId;
14 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.ResultSet;
19 import java.sql.SQLException;
20 import java.time.Duration;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24
25 public class MessageSendLogStore implements AutoCloseable {
26
27 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
28
29 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
30 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
31
32 private static final Duration LOG_DURATION = Duration.ofDays(1);
33
34 private final Database database;
35 private final Thread cleanupThread;
36
/**
 * Creates the store and starts a daemon thread named "msl-cleanup" that removes
 * outdated send log entries once per hour.
 *
 * @param database source of the connections used by this store and its cleanup thread
 */
public MessageSendLogStore(final Database database) {
    this.database = database;
    this.cleanupThread = new Thread(() -> {
        try {
            final var interval = Duration.ofHours(1).toMillis();
            while (!Thread.interrupted()) {
                try (final var connection = database.getConnection()) {
                    deleteOutdatedEntries(connection);
                } catch (SQLException e) {
                    // Log the cause as well; the cleanup thread ends after the first failed run.
                    logger.warn("Deleting outdated entries failed", e);
                    break;
                }
                Thread.sleep(interval);
            }
        } catch (InterruptedException e) {
            // Interruption is the regular shutdown signal sent by close().
            logger.debug("Stopping msl cleanup thread");
        }
    });
    cleanupThread.setName("msl-cleanup");
    cleanupThread.setDaemon(true);
    cleanupThread.start();
}
59
60 public static void createSql(Connection connection) throws SQLException {
61 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
62 try (final var statement = connection.createStatement()) {
63 statement.executeUpdate("""
64 CREATE TABLE message_send_log (
65 _id INTEGER PRIMARY KEY,
66 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
67 uuid BLOB NOT NULL,
68 device_id INTEGER NOT NULL
69 ) STRICT;
70 CREATE TABLE message_send_log_content (
71 _id INTEGER PRIMARY KEY,
72 group_id BLOB,
73 timestamp INTEGER NOT NULL,
74 content BLOB NOT NULL,
75 content_hint INTEGER NOT NULL,
76 urgent INTEGER NOT NULL
77 ) STRICT;
78 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
79 CREATE INDEX msl_recipient_index ON message_send_log (uuid, device_id, content_id);
80 CREATE INDEX msl_content_index ON message_send_log (content_id);
81 """);
82 }
83 }
84
85 public List<MessageSendLogEntry> findMessages(
86 final ServiceId serviceId, final int deviceId, final long timestamp, final boolean isSenderKey
87 ) {
88 final var sql = """
89 SELECT group_id, content, content_hint
90 FROM %s l
91 INNER JOIN %s lc ON l.content_id = lc._id
92 WHERE l.uuid = ? AND l.device_id = ? AND lc.timestamp = ?
93 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
94 try (final var connection = database.getConnection()) {
95 deleteOutdatedEntries(connection);
96
97 try (final var statement = connection.prepareStatement(sql)) {
98 statement.setBytes(1, serviceId.toByteArray());
99 statement.setInt(2, deviceId);
100 statement.setLong(3, timestamp);
101 try (var result = Utils.executeQueryForStream(statement, this::getMessageSendLogEntryFromResultSet)) {
102 return result.filter(Objects::nonNull)
103 .filter(e -> !isSenderKey || e.groupId().isPresent())
104 .toList();
105 }
106 }
107 } catch (SQLException e) {
108 logger.warn("Failed read from message send log", e);
109 return List.of();
110 }
111 }
112
113 public long insertIfPossible(
114 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
115 ) {
116 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
117 if (recipientDevice == null) {
118 return -1;
119 }
120
121 return insert(List.of(recipientDevice),
122 sentTimestamp,
123 sendMessageResult.getSuccess().getContent().get(),
124 contentHint,
125 urgent);
126 }
127
128 public long insertIfPossible(
129 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
130 ) {
131 final var recipientDevices = sendMessageResults.stream()
132 .map(this::getRecipientDevices)
133 .filter(Objects::nonNull)
134 .toList();
135 if (recipientDevices.isEmpty()) {
136 return -1;
137 }
138
139 final var content = sendMessageResults.stream()
140 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
141 .map(r -> r.getSuccess().getContent().get())
142 .findFirst()
143 .get();
144
145 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
146 }
147
148 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
149 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
150 if (recipientDevice == null) {
151 return;
152 }
153
154 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
155 }
156
157 public void addRecipientToExistingEntryIfPossible(
158 final long contentId, final List<SendMessageResult> sendMessageResults
159 ) {
160 final var recipientDevices = sendMessageResults.stream()
161 .map(this::getRecipientDevices)
162 .filter(Objects::nonNull)
163 .toList();
164 if (recipientDevices.isEmpty()) {
165 return;
166 }
167
168 insertRecipientsForExistingContent(contentId, recipientDevices);
169 }
170
171 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
172 final var sql = """
173 DELETE FROM %s AS lc
174 WHERE lc.timestamp = ? AND lc.group_id = ?
175 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
176 try (final var connection = database.getConnection()) {
177 try (final var statement = connection.prepareStatement(sql)) {
178 statement.setLong(1, sentTimestamp);
179 statement.setBytes(2, groupId.serialize());
180 statement.executeUpdate();
181 }
182 } catch (SQLException e) {
183 logger.warn("Failed delete from message send log", e);
184 }
185 }
186
187 public void deleteEntryForRecipientNonGroup(long sentTimestamp, ServiceId serviceId) {
188 final var sql = """
189 DELETE FROM %s AS lc
190 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.uuid = ?)
191 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
192 try (final var connection = database.getConnection()) {
193 connection.setAutoCommit(false);
194 try (final var statement = connection.prepareStatement(sql)) {
195 statement.setLong(1, sentTimestamp);
196 statement.setBytes(2, serviceId.toByteArray());
197 statement.executeUpdate();
198 }
199
200 deleteOrphanedLogContents(connection);
201 connection.commit();
202 } catch (SQLException e) {
203 logger.warn("Failed delete from message send log", e);
204 }
205 }
206
207 public void deleteEntryForRecipient(long sentTimestamp, ServiceId serviceId, int deviceId) {
208 deleteEntriesForRecipient(List.of(sentTimestamp), serviceId, deviceId);
209 }
210
211 public void deleteEntriesForRecipient(List<Long> sentTimestamps, ServiceId serviceId, int deviceId) {
212 final var sql = """
213 DELETE FROM %s AS l
214 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.uuid = ? AND l.device_id = ?
215 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
216 try (final var connection = database.getConnection()) {
217 connection.setAutoCommit(false);
218 try (final var statement = connection.prepareStatement(sql)) {
219 for (final var sentTimestamp : sentTimestamps) {
220 statement.setLong(1, sentTimestamp);
221 statement.setBytes(2, serviceId.toByteArray());
222 statement.setInt(3, deviceId);
223 statement.executeUpdate();
224 }
225 }
226
227 deleteOrphanedLogContents(connection);
228 connection.commit();
229 } catch (SQLException e) {
230 logger.warn("Failed delete from message send log", e);
231 }
232 }
233
234 @Override
235 public void close() {
236 cleanupThread.interrupt();
237 try {
238 cleanupThread.join();
239 } catch (InterruptedException ignored) {
240 }
241 }
242
243 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
244 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
245 final var serviceId = sendMessageResult.getAddress().getServiceId();
246 return new RecipientDevices(serviceId, sendMessageResult.getSuccess().getDevices());
247 } else {
248 return null;
249 }
250 }
251
252 private long insert(
253 final List<RecipientDevices> recipientDevices,
254 final long sentTimestamp,
255 final SignalServiceProtos.Content content,
256 final ContentHint contentHint,
257 final boolean urgent
258 ) {
259 byte[] groupId = getGroupId(content);
260
261 final var sql = """
262 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
263 VALUES (?,?,?,?,?)
264 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
265 try (final var connection = database.getConnection()) {
266 connection.setAutoCommit(false);
267 final long contentId;
268 try (final var statement = connection.prepareStatement(sql)) {
269 statement.setLong(1, sentTimestamp);
270 statement.setBytes(2, groupId);
271 statement.setBytes(3, content.toByteArray());
272 statement.setInt(4, contentHint.getType());
273 statement.setBoolean(5, urgent);
274 statement.executeUpdate();
275 final var generatedKeys = statement.getGeneratedKeys();
276 if (generatedKeys.next()) {
277 contentId = generatedKeys.getLong(1);
278 } else {
279 contentId = -1;
280 }
281 }
282 if (contentId == -1) {
283 logger.warn("Failed to insert message send log content");
284 return -1;
285 }
286 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
287
288 connection.commit();
289 return contentId;
290 } catch (SQLException e) {
291 logger.warn("Failed to insert into message send log", e);
292 return -1;
293 }
294 }
295
296 private byte[] getGroupId(final SignalServiceProtos.Content content) {
297 try {
298 return !content.hasDataMessage()
299 ? null
300 : content.getDataMessage().hasGroup()
301 ? content.getDataMessage().getGroup().getId().toByteArray()
302 : content.getDataMessage().hasGroupV2()
303 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
304 .getGroupV2()
305 .getMasterKey()
306 .toByteArray())).serialize()
307 : null;
308 } catch (InvalidInputException e) {
309 logger.warn("Failed to parse groupId id from content");
310 return null;
311 }
312 }
313
314 private void insertRecipientsForExistingContent(
315 final long contentId, final List<RecipientDevices> recipientDevices
316 ) {
317 try (final var connection = database.getConnection()) {
318 connection.setAutoCommit(false);
319 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
320 connection.commit();
321 } catch (SQLException e) {
322 logger.warn("Failed to append recipients to message send log", e);
323 }
324 }
325
326 private void insertRecipientsForExistingContent(
327 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
328 ) throws SQLException {
329 final var sql = """
330 INSERT INTO %s (uuid, device_id, content_id)
331 VALUES (?,?,?)
332 """.formatted(TABLE_MESSAGE_SEND_LOG);
333 try (final var statement = connection.prepareStatement(sql)) {
334 for (final var recipientDevice : recipientDevices) {
335 for (final var deviceId : recipientDevice.deviceIds()) {
336 statement.setBytes(1, recipientDevice.serviceId().toByteArray());
337 statement.setInt(2, deviceId);
338 statement.setLong(3, contentId);
339 statement.executeUpdate();
340 }
341 }
342 }
343 }
344
345 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
346 final var sql = """
347 DELETE FROM %s
348 WHERE timestamp < ?
349 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
350 try (final var statement = connection.prepareStatement(sql)) {
351 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
352 final var rowCount = statement.executeUpdate();
353 if (rowCount > 0) {
354 logger.debug("Removed {} outdated entries from the message send log", rowCount);
355 } else {
356 logger.trace("No outdated entries to be removed from message send log.");
357 }
358 }
359 }
360
361 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
362 final var sql = """
363 DELETE FROM %s
364 WHERE _id NOT IN (SELECT content_id FROM %s)
365 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
366 try (final var statement = connection.prepareStatement(sql)) {
367 statement.executeUpdate();
368 }
369 }
370
371 private MessageSendLogEntry getMessageSendLogEntryFromResultSet(ResultSet resultSet) throws SQLException {
372 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")).map(GroupId::unknownVersion);
373 final SignalServiceProtos.Content content;
374 try {
375 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
376 } catch (IOException e) {
377 logger.warn("Failed to parse content from message send log", e);
378 return null;
379 }
380 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
381 final var urgent = resultSet.getBoolean("urgent");
382 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
383 }
384
385 private record RecipientDevices(ServiceId serviceId, List<Integer> deviceIds) {}
386 }