]> nmode's Git Repositories - signal-cli/blob - lib/src/main/java/org/asamk/signal/manager/storage/sendLog/MessageSendLogStore.java
795919f6e7168ebc85f97123cd2ba5607edad559
[signal-cli] / lib / src / main / java / org / asamk / signal / manager / storage / sendLog / MessageSendLogStore.java
1 package org.asamk.signal.manager.storage.sendLog;
2
3 import org.asamk.signal.manager.groups.GroupId;
4 import org.asamk.signal.manager.groups.GroupUtils;
5 import org.asamk.signal.manager.storage.Database;
6 import org.asamk.signal.manager.storage.recipients.RecipientId;
7 import org.asamk.signal.manager.storage.recipients.RecipientResolver;
8 import org.signal.zkgroup.InvalidInputException;
9 import org.signal.zkgroup.groups.GroupMasterKey;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.whispersystems.signalservice.api.crypto.ContentHint;
13 import org.whispersystems.signalservice.api.messages.SendMessageResult;
14 import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
15
16 import java.io.IOException;
17 import java.sql.Connection;
18 import java.sql.PreparedStatement;
19 import java.sql.ResultSet;
20 import java.sql.SQLException;
21 import java.time.Duration;
22 import java.util.List;
23 import java.util.Objects;
24 import java.util.Optional;
25 import java.util.Spliterator;
26 import java.util.Spliterators;
27 import java.util.function.Consumer;
28 import java.util.stream.Stream;
29 import java.util.stream.StreamSupport;
30
31 public class MessageSendLogStore implements AutoCloseable {
32
33 private static final Logger logger = LoggerFactory.getLogger(MessageSendLogStore.class);
34
35 private static final String TABLE_MESSAGE_SEND_LOG = "message_send_log";
36 private static final String TABLE_MESSAGE_SEND_LOG_CONTENT = "message_send_log_content";
37
38 private static final Duration LOG_DURATION = Duration.ofDays(1);
39
40 private final RecipientResolver recipientResolver;
41 private final Database database;
42 private final Thread cleanupThread;
43
44 public MessageSendLogStore(
45 final RecipientResolver recipientResolver, final Database database
46 ) {
47 this.recipientResolver = recipientResolver;
48 this.database = database;
49 this.cleanupThread = new Thread(() -> {
50 try {
51 final var interval = Duration.ofHours(1).toMillis();
52 while (true) {
53 try (final var connection = database.getConnection()) {
54 deleteOutdatedEntries(connection);
55 Thread.sleep(interval);
56 } catch (SQLException e) {
57 logger.warn("Deleting outdated entries failed");
58 break;
59 }
60 }
61 } catch (InterruptedException e) {
62 logger.debug("Stopping msl cleanup thread");
63 }
64 });
65 cleanupThread.setDaemon(true);
66 cleanupThread.start();
67 }
68
69 public static void createSql(Connection connection) throws SQLException {
70 try (final var statement = connection.createStatement()) {
71 statement.executeUpdate("""
72 CREATE TABLE message_send_log (
73 _id INTEGER PRIMARY KEY,
74 content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
75 recipient_id INTEGER NOT NULL,
76 device_id INTEGER NOT NULL
77 );
78 CREATE TABLE message_send_log_content (
79 _id INTEGER PRIMARY KEY,
80 group_id BLOB,
81 timestamp INTEGER NOT NULL,
82 content BLOB NOT NULL,
83 content_hint INTEGER NOT NULL
84 );
85 CREATE INDEX mslc_timestamp_index ON message_send_log_content (timestamp);
86 CREATE INDEX msl_recipient_index ON message_send_log (recipient_id, device_id, content_id);
87 CREATE INDEX msl_content_index ON message_send_log (content_id);
88 """);
89 }
90 }
91
92 public List<MessageSendLogEntry> findMessages(
93 final RecipientId recipientId, final int deviceId, final long timestamp, final boolean isSenderKey
94 ) {
95 try (final var connection = database.getConnection()) {
96 deleteOutdatedEntries(connection);
97
98 try (final var statement = connection.prepareStatement(
99 "SELECT group_id, content, content_hint FROM %s l INNER JOIN %s lc ON l.content_id = lc._id WHERE l.recipient_id = ? AND l.device_id = ? AND lc.timestamp = ?".formatted(
100 TABLE_MESSAGE_SEND_LOG,
101 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
102 statement.setLong(1, recipientId.id());
103 statement.setInt(2, deviceId);
104 statement.setLong(3, timestamp);
105 try (var result = executeQueryForStream(statement, resultSet -> {
106 final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
107 .map(GroupId::unknownVersion);
108 final SignalServiceProtos.Content content;
109 try {
110 content = SignalServiceProtos.Content.parseFrom(resultSet.getBinaryStream("content"));
111 } catch (IOException e) {
112 logger.warn("Failed to parse content from message send log", e);
113 return null;
114 }
115 final var contentHint = ContentHint.fromType(resultSet.getInt("content_hint"));
116 return new MessageSendLogEntry(groupId, content, contentHint);
117 })) {
118 return result.filter(Objects::nonNull)
119 .filter(e -> !isSenderKey || e.groupId().isPresent())
120 .toList();
121 }
122 }
123 } catch (SQLException e) {
124 logger.warn("Failed read from message send log", e);
125 return List.of();
126 }
127 }
128
129 public long insertIfPossible(
130 long sentTimestamp, SendMessageResult sendMessageResult, ContentHint contentHint
131 ) {
132 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
133 if (recipientDevice == null) {
134 return -1;
135 }
136
137 return insert(List.of(recipientDevice),
138 sentTimestamp,
139 sendMessageResult.getSuccess().getContent().get(),
140 contentHint);
141 }
142
143 public long insertIfPossible(
144 long sentTimestamp, List<SendMessageResult> sendMessageResults, ContentHint contentHint
145 ) {
146 final var recipientDevices = sendMessageResults.stream()
147 .map(this::getRecipientDevices)
148 .filter(Objects::nonNull)
149 .toList();
150 if (recipientDevices.isEmpty()) {
151 return -1;
152 }
153
154 final var content = sendMessageResults.stream()
155 .filter(r -> r.isSuccess() && r.getSuccess().getContent().isPresent())
156 .map(r -> r.getSuccess().getContent().get())
157 .findFirst()
158 .get();
159
160 return insert(recipientDevices, sentTimestamp, content, contentHint);
161 }
162
163 public void addRecipientToExistingEntryIfPossible(final long contentId, final SendMessageResult sendMessageResult) {
164 final RecipientDevices recipientDevice = getRecipientDevices(sendMessageResult);
165 if (recipientDevice == null) {
166 return;
167 }
168
169 insertRecipientsForExistingContent(contentId, List.of(recipientDevice));
170 }
171
172 public void addRecipientToExistingEntryIfPossible(
173 final long contentId, final List<SendMessageResult> sendMessageResults
174 ) {
175 final var recipientDevices = sendMessageResults.stream()
176 .map(this::getRecipientDevices)
177 .filter(Objects::nonNull)
178 .toList();
179 if (recipientDevices.isEmpty()) {
180 return;
181 }
182
183 insertRecipientsForExistingContent(contentId, recipientDevices);
184 }
185
186 public void deleteEntryForGroup(long sentTimestamp, GroupId groupId) {
187 try (final var connection = database.getConnection()) {
188 try (final var statement = connection.prepareStatement(
189 "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id = ?".formatted(
190 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
191 statement.setLong(1, sentTimestamp);
192 statement.setBytes(2, groupId.serialize());
193 statement.executeUpdate();
194 }
195 } catch (SQLException e) {
196 logger.warn("Failed delete from message send log", e);
197 }
198 }
199
200 public void deleteEntryForRecipientNonGroup(long sentTimestamp, RecipientId recipientId) {
201 try (final var connection = database.getConnection()) {
202 connection.setAutoCommit(false);
203 try (final var statement = connection.prepareStatement(
204 "DELETE FROM %s AS lc WHERE lc.timestamp = ? AND lc.group_id IS NULL AND lc._id IN (SELECT content_id FROM %s l WHERE l.recipient_id = ?)".formatted(
205 TABLE_MESSAGE_SEND_LOG_CONTENT,
206 TABLE_MESSAGE_SEND_LOG))) {
207 statement.setLong(1, sentTimestamp);
208 statement.setLong(2, recipientId.id());
209 statement.executeUpdate();
210 }
211
212 deleteOrphanedLogContents(connection);
213 connection.commit();
214 } catch (SQLException e) {
215 logger.warn("Failed delete from message send log", e);
216 }
217 }
218
219 public void deleteEntryForRecipient(long sentTimestamp, RecipientId recipientId, int deviceId) {
220 deleteEntriesForRecipient(List.of(sentTimestamp), recipientId, deviceId);
221 }
222
223 public void deleteEntriesForRecipient(List<Long> sentTimestamps, RecipientId recipientId, int deviceId) {
224 try (final var connection = database.getConnection()) {
225 connection.setAutoCommit(false);
226 try (final var statement = connection.prepareStatement(
227 "DELETE FROM %s AS l WHERE l.content_id IN (SELECT _id FROM %s lc WHERE lc.timestamp = ?) AND l.recipient_id = ? AND l.device_id = ?".formatted(
228 TABLE_MESSAGE_SEND_LOG,
229 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
230 for (final var sentTimestamp : sentTimestamps) {
231 statement.setLong(1, sentTimestamp);
232 statement.setLong(2, recipientId.id());
233 statement.setInt(3, deviceId);
234 statement.executeUpdate();
235 }
236 }
237
238 deleteOrphanedLogContents(connection);
239 connection.commit();
240 } catch (SQLException e) {
241 logger.warn("Failed delete from message send log", e);
242 }
243 }
244
245 @Override
246 public void close() {
247 cleanupThread.interrupt();
248 try {
249 cleanupThread.join();
250 } catch (InterruptedException ignored) {
251 }
252 }
253
254 private RecipientDevices getRecipientDevices(final SendMessageResult sendMessageResult) {
255 if (sendMessageResult.isSuccess() && sendMessageResult.getSuccess().getContent().isPresent()) {
256 final var recipientId = recipientResolver.resolveRecipient(sendMessageResult.getAddress());
257 return new RecipientDevices(recipientId, sendMessageResult.getSuccess().getDevices());
258 } else {
259 return null;
260 }
261 }
262
263 private long insert(
264 final List<RecipientDevices> recipientDevices,
265 final long sentTimestamp,
266 final SignalServiceProtos.Content content,
267 final ContentHint contentHint
268 ) {
269 byte[] groupId = getGroupId(content);
270
271 try (final var connection = database.getConnection()) {
272 connection.setAutoCommit(false);
273 final long contentId;
274 try (final var statement = connection.prepareStatement(
275 "INSERT INTO %s (timestamp, group_id, content, content_hint) VALUES (?,?,?,?)".formatted(
276 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
277 statement.setLong(1, sentTimestamp);
278 statement.setBytes(2, groupId);
279 statement.setBytes(3, content.toByteArray());
280 statement.setInt(4, contentHint.getType());
281 statement.executeUpdate();
282 final var generatedKeys = statement.getGeneratedKeys();
283 if (generatedKeys.next()) {
284 contentId = generatedKeys.getLong(1);
285 } else {
286 contentId = -1;
287 }
288 }
289 if (contentId == -1) {
290 logger.warn("Failed to insert message send log content");
291 return -1;
292 }
293 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
294
295 connection.commit();
296 return contentId;
297 } catch (SQLException e) {
298 logger.warn("Failed to insert into message send log", e);
299 return -1;
300 }
301 }
302
303 private byte[] getGroupId(final SignalServiceProtos.Content content) {
304 try {
305 return !content.hasDataMessage()
306 ? null
307 : content.getDataMessage().hasGroup()
308 ? content.getDataMessage().getGroup().getId().toByteArray()
309 : content.getDataMessage().hasGroupV2()
310 ? GroupUtils.getGroupIdV2(new GroupMasterKey(content.getDataMessage()
311 .getGroupV2()
312 .getMasterKey()
313 .toByteArray())).serialize()
314 : null;
315 } catch (InvalidInputException e) {
316 logger.warn("Failed to parse groupId id from content");
317 return null;
318 }
319 }
320
321 private void insertRecipientsForExistingContent(
322 final long contentId, final List<RecipientDevices> recipientDevices
323 ) {
324 try (final var connection = database.getConnection()) {
325 connection.setAutoCommit(false);
326 insertRecipientsForExistingContent(contentId, recipientDevices, connection);
327 connection.commit();
328 } catch (SQLException e) {
329 logger.warn("Failed to append recipients to message send log", e);
330 }
331 }
332
333 private void insertRecipientsForExistingContent(
334 final long contentId, final List<RecipientDevices> recipientDevices, final Connection connection
335 ) throws SQLException {
336 try (final var statement = connection.prepareStatement(
337 "INSERT INTO %s (recipient_id, device_id, content_id) VALUES (?,?,?)".formatted(TABLE_MESSAGE_SEND_LOG))) {
338 for (final var recipientDevice : recipientDevices) {
339 for (final var deviceId : recipientDevice.deviceIds()) {
340 statement.setLong(1, recipientDevice.recipientId().id());
341 statement.setInt(2, deviceId);
342 statement.setLong(3, contentId);
343 statement.executeUpdate();
344 }
345 }
346 }
347 }
348
349 private void deleteOutdatedEntries(final Connection connection) throws SQLException {
350 try (final var statement = connection.prepareStatement("DELETE FROM %s WHERE timestamp < ?".formatted(
351 TABLE_MESSAGE_SEND_LOG_CONTENT))) {
352 statement.setLong(1, System.currentTimeMillis() - LOG_DURATION.toMillis());
353 final var rowCount = statement.executeUpdate();
354 if (rowCount > 0) {
355 logger.debug("Removed {} outdated entries from the message send log", rowCount);
356 }
357 }
358 }
359
360 private void deleteOrphanedLogContents(final Connection connection) throws SQLException {
361 try (final var statement = connection.prepareStatement(
362 "DELETE FROM %s WHERE _id NOT IN (SELECT content_id FROM %s)".formatted(TABLE_MESSAGE_SEND_LOG_CONTENT,
363 TABLE_MESSAGE_SEND_LOG))) {
364 statement.executeUpdate();
365 }
366 }
367
368 private <T> Stream<T> executeQueryForStream(
369 PreparedStatement statement, ResultSetMapper<T> mapper
370 ) throws SQLException {
371 final var resultSet = statement.executeQuery();
372
373 return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
374 @Override
375 public boolean tryAdvance(final Consumer<? super T> consumer) {
376 try {
377 if (!resultSet.next()) {
378 return false;
379 }
380 consumer.accept(mapper.apply(resultSet));
381 return true;
382 } catch (SQLException e) {
383 logger.warn("Failed to read from database result", e);
384 throw new RuntimeException(e);
385 }
386 }
387 }, false);
388 }
389
390 private interface ResultSetMapper<T> {
391
392 T apply(ResultSet resultSet) throws SQLException;
393 }
394
395 private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
396 }