Improve performance when fetching multiple profiles

This commit is contained in:
AsamK 2022-01-15 18:18:40 +01:00
parent 365323f574
commit c8cc428e3f
2 changed files with 51 additions and 20 deletions

View file

@ -33,6 +33,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
@ -59,16 +60,16 @@ public final class ProfileHelper {
}
public List<ProfileKeyCredential> getRecipientProfileKeyCredential(List<RecipientId> recipientIds) {
final var profileFetches = recipientIds.stream().map(recipientId -> {
var profileKeyCredential = account.getProfileStore().getProfileKeyCredential(recipientId);
if (profileKeyCredential != null) {
return null;
}
return retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete();
}).filter(Objects::nonNull).toList();
Maybe.merge(profileFetches).blockingSubscribe();
try {
account.getRecipientStore().setBulkUpdating(true);
final var profileFetches = Flowable.fromIterable(recipientIds)
.filter(recipientId -> account.getProfileStore().getProfileKeyCredential(recipientId) == null)
.map(recipientId -> retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete());
Maybe.merge(profileFetches, 10).blockingSubscribe();
} finally {
account.getRecipientStore().setBulkUpdating(false);
}
return recipientIds.stream().map(r -> account.getProfileStore().getProfileKeyCredential(r)).toList();
}
@ -158,15 +159,16 @@ public final class ProfileHelper {
}
public List<Profile> getRecipientProfile(List<RecipientId> recipientIds) {
final var profileFetches = recipientIds.stream().map(recipientId -> {
var profile = account.getProfileStore().getProfile(recipientId);
if (!isProfileRefreshRequired(profile)) {
return null;
}
return retrieveProfile(recipientId, SignalServiceProfile.RequestType.PROFILE).onErrorComplete();
}).filter(Objects::nonNull).toList();
Maybe.merge(profileFetches).blockingSubscribe();
try {
account.getRecipientStore().setBulkUpdating(true);
final var profileFetches = Flowable.fromIterable(recipientIds)
.filter(recipientId -> isProfileRefreshRequired(account.getProfileStore().getProfile(recipientId)))
.map(recipientId -> retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE).onErrorComplete());
Maybe.merge(profileFetches, 10).blockingSubscribe();
} finally {
account.getRecipientStore().setBulkUpdating(false);
}
return recipientIds.stream().map(r -> account.getProfileStore().getProfile(r)).toList();
}
@ -215,6 +217,7 @@ public final class ProfileHelper {
) {
var profile = account.getProfileStore().getProfile(recipientId);
if (profile == null || !Objects.equals(avatarPath, profile.getAvatarUrlPath())) {
logger.trace("Downloading profile avatar for {}", recipientId);
downloadProfileAvatar(context.getRecipientHelper().resolveSignalServiceAddress(recipientId),
avatarPath,
profileKey);
@ -243,11 +246,17 @@ public final class ProfileHelper {
var unidentifiedAccess = getUnidentifiedAccess(recipientId);
var profileKey = Optional.fromNullable(account.getProfileStore().getProfileKey(recipientId));
logger.trace("Retrieving profile for {} {}",
recipientId,
profileKey.isPresent() ? "with profile key" : "without profile key");
final var address = context.getRecipientHelper().resolveSignalServiceAddress(recipientId);
return retrieveProfile(address, profileKey, unidentifiedAccess, requestType).doOnSuccess(p -> {
logger.trace("Got new profile for {}", recipientId);
final var encryptedProfile = p.getProfile();
if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL) {
if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL
|| account.getProfileStore().getProfileKeyCredential(recipientId) == null) {
logger.trace("Storing profile credential");
final var profileKeyCredential = p.getProfileKeyCredential().orNull();
account.getProfileStore().storeProfileKeyCredential(recipientId, profileKeyCredential);
}
@ -256,6 +265,7 @@ public final class ProfileHelper {
Profile newProfile = null;
if (profileKey.isPresent()) {
logger.trace("Decrypting profile");
newProfile = decryptProfileAndDownloadAvatar(recipientId, profileKey.get(), encryptedProfile);
}
@ -268,15 +278,18 @@ public final class ProfileHelper {
.build();
}
logger.trace("Storing profile");
account.getProfileStore().storeProfile(recipientId, newProfile);
try {
logger.trace("Storing identity");
var newIdentity = account.getIdentityKeyStore()
.saveIdentity(recipientId,
new IdentityKey(Base64.getDecoder().decode(encryptedProfile.getIdentityKey())),
new Date());
if (newIdentity) {
logger.trace("Archiving old sessions");
account.getSessionStore().archiveSessions(recipientId);
account.getSenderKeyStore().deleteSharedWith(recipientId);
}
@ -284,6 +297,7 @@ public final class ProfileHelper {
logger.warn("Got invalid identity key in profile for {}",
context.getRecipientHelper().resolveSignalServiceAddress(recipientId).getIdentifier());
}
logger.trace("Done handling retrieved profile");
}).doOnError(e -> {
logger.warn("Failed to retrieve profile, ignoring: {}", e.getMessage());
final var profile = account.getProfileStore().getProfile(recipientId);

View file

@ -47,6 +47,7 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile
private final Map<Long, Long> recipientsMerged = new HashMap<>();
private long lastId;
private boolean isBulkUpdating;
public static RecipientStore load(File file, RecipientMergeHandler recipientMergeHandler) throws IOException {
final var objectMapper = Utils.createStorageObjectMapper();
@ -130,6 +131,19 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile
this.lastId = lastId;
}
public boolean isBulkUpdating() {
return isBulkUpdating;
}
public void setBulkUpdating(final boolean bulkUpdating) {
isBulkUpdating = bulkUpdating;
if (!bulkUpdating) {
synchronized (recipients) {
saveLocked();
}
}
}
public RecipientAddress resolveRecipientAddress(RecipientId recipientId) {
synchronized (recipients) {
return getRecipient(recipientId).getAddress();
@ -483,6 +497,9 @@ public class RecipientStore implements RecipientResolver, ContactsStore, Profile
}
private void saveLocked() {
if (isBulkUpdating) {
return;
}
final var base64 = Base64.getEncoder();
var storage = new Storage(recipients.entrySet().stream().map(pair -> {
final var recipient = pair.getValue();