Refactor message cache

This commit is contained in:
AsamK 2021-01-11 21:18:03 +01:00
parent 38267fa2a1
commit bc47c0d5d6
4 changed files with 147 additions and 79 deletions

View file

@ -34,6 +34,7 @@ import org.asamk.signal.manager.storage.contacts.ContactInfo;
import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfo;
import org.asamk.signal.manager.storage.groups.GroupInfoV1; import org.asamk.signal.manager.storage.groups.GroupInfoV1;
import org.asamk.signal.manager.storage.groups.GroupInfoV2; import org.asamk.signal.manager.storage.groups.GroupInfoV2;
import org.asamk.signal.manager.storage.messageCache.CachedMessage;
import org.asamk.signal.manager.storage.profiles.SignalProfile; import org.asamk.signal.manager.storage.profiles.SignalProfile;
import org.asamk.signal.manager.storage.profiles.SignalProfileEntry; import org.asamk.signal.manager.storage.profiles.SignalProfileEntry;
import org.asamk.signal.manager.storage.protocol.IdentityInfo; import org.asamk.signal.manager.storage.protocol.IdentityInfo;
@ -41,7 +42,6 @@ import org.asamk.signal.manager.storage.stickers.Sticker;
import org.asamk.signal.manager.util.AttachmentUtils; import org.asamk.signal.manager.util.AttachmentUtils;
import org.asamk.signal.manager.util.IOUtils; import org.asamk.signal.manager.util.IOUtils;
import org.asamk.signal.manager.util.KeyUtils; import org.asamk.signal.manager.util.KeyUtils;
import org.asamk.signal.manager.util.MessageCacheUtils;
import org.asamk.signal.manager.util.Utils; import org.asamk.signal.manager.util.Utils;
import org.signal.libsignal.metadata.InvalidMetadataMessageException; import org.signal.libsignal.metadata.InvalidMetadataMessageException;
import org.signal.libsignal.metadata.InvalidMetadataVersionException; import org.signal.libsignal.metadata.InvalidMetadataVersionException;
@ -170,7 +170,6 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -188,7 +187,6 @@ public class Manager implements Closeable {
final static Logger logger = LoggerFactory.getLogger(Manager.class); final static Logger logger = LoggerFactory.getLogger(Manager.class);
private final SleepTimer timer = new UptimeSleepTimer();
private final CertificateValidator certificateValidator = new CertificateValidator(ServiceConfig.getUnidentifiedSenderTrustRoot()); private final CertificateValidator certificateValidator = new CertificateValidator(ServiceConfig.getUnidentifiedSenderTrustRoot());
private final SignalServiceConfiguration serviceConfiguration; private final SignalServiceConfiguration serviceConfiguration;
@ -222,6 +220,7 @@ public class Manager implements Closeable {
this.userAgent = userAgent; this.userAgent = userAgent;
this.groupsV2Operations = capabilities.isGv2() ? new GroupsV2Operations(ClientZkOperations.create( this.groupsV2Operations = capabilities.isGv2() ? new GroupsV2Operations(ClientZkOperations.create(
serviceConfiguration)) : null; serviceConfiguration)) : null;
final SleepTimer timer = new UptimeSleepTimer();
this.accountManager = new SignalServiceAccountManager(serviceConfiguration, this.accountManager = new SignalServiceAccountManager(serviceConfiguration,
new DynamicCredentialsProvider(account.getUuid(), new DynamicCredentialsProvider(account.getUuid(),
account.getUsername(), account.getUsername(),
@ -281,24 +280,6 @@ public class Manager implements Closeable {
return account.getDeviceId(); return account.getDeviceId();
} }
private File getMessageCachePath() {
return SignalAccount.getMessageCachePath(pathConfig.getDataPath(), account.getUsername());
}
private File getMessageCachePath(String sender) {
if (sender == null || sender.isEmpty()) {
return getMessageCachePath();
}
return new File(getMessageCachePath(), sender.replace("/", "_"));
}
private File getMessageCacheFile(String sender, long now, long timestamp) throws IOException {
File cachePath = getMessageCachePath(sender);
IOUtils.createPrivateDirectories(cachePath);
return new File(cachePath, now + "_" + timestamp);
}
public static Manager init( public static Manager init(
String username, File settingsPath, SignalServiceConfiguration serviceConfiguration, String userAgent String username, File settingsPath, SignalServiceConfiguration serviceConfiguration, String userAgent
) throws IOException, NotRegisteredException { ) throws IOException, NotRegisteredException {
@ -1727,41 +1708,17 @@ public class Manager implements Closeable {
} }
} }
private void retryFailedReceivedMessages( private void retryFailedReceivedMessages(ReceiveMessageHandler handler, boolean ignoreAttachments) {
ReceiveMessageHandler handler, boolean ignoreAttachments for (CachedMessage cachedMessage : account.getMessageCache().getCachedMessages()) {
) { retryFailedReceivedMessage(handler, ignoreAttachments, cachedMessage);
final File cachePath = getMessageCachePath();
if (!cachePath.exists()) {
return;
}
for (final File dir : Objects.requireNonNull(cachePath.listFiles())) {
if (!dir.isDirectory()) {
retryFailedReceivedMessage(handler, ignoreAttachments, dir);
continue;
}
for (final File fileEntry : Objects.requireNonNull(dir.listFiles())) {
if (!fileEntry.isFile()) {
continue;
}
retryFailedReceivedMessage(handler, ignoreAttachments, fileEntry);
}
// Try to delete directory if empty
dir.delete();
} }
} }
private void retryFailedReceivedMessage( private void retryFailedReceivedMessage(
final ReceiveMessageHandler handler, final boolean ignoreAttachments, final File fileEntry final ReceiveMessageHandler handler, final boolean ignoreAttachments, final CachedMessage cachedMessage
) { ) {
SignalServiceEnvelope envelope; SignalServiceEnvelope envelope = cachedMessage.loadEnvelope();
try { if (envelope == null) {
envelope = MessageCacheUtils.loadEnvelope(fileEntry);
if (envelope == null) {
return;
}
} catch (IOException e) {
e.printStackTrace();
return; return;
} }
SignalServiceContent content = null; SignalServiceContent content = null;
@ -1772,11 +1729,7 @@ public class Manager implements Closeable {
return; return;
} catch (Exception er) { } catch (Exception er) {
// All other errors are not recoverable, so delete the cached message // All other errors are not recoverable, so delete the cached message
try { cachedMessage.delete();
Files.delete(fileEntry.toPath());
} catch (IOException e) {
logger.warn("Failed to delete cached message file “{}”, ignoring: {}", fileEntry, e.getMessage());
}
return; return;
} }
List<HandleAction> actions = handleMessage(envelope, content, ignoreAttachments); List<HandleAction> actions = handleMessage(envelope, content, ignoreAttachments);
@ -1790,11 +1743,7 @@ public class Manager implements Closeable {
} }
account.save(); account.save();
handler.handleMessage(envelope, content, null); handler.handleMessage(envelope, content, null);
try { cachedMessage.delete();
Files.delete(fileEntry.toPath());
} catch (IOException e) {
logger.warn("Failed to delete cached message file “{}”, ignoring: {}", fileEntry, e.getMessage());
}
} }
public void receiveMessages( public void receiveMessages(
@ -1808,7 +1757,7 @@ public class Manager implements Closeable {
Set<HandleAction> queuedActions = null; Set<HandleAction> queuedActions = null;
getOrCreateMessagePipe(); final SignalServiceMessagePipe messagePipe = getOrCreateMessagePipe();
boolean hasCaughtUpWithOldMessages = false; boolean hasCaughtUpWithOldMessages = false;
@ -1816,17 +1765,11 @@ public class Manager implements Closeable {
SignalServiceEnvelope envelope; SignalServiceEnvelope envelope;
SignalServiceContent content = null; SignalServiceContent content = null;
Exception exception = null; Exception exception = null;
final long now = new Date().getTime(); final CachedMessage[] cachedMessage = {null};
try { try {
Optional<SignalServiceEnvelope> result = messagePipe.readOrEmpty(timeout, unit, envelope1 -> { Optional<SignalServiceEnvelope> result = messagePipe.readOrEmpty(timeout, unit, envelope1 -> {
// store message on disk, before acknowledging receipt to the server // store message on disk, before acknowledging receipt to the server
try { cachedMessage[0] = account.getMessageCache().cacheMessage(envelope1);
String source = envelope1.getSourceE164().isPresent() ? envelope1.getSourceE164().get() : "";
File cacheFile = getMessageCacheFile(source, now, envelope1.getTimestamp());
MessageCacheUtils.storeEnvelope(envelope1, cacheFile);
} catch (IOException e) {
logger.warn("Failed to store encrypted message in disk cache, ignoring: {}", e.getMessage());
}
}); });
if (result.isPresent()) { if (result.isPresent()) {
envelope = result.get(); envelope = result.get();
@ -1890,15 +1833,8 @@ public class Manager implements Closeable {
handler.handleMessage(envelope, content, exception); handler.handleMessage(envelope, content, exception);
} }
if (!(exception instanceof org.whispersystems.libsignal.UntrustedIdentityException)) { if (!(exception instanceof org.whispersystems.libsignal.UntrustedIdentityException)) {
File cacheFile = null; if (cachedMessage[0] != null) {
try { cachedMessage[0].delete();
String source = envelope.getSourceE164().isPresent() ? envelope.getSourceE164().get() : "";
cacheFile = getMessageCacheFile(source, now, envelope.getTimestamp());
Files.delete(cacheFile.toPath());
// Try to delete directory if empty
getMessageCachePath().delete();
} catch (IOException e) {
logger.warn("Failed to delete cached message file “{}”, ignoring: {}", cacheFile, e.getMessage());
} }
} }
} }

View file

@ -16,6 +16,7 @@ import org.asamk.signal.manager.storage.contacts.JsonContactsStore;
import org.asamk.signal.manager.storage.groups.GroupInfo; import org.asamk.signal.manager.storage.groups.GroupInfo;
import org.asamk.signal.manager.storage.groups.GroupInfoV1; import org.asamk.signal.manager.storage.groups.GroupInfoV1;
import org.asamk.signal.manager.storage.groups.JsonGroupStore; import org.asamk.signal.manager.storage.groups.JsonGroupStore;
import org.asamk.signal.manager.storage.messageCache.MessageCache;
import org.asamk.signal.manager.storage.profiles.ProfileStore; import org.asamk.signal.manager.storage.profiles.ProfileStore;
import org.asamk.signal.manager.storage.protocol.IdentityInfo; import org.asamk.signal.manager.storage.protocol.IdentityInfo;
import org.asamk.signal.manager.storage.protocol.JsonSignalProtocolStore; import org.asamk.signal.manager.storage.protocol.JsonSignalProtocolStore;
@ -84,6 +85,8 @@ public class SignalAccount implements Closeable {
private ProfileStore profileStore; private ProfileStore profileStore;
private StickerStore stickerStore; private StickerStore stickerStore;
private MessageCache messageCache;
private SignalAccount(final FileChannel fileChannel, final FileLock lock) { private SignalAccount(final FileChannel fileChannel, final FileLock lock) {
this.fileChannel = fileChannel; this.fileChannel = fileChannel;
this.lock = lock; this.lock = lock;
@ -130,6 +133,9 @@ public class SignalAccount implements Closeable {
account.recipientStore = new RecipientStore(); account.recipientStore = new RecipientStore();
account.profileStore = new ProfileStore(); account.profileStore = new ProfileStore();
account.stickerStore = new StickerStore(); account.stickerStore = new StickerStore();
account.messageCache = new MessageCache(getMessageCachePath(dataPath, username));
account.registered = false; account.registered = false;
return account; return account;
@ -167,6 +173,9 @@ public class SignalAccount implements Closeable {
account.recipientStore = new RecipientStore(); account.recipientStore = new RecipientStore();
account.profileStore = new ProfileStore(); account.profileStore = new ProfileStore();
account.stickerStore = new StickerStore(); account.stickerStore = new StickerStore();
account.messageCache = new MessageCache(getMessageCachePath(dataPath, username));
account.registered = true; account.registered = true;
account.isMultiDevice = true; account.isMultiDevice = true;
@ -342,6 +351,8 @@ public class SignalAccount implements Closeable {
stickerStore = new StickerStore(); stickerStore = new StickerStore();
} }
messageCache = new MessageCache(getMessageCachePath(dataPath, username));
JsonNode threadStoreNode = rootNode.get("threadStore"); JsonNode threadStoreNode = rootNode.get("threadStore");
if (threadStoreNode != null) { if (threadStoreNode != null) {
LegacyJsonThreadStore threadStore = jsonProcessor.convertValue(threadStoreNode, LegacyJsonThreadStore threadStore = jsonProcessor.convertValue(threadStoreNode,
@ -460,6 +471,10 @@ public class SignalAccount implements Closeable {
return stickerStore; return stickerStore;
} }
public MessageCache getMessageCache() {
return messageCache;
}
public String getUsername() { public String getUsername() {
return username; return username;
} }

View file

@ -0,0 +1,38 @@
package org.asamk.signal.manager.storage.messageCache;
import org.asamk.signal.manager.util.MessageCacheUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
public final class CachedMessage {
final static Logger logger = LoggerFactory.getLogger(CachedMessage.class);
private final File file;
CachedMessage(final File file) {
this.file = file;
}
public SignalServiceEnvelope loadEnvelope() {
try {
return MessageCacheUtils.loadEnvelope(file);
} catch (IOException e) {
logger.error("Failed to load cached message envelope “{}”: {}", file, e.getMessage());
return null;
}
}
public void delete() {
try {
Files.delete(file.toPath());
} catch (IOException e) {
logger.warn("Failed to delete cached message file “{}”, ignoring: {}", file, e.getMessage());
}
}
}

View file

@ -0,0 +1,79 @@
package org.asamk.signal.manager.storage.messageCache;
import org.asamk.signal.manager.util.IOUtils;
import org.asamk.signal.manager.util.MessageCacheUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class MessageCache {
final static Logger logger = LoggerFactory.getLogger(MessageCache.class);
private final File messageCachePath;
public MessageCache(final File messageCachePath) {
this.messageCachePath = messageCachePath;
}
public Iterable<CachedMessage> getCachedMessages() {
if (!messageCachePath.exists()) {
return Collections.emptyList();
}
return Arrays.stream(Objects.requireNonNull(messageCachePath.listFiles())).flatMap(dir -> {
if (dir.isFile()) {
return Stream.of(dir);
}
final File[] files = Objects.requireNonNull(dir.listFiles());
if (files.length == 0) {
try {
Files.delete(dir.toPath());
} catch (IOException e) {
logger.warn("Failed to delete cache dir “{}”, ignoring: {}", dir, e.getMessage());
}
return Stream.empty();
}
return Arrays.stream(files).filter(File::isFile);
}).map(CachedMessage::new).collect(Collectors.toList());
}
public CachedMessage cacheMessage(SignalServiceEnvelope envelope) {
final long now = new Date().getTime();
final String source = envelope.hasSource() ? envelope.getSourceAddress().getLegacyIdentifier() : "";
try {
File cacheFile = getMessageCacheFile(source, now, envelope.getTimestamp());
MessageCacheUtils.storeEnvelope(envelope, cacheFile);
return new CachedMessage(cacheFile);
} catch (IOException e) {
logger.warn("Failed to store encrypted message in disk cache, ignoring: {}", e.getMessage());
return null;
}
}
private File getMessageCachePath(String sender) {
if (sender == null || sender.isEmpty()) {
return messageCachePath;
}
return new File(messageCachePath, sender.replace("/", "_"));
}
private File getMessageCacheFile(String sender, long now, long timestamp) throws IOException {
File cachePath = getMessageCachePath(sender);
IOUtils.createPrivateDirectories(cachePath);
return new File(cachePath, now + "_" + timestamp);
}
}