Move recipient store to database

This commit is contained in:
AsamK 2022-05-22 21:47:40 +02:00
parent 9d534dc7bb
commit 862c2fec87
11 changed files with 1041 additions and 593 deletions

View file

@ -1133,6 +1133,34 @@
"name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore$RecipientStoreDeserializer",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true,
"methods":[{"name":"<init>","parameterTypes":["java.util.List","long"] }]
},
{
"name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true,
"methods":[{"name":"<init>","parameterTypes":["long","java.lang.String","java.lang.String","java.lang.String","java.lang.String","org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Contact","org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Profile"] }]
},
{
"name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Contact",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true,
"methods":[{"name":"<init>","parameterTypes":["java.lang.String","java.lang.String","int","boolean","boolean","boolean"] }]
},
{
"name":"org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2$Storage$Recipient$Profile",
"allDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true,
"methods":[{"name":"<init>","parameterTypes":["long","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.lang.String","java.util.Set"] }]
},
{
"name":"org.asamk.signal.manager.storage.recipients.RecipientAddress",
"allDeclaredFields":true,
@ -1143,30 +1171,6 @@
{"name":"uuid","parameterTypes":[] }
]
},
{
"name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient$Contact",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.asamk.signal.manager.storage.recipients.RecipientStore$Storage$Recipient$Profile",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"org.asamk.signal.manager.storage.senderKeys.SenderKeySharedStore$Storage",
"allDeclaredFields":true,

View file

@ -108,17 +108,12 @@ public final class ProfileHelper {
}
public List<ExpiringProfileKeyCredential> getExpiringProfileKeyCredential(List<RecipientId> recipientIds) {
try {
account.getRecipientStore().setBulkUpdating(true);
final var profileFetches = Flowable.fromIterable(recipientIds)
.filter(recipientId -> !ExpiringProfileCredentialUtil.isValid(account.getProfileStore()
.getExpiringProfileKeyCredential(recipientId)))
.map(recipientId -> retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete());
Maybe.merge(profileFetches, 10).blockingSubscribe();
} finally {
account.getRecipientStore().setBulkUpdating(false);
}
return recipientIds.stream().map(r -> account.getProfileStore().getExpiringProfileKeyCredential(r)).toList();
}
@ -233,16 +228,11 @@ public final class ProfileHelper {
private List<Profile> getRecipientProfiles(Collection<RecipientId> recipientIds, boolean force) {
final var profileStore = account.getProfileStore();
try {
account.getRecipientStore().setBulkUpdating(true);
final var profileFetches = Flowable.fromIterable(recipientIds)
.filter(recipientId -> force || isProfileRefreshRequired(profileStore.getProfile(recipientId)))
.map(recipientId -> retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE).onErrorComplete());
Maybe.merge(profileFetches, 10).blockingSubscribe();
} finally {
account.getRecipientStore().setBulkUpdating(false);
}
return recipientIds.stream().map(profileStore::getProfile).toList();
}

View file

@ -2,6 +2,7 @@ package org.asamk.signal.manager.storage;
import com.zaxxer.hikari.HikariDataSource;
import org.asamk.signal.manager.storage.recipients.RecipientStore;
import org.asamk.signal.manager.storage.sendLog.MessageSendLogStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -13,7 +14,7 @@ import java.sql.SQLException;
public class AccountDatabase extends Database {
private final static Logger logger = LoggerFactory.getLogger(AccountDatabase.class);
private static final long DATABASE_VERSION = 1;
private static final long DATABASE_VERSION = 2;
private AccountDatabase(final HikariDataSource dataSource) {
super(logger, DATABASE_VERSION, dataSource);
@ -24,10 +25,45 @@ public class AccountDatabase extends Database {
}
@Override
protected void upgradeDatabase(final Connection connection, final long oldVersion) throws SQLException {
if (oldVersion < 1) {
logger.debug("Updating database: Creating message send log tables");
protected void createDatabase(final Connection connection) throws SQLException {
RecipientStore.createSql(connection);
MessageSendLogStore.createSql(connection);
}
@Override
protected void upgradeDatabase(final Connection connection, final long oldVersion) throws SQLException {
if (oldVersion < 2) {
logger.debug("Updating database: Creating recipient table");
try (final var statement = connection.createStatement()) {
statement.executeUpdate("""
CREATE TABLE recipient (
_id INTEGER PRIMARY KEY AUTOINCREMENT,
number TEXT UNIQUE,
uuid BLOB UNIQUE,
profile_key BLOB,
profile_key_credential BLOB,
given_name TEXT,
family_name TEXT,
color TEXT,
expiration_time INTEGER NOT NULL DEFAULT 0,
blocked BOOLEAN NOT NULL DEFAULT FALSE,
archived BOOLEAN NOT NULL DEFAULT FALSE,
profile_sharing BOOLEAN NOT NULL DEFAULT FALSE,
profile_last_update_timestamp INTEGER NOT NULL DEFAULT 0,
profile_given_name TEXT,
profile_family_name TEXT,
profile_about TEXT,
profile_about_emoji TEXT,
profile_avatar_url_path TEXT,
profile_mobile_coin_address BLOB,
profile_unidentified_access_mode TEXT,
profile_capabilities TEXT
);
""");
}
}
}
}

View file

@ -53,19 +53,26 @@ public abstract class Database implements AutoCloseable {
protected final void initDb() throws SQLException {
try (final var connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
final var userVersion = getUserVersion(connection);
logger.trace("Current database version: {} Program database version: {}", userVersion, databaseVersion);
if (userVersion > databaseVersion) {
if (userVersion == 0) {
createDatabase(connection);
setUserVersion(connection, databaseVersion);
} else if (userVersion > databaseVersion) {
logger.error("Database has been updated by a newer signal-cli version");
throw new SQLException("Database has been updated by a newer signal-cli version");
} else if (userVersion < databaseVersion) {
upgradeDatabase(connection, userVersion);
setUserVersion(connection, databaseVersion);
}
connection.commit();
}
}
protected abstract void createDatabase(final Connection connection) throws SQLException;
protected abstract void upgradeDatabase(final Connection connection, long oldVersion) throws SQLException;
private static long getUserVersion(final Connection connection) throws SQLException {

View file

@ -26,6 +26,7 @@ import org.asamk.signal.manager.storage.protocol.LegacyJsonSignalProtocolStore;
import org.asamk.signal.manager.storage.protocol.SignalProtocolStore;
import org.asamk.signal.manager.storage.recipients.Contact;
import org.asamk.signal.manager.storage.recipients.LegacyRecipientStore;
import org.asamk.signal.manager.storage.recipients.LegacyRecipientStore2;
import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.asamk.signal.manager.storage.recipients.RecipientId;
@ -93,7 +94,7 @@ public class SignalAccount implements Closeable {
private final static Logger logger = LoggerFactory.getLogger(SignalAccount.class);
private static final int MINIMUM_STORAGE_VERSION = 1;
private static final int CURRENT_STORAGE_VERSION = 4;
private static final int CURRENT_STORAGE_VERSION = 5;
private final Object LOCK = new Object();
@ -392,8 +393,6 @@ public class SignalAccount implements Closeable {
// Old config file, creating new profile key
setProfileKey(KeyUtils.createProfileKey());
}
// Ensure our profile key is stored in profile store
getProfileStore().storeSelfProfileKey(getSelfRecipientId(), getProfileKey());
if (previousStorageVersion < 3) {
for (final var group : groupStore.getGroups()) {
if (group instanceof GroupInfoV2 && group.getDistributionId() == null) {
@ -514,9 +513,10 @@ public class SignalAccount implements Closeable {
if (rootNode.hasNonNull("version")) {
var accountVersion = rootNode.get("version").asInt(1);
if (accountVersion > CURRENT_STORAGE_VERSION) {
throw new IOException("Config file was created by a more recent version!");
throw new IOException("Config file was created by a more recent version: " + accountVersion);
} else if (accountVersion < MINIMUM_STORAGE_VERSION) {
throw new IOException("Config file was created by a no longer supported older version!");
throw new IOException("Config file was created by a no longer supported older version: "
+ accountVersion);
}
previousStorageVersion = accountVersion;
if (accountVersion < CURRENT_STORAGE_VERSION) {
@ -621,6 +621,15 @@ public class SignalAccount implements Closeable {
}
}
if (previousStorageVersion < 5) {
final var legacyRecipientsStoreFile = getRecipientsStoreFile(dataPath, accountPath);
if (legacyRecipientsStoreFile.exists()) {
LegacyRecipientStore2.migrate(legacyRecipientsStoreFile, getRecipientStore());
// Ensure our profile key is stored in profile store
getProfileStore().storeSelfProfileKey(getSelfRecipientId(), getProfileKey());
migratedLegacyConfig = true;
}
}
final var legacySignalProtocolStore = rootNode.hasNonNull("axolotlStore")
? jsonProcessor.convertValue(Utils.getNotNullNode(rootNode, "axolotlStore"),
LegacyJsonSignalProtocolStore.class)
@ -681,7 +690,8 @@ public class SignalAccount implements Closeable {
logger.debug("Migrating legacy recipient store.");
var legacyRecipientStore = jsonProcessor.convertValue(legacyRecipientStoreNode, LegacyRecipientStore.class);
if (legacyRecipientStore != null) {
getRecipientStore().resolveRecipientsTrusted(legacyRecipientStore.getAddresses());
legacyRecipientStore.getAddresses()
.forEach(recipient -> getRecipientStore().resolveRecipientTrusted(recipient));
}
getRecipientTrustedResolver().resolveSelfRecipientTrusted(getSelfRecipientAddress());
migrated = true;
@ -1094,22 +1104,42 @@ public class SignalAccount implements Closeable {
}
public RecipientResolver getRecipientResolver() {
return getRecipientStore();
return new RecipientResolver() {
@Override
public RecipientId resolveRecipient(final RecipientAddress address) {
return getRecipientStore().resolveRecipient(address);
}
@Override
public RecipientId resolveRecipient(final long recipientId) {
return getRecipientStore().resolveRecipient(recipientId);
}
};
}
public RecipientTrustedResolver getRecipientTrustedResolver() {
return getRecipientStore();
return new RecipientTrustedResolver() {
@Override
public RecipientId resolveSelfRecipientTrusted(final RecipientAddress address) {
return getRecipientStore().resolveSelfRecipientTrusted(address);
}
@Override
public RecipientId resolveRecipientTrusted(final SignalServiceAddress address) {
return getRecipientStore().resolveRecipientTrusted(address);
}
};
}
public RecipientAddressResolver getRecipientAddressResolver() {
return getRecipientStore()::resolveRecipientAddress;
return recipientId -> getRecipientStore().resolveRecipientAddress(recipientId);
}
public RecipientStore getRecipientStore() {
return getOrCreate(() -> recipientStore,
() -> recipientStore = RecipientStore.load(getRecipientsStoreFile(dataPath, accountPath),
this::mergeRecipients,
this::getSelfRecipientAddress));
() -> recipientStore = new RecipientStore(this::mergeRecipients,
this::getSelfRecipientAddress,
getAccountDatabase()));
}
public ProfileStore getProfileStore() {

View file

@ -10,13 +10,25 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.asamk.signal.manager.storage.recipients.RecipientAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.util.UuidUtil;
import java.io.InvalidObjectException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Utils {
private static final Logger logger = LoggerFactory.getLogger(Utils.class);
private Utils() {
}
@ -49,4 +61,51 @@ public class Utils {
return new RecipientAddress(Optional.empty(), Optional.of(identifier));
}
}
public static <T> T executeQuerySingleRow(
PreparedStatement statement, ResultSetMapper<T> mapper
) throws SQLException {
final var resultSet = statement.executeQuery();
if (!resultSet.next()) {
throw new RuntimeException("Expected a row in result set, but none found.");
}
return mapper.apply(resultSet);
}
public static <T> Optional<T> executeQueryForOptional(
PreparedStatement statement, ResultSetMapper<T> mapper
) throws SQLException {
final var resultSet = statement.executeQuery();
if (!resultSet.next()) {
return Optional.empty();
}
return Optional.ofNullable(mapper.apply(resultSet));
}
public static <T> Stream<T> executeQueryForStream(
PreparedStatement statement, ResultSetMapper<T> mapper
) throws SQLException {
final var resultSet = statement.executeQuery();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@Override
public boolean tryAdvance(final Consumer<? super T> consumer) {
try {
if (!resultSet.next()) {
return false;
}
consumer.accept(mapper.apply(resultSet));
return true;
} catch (SQLException e) {
logger.warn("Failed to read from database result", e);
throw new RuntimeException(e);
}
}
}, false);
}
public interface ResultSetMapper<T> {
T apply(ResultSet resultSet) throws SQLException;
}
}

View file

@ -20,7 +20,6 @@ public interface ProfileStore {
void storeProfileKey(RecipientId recipientId, ProfileKey profileKey);
void storeExpiringProfileKeyCredential(
RecipientId recipientId,
ExpiringProfileKeyCredential expiringProfileKeyCredential
RecipientId recipientId, ExpiringProfileKeyCredential expiringProfileKeyCredential
);
}

View file

@ -0,0 +1,130 @@
package org.asamk.signal.manager.storage.recipients;
import org.asamk.signal.manager.storage.Utils;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.profiles.ExpiringProfileKeyCredential;
import org.signal.libsignal.zkgroup.profiles.ProfileKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.signalservice.api.util.UuidUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Base64;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class LegacyRecipientStore2 {
private final static Logger logger = LoggerFactory.getLogger(LegacyRecipientStore2.class);
public static void migrate(File file, RecipientStore recipientStore) {
final var objectMapper = Utils.createStorageObjectMapper();
try (var inputStream = new FileInputStream(file)) {
final var storage = objectMapper.readValue(inputStream, Storage.class);
final var recipients = storage.recipients.stream().map(r -> {
final var recipientId = new RecipientId(r.id, recipientStore);
final var address = new RecipientAddress(Optional.ofNullable(r.uuid).map(UuidUtil::parseOrThrow),
Optional.ofNullable(r.number));
Contact contact = null;
if (r.contact != null) {
contact = new Contact(r.contact.name,
null,
r.contact.color,
r.contact.messageExpirationTime,
r.contact.blocked,
r.contact.archived,
r.contact.profileSharingEnabled);
}
ProfileKey profileKey = null;
if (r.profileKey != null) {
try {
profileKey = new ProfileKey(Base64.getDecoder().decode(r.profileKey));
} catch (InvalidInputException ignored) {
}
}
ExpiringProfileKeyCredential expiringProfileKeyCredential = null;
if (r.expiringProfileKeyCredential != null) {
try {
expiringProfileKeyCredential = new ExpiringProfileKeyCredential(Base64.getDecoder()
.decode(r.expiringProfileKeyCredential));
} catch (Throwable ignored) {
}
}
Profile profile = null;
if (r.profile != null) {
profile = new Profile(r.profile.lastUpdateTimestamp,
r.profile.givenName,
r.profile.familyName,
r.profile.about,
r.profile.aboutEmoji,
r.profile.avatarUrlPath,
r.profile.mobileCoinAddress == null
? null
: Base64.getDecoder().decode(r.profile.mobileCoinAddress),
Profile.UnidentifiedAccessMode.valueOfOrUnknown(r.profile.unidentifiedAccessMode),
r.profile.capabilities.stream()
.map(Profile.Capability::valueOfOrNull)
.filter(Objects::nonNull)
.collect(Collectors.toSet()));
}
return new Recipient(recipientId, address, contact, profileKey, expiringProfileKeyCredential, profile);
}).collect(Collectors.toMap(Recipient::getRecipientId, r -> r));
recipientStore.addLegacyRecipients(recipients);
Files.delete(file.toPath());
} catch (FileNotFoundException e) {
// nothing to migrate
} catch (IOException e) {
logger.warn("Failed to load recipient store", e);
throw new RuntimeException(e);
}
}
private record Storage(List<Recipient> recipients, long lastId) {
private record Recipient(
long id,
String number,
String uuid,
String profileKey,
String expiringProfileKeyCredential,
Contact contact,
Profile profile
) {
private record Contact(
String name,
String color,
int messageExpirationTime,
boolean blocked,
boolean archived,
boolean profileSharingEnabled
) {}
private record Profile(
long lastUpdateTimestamp,
String givenName,
String familyName,
String about,
String aboutEmoji,
String avatarUrlPath,
String mobileCoinAddress,
String unidentifiedAccessMode,
Set<String> capabilities
) {}
}
}
}

View file

@ -1,17 +1,24 @@
package org.asamk.signal.manager.storage.recipients;
import org.asamk.signal.manager.storage.Utils;
import org.whispersystems.signalservice.api.push.ServiceId;
import org.whispersystems.signalservice.api.push.SignalServiceAddress;
public interface RecipientResolver {
RecipientId resolveRecipient(String identifier);
RecipientId resolveRecipient(RecipientAddress address);
RecipientId resolveRecipient(SignalServiceAddress address);
RecipientId resolveRecipient(ServiceId aci);
RecipientId resolveRecipient(long recipientId);
default RecipientId resolveRecipient(String identifier) {
return resolveRecipient(Utils.getRecipientAddressFromIdentifier(identifier));
}
default RecipientId resolveRecipient(SignalServiceAddress address) {
return resolveRecipient(new RecipientAddress(address));
}
default RecipientId resolveRecipient(ServiceId serviceId) {
return resolveRecipient(new RecipientAddress(serviceId.uuid()));
}
}

View file

@ -3,6 +3,7 @@ package org.asamk.signal.manager.storage.sendLog;
import org.asamk.signal.manager.groups.GroupId;
import org.asamk.signal.manager.groups.GroupUtils;
import org.asamk.signal.manager.storage.Database;
import org.asamk.signal.manager.storage.Utils;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.storage.recipients.RecipientResolver;
import org.signal.libsignal.zkgroup.InvalidInputException;
@ -15,18 +16,11 @@ import org.whispersystems.signalservice.internal.push.SignalServiceProtos;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class MessageSendLogStore implements AutoCloseable {
@ -68,12 +62,13 @@ public class MessageSendLogStore implements AutoCloseable {
}
public static void createSql(Connection connection) throws SQLException {
// When modifying the CREATE statement here, also add a migration in AccountDatabase.java
try (final var statement = connection.createStatement()) {
statement.executeUpdate("""
CREATE TABLE message_send_log (
_id INTEGER PRIMARY KEY,
content_id INTEGER NOT NULL REFERENCES message_send_log_content (_id) ON DELETE CASCADE,
recipient_id INTEGER NOT NULL,
recipient_id INTEGER NOT NULL REFERENCES recipient (_id) ON DELETE CASCADE,
device_id INTEGER NOT NULL
);
CREATE TABLE message_send_log_content (
@ -106,7 +101,7 @@ public class MessageSendLogStore implements AutoCloseable {
statement.setLong(1, recipientId.id());
statement.setInt(2, deviceId);
statement.setLong(3, timestamp);
try (var result = executeQueryForStream(statement, resultSet -> {
try (var result = Utils.executeQueryForStream(statement, resultSet -> {
final var groupId = Optional.ofNullable(resultSet.getBytes("group_id"))
.map(GroupId::unknownVersion);
final SignalServiceProtos.Content content;
@ -389,32 +384,5 @@ public class MessageSendLogStore implements AutoCloseable {
}
}
private <T> Stream<T> executeQueryForStream(
PreparedStatement statement, ResultSetMapper<T> mapper
) throws SQLException {
final var resultSet = statement.executeQuery();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
@Override
public boolean tryAdvance(final Consumer<? super T> consumer) {
try {
if (!resultSet.next()) {
return false;
}
consumer.accept(mapper.apply(resultSet));
return true;
} catch (SQLException e) {
logger.warn("Failed to read from database result", e);
throw new RuntimeException(e);
}
}
}, false);
}
private interface ResultSetMapper<T> {
T apply(ResultSet resultSet) throws SQLException;
}
private record RecipientDevices(RecipientId recipientId, List<Integer> deviceIds) {}
}