From: AsamK Date: Mon, 11 Jan 2021 20:18:03 +0000 (+0100) Subject: Refactor message cache X-Git-Tag: v0.7.3~41 X-Git-Url: https://git.nmode.ca/signal-cli/commitdiff_plain/bc47c0d5d6ade54db2997f7d6ed5f7154781fcc7 Refactor message cache --- diff --git a/src/main/java/org/asamk/signal/manager/Manager.java b/src/main/java/org/asamk/signal/manager/Manager.java index 097c4c0d..d0a9e277 100644 --- a/src/main/java/org/asamk/signal/manager/Manager.java +++ b/src/main/java/org/asamk/signal/manager/Manager.java @@ -34,6 +34,7 @@ import org.asamk.signal.manager.storage.contacts.ContactInfo; import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfoV1; import org.asamk.signal.manager.storage.groups.GroupInfoV2; +import org.asamk.signal.manager.storage.messageCache.CachedMessage; import org.asamk.signal.manager.storage.profiles.SignalProfile; import org.asamk.signal.manager.storage.profiles.SignalProfileEntry; import org.asamk.signal.manager.storage.protocol.IdentityInfo; @@ -41,7 +42,6 @@ import org.asamk.signal.manager.storage.stickers.Sticker; import org.asamk.signal.manager.util.AttachmentUtils; import org.asamk.signal.manager.util.IOUtils; import org.asamk.signal.manager.util.KeyUtils; -import org.asamk.signal.manager.util.MessageCacheUtils; import org.asamk.signal.manager.util.Utils; import org.signal.libsignal.metadata.InvalidMetadataMessageException; import org.signal.libsignal.metadata.InvalidMetadataVersionException; @@ -170,7 +170,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -188,7 +187,6 @@ public class Manager implements Closeable { final static Logger logger = LoggerFactory.getLogger(Manager.class); - private final SleepTimer timer = new UptimeSleepTimer(); private final CertificateValidator certificateValidator = new CertificateValidator(ServiceConfig.getUnidentifiedSenderTrustRoot()); private final SignalServiceConfiguration serviceConfiguration; @@ -222,6 +220,7 @@ public class Manager implements Closeable { this.userAgent = userAgent; this.groupsV2Operations = capabilities.isGv2() ? new GroupsV2Operations(ClientZkOperations.create( serviceConfiguration)) : null; + final SleepTimer timer = new UptimeSleepTimer(); this.accountManager = new SignalServiceAccountManager(serviceConfiguration, new DynamicCredentialsProvider(account.getUuid(), account.getUsername(), @@ -281,24 +280,6 @@ public class Manager implements Closeable { return account.getDeviceId(); } - private File getMessageCachePath() { - return SignalAccount.getMessageCachePath(pathConfig.getDataPath(), account.getUsername()); - } - - private File getMessageCachePath(String sender) { - if (sender == null || sender.isEmpty()) { - return getMessageCachePath(); - } - - return new File(getMessageCachePath(), sender.replace("/", "_")); - } - - private File getMessageCacheFile(String sender, long now, long timestamp) throws IOException { - File cachePath = getMessageCachePath(sender); - IOUtils.createPrivateDirectories(cachePath); - return new File(cachePath, now + "_" + timestamp); - } - public static Manager init( String username, File settingsPath, SignalServiceConfiguration serviceConfiguration, String userAgent ) throws IOException, NotRegisteredException { @@ -1727,41 +1708,17 @@ public class Manager implements Closeable { } } - private void retryFailedReceivedMessages( - ReceiveMessageHandler handler, boolean ignoreAttachments - ) { - final File cachePath = getMessageCachePath(); - if (!cachePath.exists()) { - return; - } - for (final File dir : Objects.requireNonNull(cachePath.listFiles())) { - if (!dir.isDirectory()) { - retryFailedReceivedMessage(handler, ignoreAttachments, dir); - continue; - } - - for (final File fileEntry : Objects.requireNonNull(dir.listFiles())) { - if (!fileEntry.isFile()) { - continue; - } - retryFailedReceivedMessage(handler, ignoreAttachments, fileEntry); - } - // Try to delete directory if empty - dir.delete(); + private void retryFailedReceivedMessages(ReceiveMessageHandler handler, boolean ignoreAttachments) { + for (CachedMessage cachedMessage : account.getMessageCache().getCachedMessages()) { + retryFailedReceivedMessage(handler, ignoreAttachments, cachedMessage); } } private void retryFailedReceivedMessage( - final ReceiveMessageHandler handler, final boolean ignoreAttachments, final File fileEntry + final ReceiveMessageHandler handler, final boolean ignoreAttachments, final CachedMessage cachedMessage ) { - SignalServiceEnvelope envelope; - try { - envelope = MessageCacheUtils.loadEnvelope(fileEntry); - if (envelope == null) { - return; - } - } catch (IOException e) { - e.printStackTrace(); + SignalServiceEnvelope envelope = cachedMessage.loadEnvelope(); + if (envelope == null) { return; } SignalServiceContent content = null; @@ -1772,11 +1729,7 @@ public class Manager implements Closeable { return; } catch (Exception er) { // All other errors are not recoverable, so delete the cached message - try { - Files.delete(fileEntry.toPath()); - } catch (IOException e) { - logger.warn("Failed to delete cached message file “{}”, ignoring: {}", fileEntry, e.getMessage()); - } + cachedMessage.delete(); return; } List actions = handleMessage(envelope, content, ignoreAttachments); @@ -1790,11 +1743,7 @@ public class Manager implements Closeable { } account.save(); handler.handleMessage(envelope, content, null); - try { - Files.delete(fileEntry.toPath()); - } catch (IOException e) { - logger.warn("Failed to delete cached message file “{}”, ignoring: {}", fileEntry, e.getMessage()); - } + cachedMessage.delete(); } public void receiveMessages( @@ -1808,7 +1757,7 @@ public class Manager implements Closeable { Set queuedActions = null; - getOrCreateMessagePipe(); + final SignalServiceMessagePipe messagePipe = getOrCreateMessagePipe(); boolean hasCaughtUpWithOldMessages = false; @@ -1816,17 +1765,11 @@ public class Manager implements Closeable { SignalServiceEnvelope envelope; SignalServiceContent content = null; Exception exception = null; - final long now = new Date().getTime(); + final CachedMessage[] cachedMessage = {null}; try { Optional result = messagePipe.readOrEmpty(timeout, unit, envelope1 -> { // store message on disk, before acknowledging receipt to the server - try { - String source = envelope1.getSourceE164().isPresent() ? envelope1.getSourceE164().get() : ""; - File cacheFile = getMessageCacheFile(source, now, envelope1.getTimestamp()); - MessageCacheUtils.storeEnvelope(envelope1, cacheFile); - } catch (IOException e) { - logger.warn("Failed to store encrypted message in disk cache, ignoring: {}", e.getMessage()); - } + cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1); }); if (result.isPresent()) { envelope = result.get(); @@ -1890,15 +1833,8 @@ public class Manager implements Closeable { handler.handleMessage(envelope, content, exception); } if (!(exception instanceof org.whispersystems.libsignal.UntrustedIdentityException)) { - File cacheFile = null; - try { - String source = envelope.getSourceE164().isPresent() ? envelope.getSourceE164().get() : ""; - cacheFile = getMessageCacheFile(source, now, envelope.getTimestamp()); - Files.delete(cacheFile.toPath()); - // Try to delete directory if empty - getMessageCachePath().delete(); - } catch (IOException e) { - logger.warn("Failed to delete cached message file “{}”, ignoring: {}", cacheFile, e.getMessage()); + if (cachedMessage[0] != null) { + cachedMessage[0].delete(); } } } diff --git a/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java b/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java index a030af3f..6d592573 100644 --- a/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java +++ b/src/main/java/org/asamk/signal/manager/storage/SignalAccount.java @@ -16,6 +16,7 @@ import org.asamk.signal.manager.storage.contacts.JsonContactsStore; import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfoV1; import org.asamk.signal.manager.storage.groups.JsonGroupStore; +import org.asamk.signal.manager.storage.messageCache.MessageCache; import org.asamk.signal.manager.storage.profiles.ProfileStore; import org.asamk.signal.manager.storage.protocol.IdentityInfo; import org.asamk.signal.manager.storage.protocol.JsonSignalProtocolStore; @@ -84,6 +85,8 @@ public class SignalAccount implements Closeable { private ProfileStore profileStore; private StickerStore stickerStore; + private MessageCache messageCache; + private SignalAccount(final FileChannel fileChannel, final FileLock lock) { this.fileChannel = fileChannel; this.lock = lock; @@ -130,6 +133,9 @@ public class SignalAccount implements Closeable { account.recipientStore = new RecipientStore(); account.profileStore = new ProfileStore(); account.stickerStore = new StickerStore(); + + account.messageCache = new MessageCache(getMessageCachePath(dataPath, username)); + account.registered = false; return account; @@ -167,6 +173,9 @@ public class SignalAccount implements Closeable { account.recipientStore = new RecipientStore(); account.profileStore = new ProfileStore(); account.stickerStore = new StickerStore(); + + account.messageCache = new MessageCache(getMessageCachePath(dataPath, username)); + account.registered = true; account.isMultiDevice = true; @@ -342,6 +351,8 @@ public class SignalAccount implements Closeable { stickerStore = new StickerStore(); } + messageCache = new MessageCache(getMessageCachePath(dataPath, username)); + JsonNode threadStoreNode = rootNode.get("threadStore"); if (threadStoreNode != null) { LegacyJsonThreadStore threadStore = jsonProcessor.convertValue(threadStoreNode, @@ -460,6 +471,10 @@ public class SignalAccount implements Closeable { return stickerStore; } + public MessageCache getMessageCache() { + return messageCache; + } + public String getUsername() { return username; } diff --git a/src/main/java/org/asamk/signal/manager/storage/messageCache/CachedMessage.java b/src/main/java/org/asamk/signal/manager/storage/messageCache/CachedMessage.java new file mode 100644 index 00000000..6c20cf62 --- /dev/null +++ b/src/main/java/org/asamk/signal/manager/storage/messageCache/CachedMessage.java @@ -0,0 +1,38 @@ +package org.asamk.signal.manager.storage.messageCache; + +import org.asamk.signal.manager.util.MessageCacheUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +public final class CachedMessage { + + final static Logger logger = LoggerFactory.getLogger(CachedMessage.class); + + private final File file; + + CachedMessage(final File file) { + this.file = file; + } + + public SignalServiceEnvelope loadEnvelope() { + try { + return MessageCacheUtils.loadEnvelope(file); + } catch (IOException e) { + logger.error("Failed to load cached message envelope “{}”: {}", file, e.getMessage()); + return null; + } + } + + public void delete() { + try { + Files.delete(file.toPath()); + } catch (IOException e) { + logger.warn("Failed to delete cached message file “{}”, ignoring: {}", file, e.getMessage()); + } + } +} diff --git a/src/main/java/org/asamk/signal/manager/storage/messageCache/MessageCache.java b/src/main/java/org/asamk/signal/manager/storage/messageCache/MessageCache.java new file mode 100644 index 00000000..4e48ee76 --- /dev/null +++ b/src/main/java/org/asamk/signal/manager/storage/messageCache/MessageCache.java @@ -0,0 +1,79 @@ +package org.asamk.signal.manager.storage.messageCache; + +import org.asamk.signal.manager.util.IOUtils; +import org.asamk.signal.manager.util.MessageCacheUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MessageCache { + + final static Logger logger = LoggerFactory.getLogger(MessageCache.class); + + private final File messageCachePath; + + public MessageCache(final File messageCachePath) { + this.messageCachePath = messageCachePath; + } + + public Iterable getCachedMessages() { + if (!messageCachePath.exists()) { + return Collections.emptyList(); + } + + return Arrays.stream(Objects.requireNonNull(messageCachePath.listFiles())).flatMap(dir -> { + if (dir.isFile()) { + return Stream.of(dir); + } + + final File[] files = Objects.requireNonNull(dir.listFiles()); + if (files.length == 0) { + try { + Files.delete(dir.toPath()); + } catch (IOException e) { + logger.warn("Failed to delete cache dir “{}”, ignoring: {}", dir, e.getMessage()); + } + return Stream.empty(); + } + return Arrays.stream(files).filter(File::isFile); + }).map(CachedMessage::new).collect(Collectors.toList()); + } + + public CachedMessage cacheMessage(SignalServiceEnvelope envelope) { + final long now = new Date().getTime(); + final String source = envelope.hasSource() ? envelope.getSourceAddress().getLegacyIdentifier() : ""; + + try { + File cacheFile = getMessageCacheFile(source, now, envelope.getTimestamp()); + MessageCacheUtils.storeEnvelope(envelope, cacheFile); + return new CachedMessage(cacheFile); + } catch (IOException e) { + logger.warn("Failed to store encrypted message in disk cache, ignoring: {}", e.getMessage()); + return null; + } + } + + private File getMessageCachePath(String sender) { + if (sender == null || sender.isEmpty()) { + return messageCachePath; + } + + return new File(messageCachePath, sender.replace("/", "_")); + } + + private File getMessageCacheFile(String sender, long now, long timestamp) throws IOException { + File cachePath = getMessageCachePath(sender); + IOUtils.createPrivateDirectories(cachePath); + return new File(cachePath, now + "_" + timestamp); + } +}