Add cache for serviceId to recipient id/address mapping

This commit is contained in:
AsamK 2023-10-17 15:20:14 +02:00
parent 1addffe622
commit 2c5edbc981

View file

@ -47,6 +47,8 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
private final Object recipientsLock = new Object(); private final Object recipientsLock = new Object();
private final Map<Long, Long> recipientsMerged = new HashMap<>(); private final Map<Long, Long> recipientsMerged = new HashMap<>();
private final Map<ServiceId, RecipientWithAddress> recipientAddressCache = new HashMap<>();
public static void createSql(Connection connection) throws SQLException { public static void createSql(Connection connection) throws SQLException {
// When modifying the CREATE statement here, also add a migration in AccountDatabase.java // When modifying the CREATE statement here, also add a migration in AccountDatabase.java
try (final var statement = connection.createStatement()) { try (final var statement = connection.createStatement()) {
@ -176,15 +178,18 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
@Override @Override
public RecipientId resolveRecipient(final ServiceId serviceId) { public RecipientId resolveRecipient(final ServiceId serviceId) {
synchronized (recipientsLock) { synchronized (recipientsLock) {
final RecipientId recipientId; final var recipientWithAddress = recipientAddressCache.get(serviceId);
if (recipientWithAddress != null) {
return recipientWithAddress.id();
}
try (final var connection = database.getConnection()) { try (final var connection = database.getConnection()) {
connection.setAutoCommit(false); connection.setAutoCommit(false);
recipientId = resolveRecipientLocked(connection, serviceId); final var recipientId = resolveRecipientLocked(connection, serviceId);
connection.commit(); connection.commit();
return recipientId;
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException("Failed read recipient store", e); throw new RuntimeException("Failed read recipient store", e);
} }
return recipientId;
} }
} }
@ -357,7 +362,7 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
FROM %s r FROM %s r
WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s WHERE (r.number IS NOT NULL OR r.uuid IS NOT NULL) AND %s
""" """
).formatted(TABLE_RECIPIENT, sqlWhere.size() == 0 ? "TRUE" : String.join(" AND ", sqlWhere)); ).formatted(TABLE_RECIPIENT, sqlWhere.isEmpty() ? "TRUE" : String.join(" AND ", sqlWhere));
try (final var connection = database.getConnection()) { try (final var connection = database.getConnection()) {
try (final var statement = connection.prepareStatement(sql)) { try (final var statement = connection.prepareStatement(sql)) {
if (blocked.isPresent()) { if (blocked.isPresent()) {
@ -429,16 +434,22 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
public void deleteRecipientData(RecipientId recipientId) { public void deleteRecipientData(RecipientId recipientId) {
logger.debug("Deleting recipient data for {}", recipientId); logger.debug("Deleting recipient data for {}", recipientId);
try (final var connection = database.getConnection()) { synchronized (recipientsLock) {
connection.setAutoCommit(false); recipientAddressCache.entrySet()
storeContact(connection, recipientId, null); .stream()
storeProfile(connection, recipientId, null); .filter(e -> e.getValue().id().equals(recipientId))
storeProfileKey(connection, recipientId, null, false); .forEach(e -> recipientAddressCache.remove(e.getKey()));
storeExpiringProfileKeyCredential(connection, recipientId, null); try (final var connection = database.getConnection()) {
deleteRecipient(connection, recipientId); connection.setAutoCommit(false);
connection.commit(); storeContact(connection, recipientId, null);
} catch (SQLException e) { storeProfile(connection, recipientId, null);
throw new RuntimeException("Failed update recipient store", e); storeProfileKey(connection, recipientId, null, false);
storeExpiringProfileKeyCredential(connection, recipientId, null);
deleteRecipient(connection, recipientId);
connection.commit();
} catch (SQLException e) {
throw new RuntimeException("Failed update recipient store", e);
}
} }
} }
@ -691,11 +702,17 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
} }
} }
if (pair.second().size() > 0) { if (!pair.second().isEmpty()) {
try (final var connection = database.getConnection()) { try (final var connection = database.getConnection()) {
for (final var toBeMergedRecipientId : pair.second()) { for (final var toBeMergedRecipientId : pair.second()) {
recipientMergeHandler.mergeRecipients(connection, pair.first(), toBeMergedRecipientId); recipientMergeHandler.mergeRecipients(connection, pair.first(), toBeMergedRecipientId);
deleteRecipient(connection, toBeMergedRecipientId); deleteRecipient(connection, toBeMergedRecipientId);
synchronized (recipientsLock) {
recipientAddressCache.entrySet()
.stream()
.filter(e -> e.getValue().id().equals(toBeMergedRecipientId))
.forEach(e -> recipientAddressCache.remove(e.getKey()));
}
} }
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException("Failed update recipient store", e); throw new RuntimeException("Failed update recipient store", e);
@ -789,37 +806,49 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
} }
private void removeRecipientAddress(Connection connection, RecipientId recipientId) throws SQLException { private void removeRecipientAddress(Connection connection, RecipientId recipientId) throws SQLException {
final var sql = ( synchronized (recipientsLock) {
""" recipientAddressCache.entrySet()
UPDATE %s .stream()
SET number = NULL, uuid = NULL, pni = NULL .filter(e -> e.getValue().id().equals(recipientId))
WHERE _id = ? .forEach(e -> recipientAddressCache.remove(e.getKey()));
""" final var sql = (
).formatted(TABLE_RECIPIENT); """
try (final var statement = connection.prepareStatement(sql)) { UPDATE %s
statement.setLong(1, recipientId.id()); SET number = NULL, uuid = NULL, pni = NULL
statement.executeUpdate(); WHERE _id = ?
"""
).formatted(TABLE_RECIPIENT);
try (final var statement = connection.prepareStatement(sql)) {
statement.setLong(1, recipientId.id());
statement.executeUpdate();
}
} }
} }
private void updateRecipientAddress( private void updateRecipientAddress(
Connection connection, RecipientId recipientId, final RecipientAddress address Connection connection, RecipientId recipientId, final RecipientAddress address
) throws SQLException { ) throws SQLException {
final var sql = ( synchronized (recipientsLock) {
""" recipientAddressCache.entrySet()
UPDATE %s .stream()
SET number = ?, uuid = ?, pni = ?, username = ? .filter(e -> e.getValue().id().equals(recipientId))
WHERE _id = ? .forEach(e -> recipientAddressCache.remove(e.getKey()));
""" final var sql = (
).formatted(TABLE_RECIPIENT); """
try (final var statement = connection.prepareStatement(sql)) { UPDATE %s
statement.setString(1, address.number().orElse(null)); SET number = ?, uuid = ?, pni = ?, username = ?
statement.setBytes(2, WHERE _id = ?
address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); """
statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null)); ).formatted(TABLE_RECIPIENT);
statement.setString(4, address.username().orElse(null)); try (final var statement = connection.prepareStatement(sql)) {
statement.setLong(5, recipientId.id()); statement.setString(1, address.number().orElse(null));
statement.executeUpdate(); statement.setBytes(2,
address.serviceId().map(ServiceId::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
statement.setBytes(3, address.pni().map(PNI::getRawUuid).map(UuidUtil::toByteArray).orElse(null));
statement.setString(4, address.username().orElse(null));
statement.setLong(5, recipientId.id());
statement.executeUpdate();
}
} }
} }
@ -900,6 +929,10 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
private Optional<RecipientWithAddress> findByServiceId( private Optional<RecipientWithAddress> findByServiceId(
final Connection connection, final ServiceId serviceId final Connection connection, final ServiceId serviceId
) throws SQLException { ) throws SQLException {
var recipientWithAddress = Optional.ofNullable(recipientAddressCache.get(serviceId));
if (recipientWithAddress.isPresent()) {
return recipientWithAddress;
}
final var sql = """ final var sql = """
SELECT r._id, r.number, r.uuid, r.pni, r.username SELECT r._id, r.number, r.uuid, r.pni, r.username
FROM %s r FROM %s r
@ -908,7 +941,9 @@ public class RecipientStore implements RecipientIdCreator, RecipientResolver, Re
""".formatted(TABLE_RECIPIENT); """.formatted(TABLE_RECIPIENT);
try (final var statement = connection.prepareStatement(sql)) { try (final var statement = connection.prepareStatement(sql)) {
statement.setBytes(1, UuidUtil.toByteArray(serviceId.getRawUuid())); statement.setBytes(1, UuidUtil.toByteArray(serviceId.getRawUuid()));
return Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet); recipientWithAddress = Utils.executeQueryForOptional(statement, this::getRecipientWithAddressFromResultSet);
recipientWithAddress.ifPresent(r -> recipientAddressCache.put(serviceId, r));
return recipientWithAddress;
} }
} }