]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
3b75de65499ba9305158d6f125c72f618018d163
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.recipients.RecipientId;
7 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
8 import org.signal.zkgroup.InvalidInputException;
9 import org.signal.zkgroup.groups.GroupMasterKey;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.whispersystems.signalservice.api.crypto.ContentHint;
13 import org.whispersystems.signalservice.api.messages.SendMessageResult;
14 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.PreparedStatement;
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import java.util.Spliterator;
26 import java.util.Spliterators;
27 import java.util.function.Consumer;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
30
31 public class MessageSendLogStore implements AutoCloseable {
32
33 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
34
35 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
36 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
37
38 private static final Duration LOG_DURATION = Duration.ofDays(1);
39
40 private final RecipientResolver recipientResolver;
41 private final Database database;
42 private final Thread cleanupThread;
43
44 public MessageSendLogStore(
45 final RecipientResolver recipientResolver, final Database database
46 ) {
47 this.recipientResolver = recipientResolver;
48 this.database = database;
49 this.cleanupThread = new Thread(() -> {
50 try {
51 final var interval = Duration.ofHours(1).toMillis();
52 while (!Thread.interrupted()) {
53 try (final var connection = database.getConnection()) {
54 deleteOutdatedEntries(connection);
55 } catch (SQLException e) {
56 logger.warn("Deleting outdated entries failed");
57 break;
58 }
59 Thread.sleep(interval);
60 }
61 } catch (InterruptedException e) {
62 logger.debug("Stopping msl cleanup thread");
63 }
64 });
65 cleanupThread.setName("msl-cleanup");
66 cleanupThread.setDaemon(true);
67 cleanupThread.start();
68 }
69
70 public static void createSql(Connection connection) throws SQLException {
71 try (final var statement = connection.createStatement()) {
72 statement.executeUpdate("""
73 CREATE TABLE message_send_log (
74 _id INTEGER PRIMARY KEY,
75 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
76 recipient_id INTEGER NOT NULL,
77 device_id INTEGER NOT NULL
78 );
79 CREATE TABLE message_send_log_content (
80 _id INTEGER PRIMARY KEY,
81 group_id BLOB,
82 timestamp INTEGER NOT NULL,
83 content BLOB NOT NULL,
84 content_hint INTEGER NOT NULL
85 );
86 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
87 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
88 CREATE INDEX msl_content_index ON message_send_log (content_id);
89 """);
90 }
91 }
92
93 public List<MessageSendLogEntry> findMessages(
94 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
95 ) {
96 try (final var connection = database.getConnection()) {
97 deleteOutdatedEntries(connection);
98
99 try (final var statement = connection.prepareStatement(
100 "SELECT group_id, content, content_hint FROM %s l INNER JOIN %s lc ON l.content_id = lc._id WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?".formatted(
101 TABLE_MESSAGE_SEND_LOG,
102 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
103 statement.setLong(1, recipientId.id());
104 statement.setInt(2, deviceId);
105 statement.setLong(3, timestamp);
106 try (var result = executeQueryForStream(statement, resultSet -> {
107 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
108 .map(GroupId::unknownVersion);
109 final SignalServiceProtos.Content content;
110 try {
111 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
112 } catch (IOException e) {
113 logger.warn("Failed to parse content from message send log", e);
114 return null;
115 }
116 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
117 return new MessageSendLogEntry(groupId, content, contentHint);
118 })) {
119 return result.filter(Objects::nonNull)
120 .filter(e -> !isSenderKey || e.groupId().isPresent())
121 .toList();
122 }
123 }
124 } catch (SQLException e) {
125 logger.warn("Failed read from message send log", e);
126 return List.of();
127 }
128 }
129
130 public long insertIfPossible(
131 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint
132 ) {
133 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
134 if (recipientDevice == null) {
135 return -1;
136 }
137
138 return insert(List.of(recipientDevice),
139 sentTimestamp,
140 sendMessageResult.getSuccess().getContent().get(),
141 contentHint);
142 }
143
144 public long insertIfPossible(
145 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint
146 ) {
147 final var recipientDevices = sendMessageResults.stream()
148 .map(this::getRecipientDevices)
149 .filter(Objects::nonNull)
150 .toList();
151 if (recipientDevices.isEmpty()) {
152 return -1;
153 }
154
155 final var content = sendMessageResults.stream()
156 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
157 .map(r -> r.getSuccess().getContent().get())
158 .findFirst()
159 .get();
160
161 return insert(recipientDevices, sentTimestamp, content, contentHint);
162 }
163
164 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
165 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
166 if (recipientDevice == null) {
167 return;
168 }
169
170 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
171 }
172
173 public void addRecipientToExistingEntryIfPossible(
174 final long contentId, final List<SendMessageResult> sendMessageResults
175 ) {
176 final var recipientDevices = sendMessageResults.stream()
177 .map(this::getRecipientDevices)
178 .filter(Objects::nonNull)
179 .toList();
180 if (recipientDevices.isEmpty()) {
181 return;
182 }
183
184 insertRecipientsForExistingContent(contentId, recipientDevices);
185 }
186
187 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
188 try (final var connection = database.getConnection()) {
189 try (final var statement = connection.prepareStatement(
190 "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id = ?".formatted(
191 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
192 statement.setLong(1, sentTimestamp);
193 statement.setBytes(2, groupId.serialize());
194 statement.executeUpdate();
195 }
196 } catch (SQLException e) {
197 logger.warn("Failed delete from message send log", e);
198 }
199 }
200
201 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
202 try (final var connection = database.getConnection()) {
203 connection.setAutoCommit(false);
204 try (final var statement = connection.prepareStatement(
205 "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)".formatted(
206 TABLE_MESSAGE_SEND_LOG_CONTENT,
207 TABLE_MESSAGE_SEND_LOG))) {
208 statement.setLong(1, sentTimestamp);
209 statement.setLong(2, recipientId.id());
210 statement.executeUpdate();
211 }
212
213 deleteOrphanedLogContents(connection);
214 connection.commit();
215 } catch (SQLException e) {
216 logger.warn("Failed delete from message send log", e);
217 }
218 }
219
220 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
221 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
222 }
223
224 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
225 try (final var connection = database.getConnection()) {
226 connection.setAutoCommit(false);
227 try (final var statement = connection.prepareStatement(
228 "DELETE FROM %s AS l WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?".formatted(
229 TABLE_MESSAGE_SEND_LOG,
230 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
231 for (final var sentTimestamp : sentTimestamps) {
232 statement.setLong(1, sentTimestamp);
233 statement.setLong(2, recipientId.id());
234 statement.setInt(3, deviceId);
235 statement.executeUpdate();
236 }
237 }
238
239 deleteOrphanedLogContents(connection);
240 connection.commit();
241 } catch (SQLException e) {
242 logger.warn("Failed delete from message send log", e);
243 }
244 }
245
246 @Override
247 public void close() {
248 cleanupThread.interrupt();
249 try {
250 cleanupThread.join();
251 } catch (InterruptedException ignored) {
252 }
253 }
254
255 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
256 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
257 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
258 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
259 } else {
260 return null;
261 }
262 }
263
264 private long insert(
265 final List<RecipientDevices> recipientDevices,
266 final long sentTimestamp,
267 final SignalServiceProtos.Content content,
268 final ContentHint contentHint
269 ) {
270 byte[] groupId = getGroupId(content);
271
272 try (final var connection = database.getConnection()) {
273 connection.setAutoCommit(false);
274 final long contentId;
275 try (final var statement = connection.prepareStatement(
276 "INSERT INTO %s (timestamp, group_id, content, content_hint) VALUES (?,?,?,?)".formatted(
277 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
278 statement.setLong(1, sentTimestamp);
279 statement.setBytes(2, groupId);
280 statement.setBytes(3, content.toByteArray());
281 statement.setInt(4, contentHint.getType());
282 statement.executeUpdate();
283 final var generatedKeys = statement.getGeneratedKeys();
284 if (generatedKeys.next()) {
285 contentId = generatedKeys.getLong(1);
286 } else {
287 contentId = -1;
288 }
289 }
290 if (contentId == -1) {
291 logger.warn("Failed to insert message send log content");
292 return -1;
293 }
294 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
295
296 connection.commit();
297 return contentId;
298 } catch (SQLException e) {
299 logger.warn("Failed to insert into message send log", e);
300 return -1;
301 }
302 }
303
304 private byte[] getGroupId(final SignalServiceProtos.Content content) {
305 try {
306 return !content.hasDataMessage()
307 ? null
308 : content.getDataMessage().hasGroup()
309 ? content.getDataMessage().getGroup().getId().toByteArray()
310 : content.getDataMessage().hasGroupV2()
311 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
312 .getGroupV2()
313 .getMasterKey()
314 .toByteArray())).serialize()
315 : null;
316 } catch (InvalidInputException e) {
317 logger.warn("Failed to parse groupId id from content");
318 return null;
319 }
320 }
321
322 private void insertRecipientsForExistingContent(
323 final long contentId, final List<RecipientDevices> recipientDevices
324 ) {
325 try (final var connection = database.getConnection()) {
326 connection.setAutoCommit(false);
327 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
328 connection.commit();
329 } catch (SQLException e) {
330 logger.warn("Failed to append recipients to message send log", e);
331 }
332 }
333
334 private void insertRecipientsForExistingContent(
335 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
336 ) throws SQLException {
337 try (final var statement = connection.prepareStatement(
338 "INSERT INTO %s (recipient_id, device_id, content_id) VALUES (?,?,?)".formatted(TABLE_MESSAGE_SEND_LOG))) {
339 for (final var recipientDevice : recipientDevices) {
340 for (final var deviceId : recipientDevice.deviceIds()) {
341 statement.setLong(1, recipientDevice.recipientId().id());
342 statement.setInt(2, deviceId);
343 statement.setLong(3, contentId);
344 statement.executeUpdate();
345 }
346 }
347 }
348 }
349
350 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
351 try (final var statement = connection.prepareStatement("DELETE FROM %s WHERE timestamp < ?".formatted(
352 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
353 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
354 final var rowCount = statement.executeUpdate();
355 if (rowCount > 0) {
356 logger.debug("Removed {} outdated entries from the message send log", rowCount);
357 } else {
358 logger.trace("No outdated entries to be removed from message send log.");
359 }
360 }
361 }
362
363 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
364 try (final var statement = connection.prepareStatement(
365 "DELETE FROM %s WHERE _id NOT IN (SELECT content_id FROM %s)".formatted(TABLE_MESSAGE_SEND_LOG_CONTENT,
366 TABLE_MESSAGE_SEND_LOG))) {
367 statement.executeUpdate();
368 }
369 }
370
371 private <T> Stream<T> executeQueryForStream(
372 PreparedStatement statement, ResultSetMapper<T> mapper
373 ) throws SQLException {
374 final var resultSet = statement.executeQuery();
375
376 return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
377 @Override
378 public boolean tryAdvance(final Consumer<? super T> consumer) {
379 try {
380 if (!resultSet.next()) {
381 return false;
382 }
383 consumer.accept(mapper.apply(resultSet));
384 return true;
385 } catch (SQLException e) {
386 logger.warn("Failed to read from database result", e);
387 throw new RuntimeException(e);
388 }
389 }
390 }, false);
391 }
392
393 private interface ResultSetMapper<T> {
394
395 T apply(ResultSet resultSet) throws SQLException;
396 }
397
398 private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
399 }