]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
08b23933703528cac6e834601755724590072c9d
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.api.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.Utils;
7 import org.signal.libsignal.zkgroup.InvalidInputException;
8 import org.signal.libsignal.zkgroup.groups.GroupMasterKey;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.whispersystems.signalservice.api.crypto.ContentHint;
12 import org.whispersystems.signalservice.api.messages.SendMessageResult;
13 import org.whispersystems.signalservice.api.push.ServiceId;
14 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.ResultSet;
19 import java.sql.SQLException;
20 import java.time.Duration;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24
25 public class MessageSendLogStore implements AutoCloseable {
26
27 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
28
29 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
30 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
31
32 private static final Duration LOG_DURATION = Duration.ofDays(1);
33
34 private final Database database;
35 private final Thread cleanupThread;
36 private final boolean sendLogDisabled;
37
38 public MessageSendLogStore(final Database database, final boolean disableMessageSendLog) {
39 this.database = database;
40 this.sendLogDisabled = disableMessageSendLog;
41 this.cleanupThread = new Thread(() -> {
42 try {
43 final var interval = Duration.ofHours(1).toMillis();
44 while (!Thread.interrupted()) {
45 try (final var connection = database.getConnection()) {
46 deleteOutdatedEntries(connection);
47 } catch (SQLException e) {
48 logger.debug("MSL", e);
49 logger.warn("Deleting outdated entries failed");
50 break;
51 }
52 Thread.sleep(interval);
53 }
54 } catch (InterruptedException e) {
55 logger.debug("Stopping msl cleanup thread");
56 }
57 });
58 cleanupThread.setName("msl-cleanup");
59 cleanupThread.setDaemon(true);
60 cleanupThread.start();
61 }
62
63 public static void createSql(Connection connection) throws SQLException {
64 // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
65 try (final var statement = connection.createStatement()) {
66 statement.executeUpdate("""
67 CREATE TABLE message_send_log (
68 _id INTEGER PRIMARY KEY,
69 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
70 address TEXT NOT NULL,
71 device_id INTEGER NOT NULL
72 ) STRICT;
73 CREATE TABLE message_send_log_content (
74 _id INTEGER PRIMARY KEY,
75 group_id BLOB,
76 timestamp INTEGER NOT NULL,
77 content BLOB NOT NULL,
78 content_hint INTEGER NOT NULL,
79 urgent INTEGER NOT NULL
80 ) STRICT;
81 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
82 CREATE INDEX msl_recipient_index ON message_send_log (address, device_id, content_id);
83 CREATE INDEX msl_content_index ON message_send_log (content_id);
84 """);
85 }
86 }
87
88 public List<MessageSendLogEntry> findMessages(
89 final ServiceId serviceId, final int deviceId, final long timestamp, final boolean isSenderKey
90 ) {
91 final var sql = """
92 SELECT group_id, content, content_hint, urgent
93 FROM %s l
94 INNER JOIN %s lc ON l.content_id = lc._id
95 WHERE l.address = ? AND l.device_id = ? AND lc.timestamp = ?
96 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
97 try (final var connection = database.getConnection()) {
98 deleteOutdatedEntries(connection);
99
100 try (final var statement = connection.prepareStatement(sql)) {
101 statement.setString(1, serviceId.toString());
102 statement.setInt(2, deviceId);
103 statement.setLong(3, timestamp);
104 try (var result = Utils.executeQueryForStream(statement, this::getMessageSendLogEntryFromResultSet)) {
105 return result.filter(Objects::nonNull)
106 .filter(e -> !isSenderKey || e.groupId().isPresent())
107 .toList();
108 }
109 }
110 } catch (SQLException e) {
111 logger.warn("Failed read from message send log", e);
112 return List.of();
113 }
114 }
115
116 public long insertIfPossible(
117 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint, boolean urgent
118 ) {
119 if (sendLogDisabled) {
120 return -1;
121 }
122 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
123 if (recipientDevice == null) {
124 return -1;
125 }
126
127 return insert(List.of(recipientDevice),
128 sentTimestamp,
129 sendMessageResult.getSuccess().getContent().get(),
130 contentHint,
131 urgent);
132 }
133
134 public long insertIfPossible(
135 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint, boolean urgent
136 ) {
137 if (sendLogDisabled) {
138 return -1;
139 }
140 final var recipientDevices = sendMessageResults.stream()
141 .map(this::getRecipientDevices)
142 .filter(Objects::nonNull)
143 .toList();
144 if (recipientDevices.isEmpty()) {
145 return -1;
146 }
147
148 final var content = sendMessageResults.stream()
149 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
150 .map(r -> r.getSuccess().getContent().get())
151 .findFirst()
152 .get();
153
154 return insert(recipientDevices, sentTimestamp, content, contentHint, urgent);
155 }
156
157 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
158 if (sendLogDisabled) {
159 return;
160 }
161 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
162 if (recipientDevice == null) {
163 return;
164 }
165
166 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
167 }
168
169 public void addRecipientToExistingEntryIfPossible(
170 final long contentId, final List<SendMessageResult> sendMessageResults
171 ) {
172 if (sendLogDisabled) {
173 return;
174 }
175 final var recipientDevices = sendMessageResults.stream()
176 .map(this::getRecipientDevices)
177 .filter(Objects::nonNull)
178 .toList();
179 if (recipientDevices.isEmpty()) {
180 return;
181 }
182
183 insertRecipientsForExistingContent(contentId, recipientDevices);
184 }
185
186 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
187 final var sql = """
188 DELETE FROM %s AS lc
189 WHERE lc.timestamp = ? AND lc.group_id = ?
190 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
191 try (final var connection = database.getConnection()) {
192 try (final var statement = connection.prepareStatement(sql)) {
193 statement.setLong(1, sentTimestamp);
194 statement.setBytes(2, groupId.serialize());
195 statement.executeUpdate();
196 }
197 } catch (SQLException e) {
198 logger.warn("Failed delete from message send log", e);
199 }
200 }
201
202 public void deleteEntryForRecipientNonGroup(long sentTimestamp, ServiceId serviceId) {
203 final var sql = """
204 DELETE FROM %s AS lc
205 WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.address = ?)
206 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
207 try (final var connection = database.getConnection()) {
208 connection.setAutoCommit(false);
209 try (final var statement = connection.prepareStatement(sql)) {
210 statement.setLong(1, sentTimestamp);
211 statement.setString(2, serviceId.toString());
212 statement.executeUpdate();
213 }
214
215 deleteOrphanedLogContents(connection);
216 connection.commit();
217 } catch (SQLException e) {
218 logger.warn("Failed delete from message send log", e);
219 }
220 }
221
222 public void deleteEntryForRecipient(long sentTimestamp, ServiceId serviceId, int deviceId) {
223 deleteEntriesForRecipient(List.of(sentTimestamp), serviceId, deviceId);
224 }
225
226 public void deleteEntriesForRecipient(List<Long> sentTimestamps, ServiceId serviceId, int deviceId) {
227 final var sql = """
228 DELETE FROM %s AS l
229 WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.address = ? AND l.device_id = ?
230 """.formatted(TABLE_MESSAGE_SEND_LOG, TABLE_MESSAGE_SEND_LOG_CONTENT);
231 try (final var connection = database.getConnection()) {
232 connection.setAutoCommit(false);
233 try (final var statement = connection.prepareStatement(sql)) {
234 for (final var sentTimestamp : sentTimestamps) {
235 statement.setLong(1, sentTimestamp);
236 statement.setString(2, serviceId.toString());
237 statement.setInt(3, deviceId);
238 statement.executeUpdate();
239 }
240 }
241
242 deleteOrphanedLogContents(connection);
243 connection.commit();
244 } catch (SQLException e) {
245 logger.warn("Failed delete from message send log", e);
246 }
247 }
248
249 @Override
250 public void close() {
251 cleanupThread.interrupt();
252 try {
253 cleanupThread.join();
254 } catch (InterruptedException ignored) {
255 }
256 }
257
258 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
259 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
260 final var serviceId = sendMessageResult.getAddress().getServiceId();
261 return new RecipientDevices(serviceId, sendMessageResult.getSuccess().getDevices());
262 } else {
263 return null;
264 }
265 }
266
267 private long insert(
268 final List<RecipientDevices> recipientDevices,
269 final long sentTimestamp,
270 final SignalServiceProtos.Content content,
271 final ContentHint contentHint,
272 final boolean urgent
273 ) {
274 byte[] groupId = getGroupId(content);
275
276 final var sql = """
277 INSERT INTO %s (timestamp, group_id, content, content_hint, urgent)
278 VALUES (?,?,?,?,?)
279 RETURNING _id
280 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
281 try (final var connection = database.getConnection()) {
282 connection.setAutoCommit(false);
283 final long contentId;
284 try (final var statement = connection.prepareStatement(sql)) {
285 statement.setLong(1, sentTimestamp);
286 statement.setBytes(2, groupId);
287 statement.setBytes(3, content.toByteArray());
288 statement.setInt(4, contentHint.getType());
289 statement.setBoolean(5, urgent);
290 final var generatedKey = Utils.executeQueryForOptional(statement, Utils::getIdMapper);
291 if (generatedKey.isPresent()) {
292 contentId = generatedKey.get();
293 } else {
294 contentId = -1;
295 }
296 }
297 if (contentId == -1) {
298 logger.warn("Failed to insert message send log content");
299 return -1;
300 }
301 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
302
303 connection.commit();
304 return contentId;
305 } catch (SQLException e) {
306 logger.warn("Failed to insert into message send log", e);
307 return -1;
308 }
309 }
310
311 private byte[] getGroupId(final SignalServiceProtos.Content content) {
312 try {
313 return !content.hasDataMessage()
314 ? null
315 : content.getDataMessage().hasGroup()
316 ? content.getDataMessage().getGroup().getId().toByteArray()
317 : content.getDataMessage().hasGroupV2()
318 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
319 .getGroupV2()
320 .getMasterKey()
321 .toByteArray())).serialize()
322 : null;
323 } catch (InvalidInputException e) {
324 logger.warn("Failed to parse groupId id from content");
325 return null;
326 }
327 }
328
329 private void insertRecipientsForExistingContent(
330 final long contentId, final List<RecipientDevices> recipientDevices
331 ) {
332 try (final var connection = database.getConnection()) {
333 connection.setAutoCommit(false);
334 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
335 connection.commit();
336 } catch (SQLException e) {
337 logger.warn("Failed to append recipients to message send log", e);
338 }
339 }
340
341 private void insertRecipientsForExistingContent(
342 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
343 ) throws SQLException {
344 final var sql = """
345 INSERT INTO %s (address, device_id, content_id)
346 VALUES (?,?,?)
347 """.formatted(TABLE_MESSAGE_SEND_LOG);
348 try (final var statement = connection.prepareStatement(sql)) {
349 for (final var recipientDevice : recipientDevices) {
350 for (final var deviceId : recipientDevice.deviceIds()) {
351 statement.setString(1, recipientDevice.serviceId().toString());
352 statement.setInt(2, deviceId);
353 statement.setLong(3, contentId);
354 statement.executeUpdate();
355 }
356 }
357 }
358 }
359
360 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
361 final var sql = """
362 DELETE FROM %s
363 WHERE timestamp < ?
364 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT);
365 try (final var statement = connection.prepareStatement(sql)) {
366 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
367 final var rowCount = statement.executeUpdate();
368 if (rowCount > 0) {
369 logger.debug("Removed {} outdated entries from the message send log", rowCount);
370 } else {
371 logger.trace("No outdated entries to be removed from message send log.");
372 }
373 }
374 }
375
376 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
377 final var sql = """
378 DELETE FROM %s
379 WHERE _id NOT IN (SELECT content_id FROM %s)
380 """.formatted(TABLE_MESSAGE_SEND_LOG_CONTENT, TABLE_MESSAGE_SEND_LOG);
381 try (final var statement = connection.prepareStatement(sql)) {
382 statement.executeUpdate();
383 }
384 }
385
386 private MessageSendLogEntry getMessageSendLogEntryFromResultSet(ResultSet resultSet) throws SQLException {
387 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id")).map(GroupId::unknownVersion);
388 final SignalServiceProtos.Content content;
389 try {
390 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
391 } catch (IOException e) {
392 logger.warn("Failed to parse content from message send log", e);
393 return null;
394 }
395 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
396 final var urgent = resultSet.getBoolean("urgent");
397 return new MessageSendLogEntry(groupId, content, contentHint, urgent);
398 }
399
400 private record RecipientDevices(ServiceId serviceId, List<Integer> deviceIds) {}
401 }