Paralellize profile fetching

This commit is contained in:
AsamK 2021-12-26 17:14:06 +01:00
parent 3b81ba3596
commit fba7a6a75c
8 changed files with 181 additions and 140 deletions

View file

@ -197,8 +197,7 @@ public class ManagerImpl implements Manager {
avatarStore,
unidentifiedAccessHelper::getAccessFor,
this::resolveSignalServiceAddress);
final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper::getRecipientProfileKeyCredential,
profileHelper::getRecipientProfile,
final GroupV2Helper groupV2Helper = new GroupV2Helper(profileHelper,
account::getSelfRecipientId,
dependencies.getGroupsV2Operations(),
dependencies.getGroupsV2Api(),
@ -210,7 +209,7 @@ public class ManagerImpl implements Manager {
account.getRecipientStore(),
this::handleIdentityFailure,
this::getGroupInfo,
profileHelper::getRecipientProfile,
profileHelper,
this::refreshRegisteredUser);
this.groupHelper = new GroupHelper(account,
dependencies,

View file

@ -12,6 +12,7 @@ import org.asamk.signal.manager.storage.groups.GroupInfoV2;
import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.asamk.signal.manager.util.IOUtils;
import org.asamk.signal.manager.util.Utils;
import org.signal.storageservice.protos.groups.AccessControl;
import org.signal.storageservice.protos.groups.GroupChange;
import org.signal.storageservice.protos.groups.Member;
@ -44,6 +45,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
import java.util.UUID;
@ -54,8 +56,7 @@ public class GroupV2Helper {
private final static Logger logger = LoggerFactory.getLogger(GroupV2Helper.class);
private final ProfileKeyCredentialProvider profileKeyCredentialProvider;
private final ProfileProvider profileProvider;
private final ProfileHelper profileHelper;
private final SelfRecipientIdProvider selfRecipientIdProvider;
private final GroupsV2Operations groupsV2Operations;
private final GroupsV2Api groupsV2Api;
@ -64,15 +65,13 @@ public class GroupV2Helper {
private HashMap<Integer, AuthCredentialResponse> groupApiCredentials;
public GroupV2Helper(
final ProfileKeyCredentialProvider profileKeyCredentialProvider,
final ProfileProvider profileProvider,
final ProfileHelper profileHelper,
final SelfRecipientIdProvider selfRecipientIdProvider,
final GroupsV2Operations groupsV2Operations,
final GroupsV2Api groupsV2Api,
final SignalServiceAddressResolver addressResolver
) {
this.profileKeyCredentialProvider = profileKeyCredentialProvider;
this.profileProvider = profileProvider;
this.profileHelper = profileHelper;
this.selfRecipientIdProvider = selfRecipientIdProvider;
this.groupsV2Operations = groupsV2Operations;
this.groupsV2Api = groupsV2Api;
@ -149,7 +148,7 @@ public class GroupV2Helper {
private GroupsV2Operations.NewGroup buildNewGroup(
String name, Set<RecipientId> members, byte[] avatar
) {
final var profileKeyCredential = profileKeyCredentialProvider.getProfileKeyCredential(selfRecipientIdProvider.getSelfRecipientId());
final var profileKeyCredential = profileHelper.getRecipientProfileKeyCredential(selfRecipientIdProvider.getSelfRecipientId());
if (profileKeyCredential == null) {
logger.warn("Cannot create a V2 group as self does not have a versioned profile");
return null;
@ -157,10 +156,14 @@ public class GroupV2Helper {
if (!areMembersValid(members)) return null;
var self = new GroupCandidate(getSelfAci().uuid(), Optional.fromNullable(profileKeyCredential));
var candidates = members.stream()
.map(member -> new GroupCandidate(addressResolver.resolveSignalServiceAddress(member).getAci().uuid(),
Optional.fromNullable(profileKeyCredentialProvider.getProfileKeyCredential(member))))
final var self = new GroupCandidate(getSelfAci().uuid(), Optional.fromNullable(profileKeyCredential));
final var memberList = new ArrayList<>(members);
final var credentials = profileHelper.getRecipientProfileKeyCredential(memberList).stream();
final var uuids = memberList.stream()
.map(member -> addressResolver.resolveSignalServiceAddress(member).getAci().uuid());
var candidates = Utils.zip(uuids,
credentials,
(uuid, credential) -> new GroupCandidate(uuid, Optional.fromNullable(credential)))
.collect(Collectors.toSet());
final var groupSecretParams = GroupSecretParams.generate();
@ -174,8 +177,8 @@ public class GroupV2Helper {
}
private boolean areMembersValid(final Set<RecipientId> members) {
final var noGv2Capability = members.stream()
.map(profileProvider::getProfile)
final var noGv2Capability = profileHelper.getRecipientProfile(new ArrayList<>(members))
.stream()
.filter(profile -> profile != null && !profile.getCapabilities().contains(Profile.Capability.gv2))
.collect(Collectors.toSet());
if (noGv2Capability.size() > 0) {
@ -221,9 +224,13 @@ public class GroupV2Helper {
throw new IOException("Failed to update group");
}
var candidates = newMembers.stream()
.map(member -> new GroupCandidate(addressResolver.resolveSignalServiceAddress(member).getAci().uuid(),
Optional.fromNullable(profileKeyCredentialProvider.getProfileKeyCredential(member))))
final var memberList = new ArrayList<>(newMembers);
final var credentials = profileHelper.getRecipientProfileKeyCredential(memberList).stream();
final var uuids = memberList.stream()
.map(member -> addressResolver.resolveSignalServiceAddress(member).getAci().uuid());
var candidates = Utils.zip(uuids,
credentials,
(uuid, credential) -> new GroupCandidate(uuid, Optional.fromNullable(credential)))
.collect(Collectors.toSet());
final var aci = getSelfAci();
@ -333,7 +340,7 @@ public class GroupV2Helper {
final var groupOperations = groupsV2Operations.forGroup(groupSecretParams);
final var selfRecipientId = this.selfRecipientIdProvider.getSelfRecipientId();
final var profileKeyCredential = profileKeyCredentialProvider.getProfileKeyCredential(selfRecipientId);
final var profileKeyCredential = profileHelper.getRecipientProfileKeyCredential(selfRecipientId);
if (profileKeyCredential == null) {
throw new IOException("Cannot join a V2 group as self does not have a versioned profile");
}
@ -352,7 +359,7 @@ public class GroupV2Helper {
final GroupsV2Operations.GroupOperations groupOperations = getGroupOperations(groupInfoV2);
final var selfRecipientId = this.selfRecipientIdProvider.getSelfRecipientId();
final var profileKeyCredential = profileKeyCredentialProvider.getProfileKeyCredential(selfRecipientId);
final var profileKeyCredential = profileHelper.getRecipientProfileKeyCredential(selfRecipientId);
if (profileKeyCredential == null) {
throw new IOException("Cannot join a V2 group as self does not have a versioned profile");
}

View file

@ -30,11 +30,11 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Base64;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
public final class ProfileHelper {
@ -69,33 +69,35 @@ public final class ProfileHelper {
getRecipientProfile(recipientId, true);
}
public List<ProfileKeyCredential> getRecipientProfileKeyCredential(List<RecipientId> recipientIds) {
final var profileFetches = recipientIds.stream().map(recipientId -> {
var profileKeyCredential = account.getProfileStore().getProfileKeyCredential(recipientId);
if (profileKeyCredential != null) {
return null;
}
return retrieveProfile(recipientId,
SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL).onErrorComplete();
}).filter(Objects::nonNull).toList();
Maybe.merge(profileFetches).blockingSubscribe();
return recipientIds.stream().map(r -> account.getProfileStore().getProfileKeyCredential(r)).toList();
}
public ProfileKeyCredential getRecipientProfileKeyCredential(RecipientId recipientId) {
var profileKeyCredential = account.getProfileStore().getProfileKeyCredential(recipientId);
if (profileKeyCredential != null) {
return profileKeyCredential;
}
ProfileAndCredential profileAndCredential;
try {
profileAndCredential = retrieveProfileAndCredential(recipientId,
SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL);
blockingGetProfile(retrieveProfile(recipientId, SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL));
} catch (IOException e) {
logger.warn("Failed to retrieve profile key credential, ignoring: {}", e.getMessage());
return null;
}
profileKeyCredential = profileAndCredential.getProfileKeyCredential().orNull();
account.getProfileStore().storeProfileKeyCredential(recipientId, profileKeyCredential);
var profileKey = account.getProfileStore().getProfileKey(recipientId);
if (profileKey != null) {
final var profile = decryptProfileAndDownloadAvatar(recipientId,
profileKey,
profileAndCredential.getProfile());
account.getProfileStore().storeProfile(recipientId, profile);
}
return profileKeyCredential;
return account.getProfileStore().getProfileKeyCredential(recipientId);
}
/**
@ -164,73 +166,43 @@ public final class ProfileHelper {
account.getProfileStore().storeProfile(account.getSelfRecipientId(), newProfile);
}
private final Set<RecipientId> pendingProfileRequest = new HashSet<>();
public List<Profile> getRecipientProfile(List<RecipientId> recipientIds) {
final var profileFetches = recipientIds.stream().map(recipientId -> {
var profile = account.getProfileStore().getProfile(recipientId);
if (!isProfileRefreshRequired(profile)) {
return null;
}
return retrieveProfile(recipientId, SignalServiceProfile.RequestType.PROFILE).onErrorComplete();
}).filter(Objects::nonNull).toList();
Maybe.merge(profileFetches).blockingSubscribe();
return recipientIds.stream().map(r -> account.getProfileStore().getProfile(r)).toList();
}
private Profile getRecipientProfile(RecipientId recipientId, boolean force) {
var profile = account.getProfileStore().getProfile(recipientId);
var now = System.currentTimeMillis();
// Profiles are cached for 24h before retrieving them again, unless forced
if (!force && profile != null && now - profile.getLastUpdateTimestamp() < 6 * 60 * 60 * 1000) {
if (!force && !isProfileRefreshRequired(profile)) {
return profile;
}
synchronized (pendingProfileRequest) {
if (pendingProfileRequest.contains(recipientId)) {
return profile;
}
pendingProfileRequest.add(recipientId);
}
final SignalServiceProfile encryptedProfile;
try {
encryptedProfile = retrieveEncryptedProfile(recipientId);
} finally {
synchronized (pendingProfileRequest) {
pendingProfileRequest.remove(recipientId);
}
}
Profile newProfile = null;
if (encryptedProfile != null) {
var profileKey = account.getProfileStore().getProfileKey(recipientId);
if (profileKey != null) {
newProfile = decryptProfileAndDownloadAvatar(recipientId, profileKey, encryptedProfile);
if (newProfile == null) {
account.getProfileStore().storeProfileKey(recipientId, null);
}
}
if (newProfile == null) {
newProfile = (
profile == null ? Profile.newBuilder() : Profile.newBuilder(profile)
).withLastUpdateTimestamp(System.currentTimeMillis())
.withUnidentifiedAccessMode(ProfileUtils.getUnidentifiedAccessMode(encryptedProfile, null))
.withCapabilities(ProfileUtils.getCapabilities(encryptedProfile))
.build();
}
}
if (newProfile == null) {
newProfile = (
profile == null ? Profile.newBuilder() : Profile.newBuilder(profile)
).withLastUpdateTimestamp(now)
.withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN)
.withCapabilities(Set.of())
.build();
}
account.getProfileStore().storeProfile(recipientId, newProfile);
return newProfile;
}
private SignalServiceProfile retrieveEncryptedProfile(RecipientId recipientId) {
try {
return retrieveProfileAndCredential(recipientId, SignalServiceProfile.RequestType.PROFILE).getProfile();
blockingGetProfile(retrieveProfile(recipientId, SignalServiceProfile.RequestType.PROFILE));
} catch (IOException e) {
logger.warn("Failed to retrieve profile, ignoring: {}", e.getMessage());
return null;
}
return account.getProfileStore().getProfile(recipientId);
}
private boolean isProfileRefreshRequired(final Profile profile) {
if (profile == null) {
return true;
}
// Profiles are cached for 6h before retrieving them again, unless forced
final var now = System.currentTimeMillis();
return now - profile.getLastUpdateTimestamp() >= 6 * 60 * 60 * 1000;
}
private SignalServiceProfile retrieveProfileSync(String username) throws IOException {
@ -238,29 +210,6 @@ public final class ProfileHelper {
return dependencies.getMessageReceiver().retrieveProfileByUsername(username, Optional.absent(), locale);
}
private ProfileAndCredential retrieveProfileAndCredential(
final RecipientId recipientId, final SignalServiceProfile.RequestType requestType
) throws IOException {
final var profileAndCredential = retrieveProfileSync(recipientId, requestType);
final var profile = profileAndCredential.getProfile();
try {
var newIdentity = account.getIdentityKeyStore()
.saveIdentity(recipientId,
new IdentityKey(Base64.getDecoder().decode(profile.getIdentityKey())),
new Date());
if (newIdentity) {
account.getSessionStore().archiveSessions(recipientId);
account.getSenderKeyStore().deleteSharedWith(recipientId);
}
} catch (InvalidKeyException ignored) {
logger.warn("Got invalid identity key in profile for {}",
addressResolver.resolveSignalServiceAddress(recipientId).getIdentifier());
}
return profileAndCredential;
}
private Profile decryptProfileAndDownloadAvatar(
final RecipientId recipientId, final ProfileKey profileKey, final SignalServiceProfile encryptedProfile
) {
@ -281,11 +230,9 @@ public final class ProfileHelper {
}
}
private ProfileAndCredential retrieveProfileSync(
RecipientId recipientId, SignalServiceProfile.RequestType requestType
) throws IOException {
private ProfileAndCredential blockingGetProfile(Single<ProfileAndCredential> profile) throws IOException {
try {
return retrieveProfile(recipientId, requestType).blockingGet();
return profile.blockingGet();
} catch (RuntimeException e) {
if (e.getCause() instanceof PushNetworkException) {
throw (PushNetworkException) e.getCause();
@ -304,7 +251,58 @@ public final class ProfileHelper {
var profileKey = Optional.fromNullable(account.getProfileStore().getProfileKey(recipientId));
final var address = addressResolver.resolveSignalServiceAddress(recipientId);
return retrieveProfile(address, profileKey, unidentifiedAccess, requestType);
return retrieveProfile(address, profileKey, unidentifiedAccess, requestType).doOnSuccess(p -> {
final var encryptedProfile = p.getProfile();
if (requestType == SignalServiceProfile.RequestType.PROFILE_AND_CREDENTIAL) {
final var profileKeyCredential = p.getProfileKeyCredential().orNull();
account.getProfileStore().storeProfileKeyCredential(recipientId, profileKeyCredential);
}
final var profile = account.getProfileStore().getProfile(recipientId);
Profile newProfile = null;
if (profileKey.isPresent()) {
newProfile = decryptProfileAndDownloadAvatar(recipientId, profileKey.get(), encryptedProfile);
}
if (newProfile == null) {
newProfile = (
profile == null ? Profile.newBuilder() : Profile.newBuilder(profile)
).withLastUpdateTimestamp(System.currentTimeMillis())
.withUnidentifiedAccessMode(ProfileUtils.getUnidentifiedAccessMode(encryptedProfile, null))
.withCapabilities(ProfileUtils.getCapabilities(encryptedProfile))
.build();
}
account.getProfileStore().storeProfile(recipientId, newProfile);
try {
var newIdentity = account.getIdentityKeyStore()
.saveIdentity(recipientId,
new IdentityKey(Base64.getDecoder().decode(encryptedProfile.getIdentityKey())),
new Date());
if (newIdentity) {
account.getSessionStore().archiveSessions(recipientId);
account.getSenderKeyStore().deleteSharedWith(recipientId);
}
} catch (InvalidKeyException ignored) {
logger.warn("Got invalid identity key in profile for {}",
addressResolver.resolveSignalServiceAddress(recipientId).getIdentifier());
}
}).doOnError(e -> {
logger.warn("Failed to retrieve profile, ignoring: {}", e.getMessage());
final var profile = account.getProfileStore().getProfile(recipientId);
final var newProfile = (
profile == null ? Profile.newBuilder() : Profile.newBuilder(profile)
).withLastUpdateTimestamp(System.currentTimeMillis())
.withUnidentifiedAccessMode(Profile.UnidentifiedAccessMode.UNKNOWN)
.withCapabilities(Set.of())
.build();
account.getProfileStore().storeProfile(recipientId, newProfile);
});
}
private Single<ProfileAndCredential> retrieveProfile(
@ -376,7 +374,7 @@ public final class ProfileHelper {
}
private Optional<UnidentifiedAccess> getUnidentifiedAccess(RecipientId recipientId) {
var unidentifiedAccess = unidentifiedAccessProvider.getAccessFor(recipientId);
var unidentifiedAccess = unidentifiedAccessProvider.getAccessFor(recipientId, true);
if (unidentifiedAccess.isPresent()) {
return unidentifiedAccess.get().getTargetUnidentifiedAccess();

View file

@ -57,7 +57,7 @@ public class SendHelper {
private final RecipientResolver recipientResolver;
private final IdentityFailureHandler identityFailureHandler;
private final GroupProvider groupProvider;
private final ProfileProvider profileProvider;
private final ProfileHelper profileHelper;
private final RecipientRegistrationRefresher recipientRegistrationRefresher;
public SendHelper(
@ -68,7 +68,7 @@ public class SendHelper {
final RecipientResolver recipientResolver,
final IdentityFailureHandler identityFailureHandler,
final GroupProvider groupProvider,
final ProfileProvider profileProvider,
final ProfileHelper profileHelper,
final RecipientRegistrationRefresher recipientRegistrationRefresher
) {
this.account = account;
@ -78,7 +78,7 @@ public class SendHelper {
this.recipientResolver = recipientResolver;
this.identityFailureHandler = identityFailureHandler;
this.groupProvider = groupProvider;
this.profileProvider = profileProvider;
this.profileHelper = profileHelper;
this.recipientRegistrationRefresher = recipientRegistrationRefresher;
}
@ -356,16 +356,17 @@ public class SendHelper {
}
private Set<RecipientId> getSenderKeyCapableRecipientIds(final Set<RecipientId> recipientIds) {
final var selfProfile = profileProvider.getProfile(account.getSelfRecipientId());
final var selfProfile = profileHelper.getRecipientProfile(account.getSelfRecipientId());
if (selfProfile == null || !selfProfile.getCapabilities().contains(Profile.Capability.senderKey)) {
logger.debug("Not all of our devices support sender key. Using legacy.");
return Set.of();
}
final var senderKeyTargets = new HashSet<RecipientId>();
for (final var recipientId : recipientIds) {
// TODO filter out unregistered
final var profile = profileProvider.getProfile(recipientId);
final var recipientList = new ArrayList<>(recipientIds);
final var profiles = profileHelper.getRecipientProfile(recipientList).iterator();
for (final var recipientId : recipientList) {
final var profile = profiles.next();
if (profile == null || !profile.getCapabilities().contains(Profile.Capability.senderKey)) {
continue;
}
@ -433,8 +434,8 @@ public class SendHelper {
List<SignalServiceAddress> addresses = recipientIdList.stream()
.map(addressResolver::resolveSignalServiceAddress)
.collect(Collectors.toList());
List<UnidentifiedAccess> unidentifiedAccesses = recipientIdList.stream()
.map(unidentifiedAccessHelper::getAccessFor)
List<UnidentifiedAccess> unidentifiedAccesses = unidentifiedAccessHelper.getAccessFor(recipientIdList)
.stream()
.map(Optional::get)
.map(UnidentifiedAccessPair::getTargetUnidentifiedAccess)
.map(Optional::get)

View file

@ -7,6 +7,7 @@ import org.asamk.signal.manager.storage.recipients.Profile;
import org.asamk.signal.manager.storage.recipients.RecipientId;
import org.signal.libsignal.metadata.certificate.InvalidCertificateException;
import org.signal.libsignal.metadata.certificate.SenderCertificate;
import org.signal.zkgroup.profiles.ProfileKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.libsignal.util.guava.Optional;
@ -89,8 +90,10 @@ public class UnidentifiedAccessHelper {
}
}
private byte[] getSelfUnidentifiedAccessKey() {
var selfProfile = profileProvider.getProfile(account.getSelfRecipientId());
private byte[] getSelfUnidentifiedAccessKey(boolean noRefresh) {
var selfProfile = noRefresh
? account.getProfileStore().getProfile(account.getSelfRecipientId())
: profileProvider.getProfile(account.getSelfRecipientId());
if (selfProfile != null
&& selfProfile.getUnidentifiedAccessMode() == Profile.UnidentifiedAccessMode.UNRESTRICTED) {
return createUnrestrictedUnidentifiedAccess();
@ -98,15 +101,23 @@ public class UnidentifiedAccessHelper {
return UnidentifiedAccess.deriveAccessKeyFrom(selfProfileKeyProvider.getProfileKey());
}
public byte[] getTargetUnidentifiedAccessKey(RecipientId recipient) {
var targetProfile = profileProvider.getProfile(recipient);
private byte[] getTargetUnidentifiedAccessKey(RecipientId recipientId, boolean noRefresh) {
var targetProfile = noRefresh
? account.getProfileStore().getProfile(recipientId)
: profileProvider.getProfile(recipientId);
if (targetProfile == null) {
return null;
}
var theirProfileKey = account.getProfileStore().getProfileKey(recipientId);
return getTargetUnidentifiedAccessKey(targetProfile, theirProfileKey);
}
private static byte[] getTargetUnidentifiedAccessKey(
final Profile targetProfile, final ProfileKey theirProfileKey
) {
switch (targetProfile.getUnidentifiedAccessMode()) {
case ENABLED:
var theirProfileKey = account.getProfileStore().getProfileKey(recipient);
if (theirProfileKey == null) {
return null;
}
@ -120,7 +131,7 @@ public class UnidentifiedAccessHelper {
}
public Optional<UnidentifiedAccessPair> getAccessForSync() {
var selfUnidentifiedAccessKey = getSelfUnidentifiedAccessKey();
var selfUnidentifiedAccessKey = getSelfUnidentifiedAccessKey(false);
var selfUnidentifiedAccessCertificate = getSenderCertificate();
if (selfUnidentifiedAccessKey == null || selfUnidentifiedAccessCertificate == null) {
@ -141,12 +152,16 @@ public class UnidentifiedAccessHelper {
}
public Optional<UnidentifiedAccessPair> getAccessFor(RecipientId recipient) {
var recipientUnidentifiedAccessKey = getTargetUnidentifiedAccessKey(recipient);
return getAccessFor(recipient, false);
}
public Optional<UnidentifiedAccessPair> getAccessFor(RecipientId recipient, boolean noRefresh) {
var recipientUnidentifiedAccessKey = getTargetUnidentifiedAccessKey(recipient, noRefresh);
if (recipientUnidentifiedAccessKey == null) {
return Optional.absent();
}
var selfUnidentifiedAccessKey = getSelfUnidentifiedAccessKey();
var selfUnidentifiedAccessKey = getSelfUnidentifiedAccessKey(noRefresh);
var selfUnidentifiedAccessCertificate = getSenderCertificateFor(recipient);
if (selfUnidentifiedAccessKey == null || selfUnidentifiedAccessCertificate == null) {
return Optional.absent();

View file

@ -6,5 +6,5 @@ import org.whispersystems.signalservice.api.crypto.UnidentifiedAccessPair;
public interface UnidentifiedAccessProvider {
Optional<UnidentifiedAccessPair> getAccessFor(RecipientId recipientId);
Optional<UnidentifiedAccessPair> getAccessFor(RecipientId recipientId, boolean noRefresh);
}

View file

@ -16,6 +16,12 @@ import java.io.InputStream;
import java.net.URLConnection;
import java.nio.file.Files;
import java.util.Locale;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Utils {
@ -88,4 +94,16 @@ public class Utils {
return locale;
}
public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) {
Spliterator<L> lefts = leftStream.spliterator();
Spliterator<R> rights = rightStream.spliterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(Long.min(lefts.estimateSize(),
rights.estimateSize()), lefts.characteristics() & rights.characteristics()) {
@Override
public boolean tryAdvance(Consumer<? super T> action) {
return lefts.tryAdvance(left -> rights.tryAdvance(right -> action.accept(combiner.apply(left, right))));
}
}, leftStream.isParallel() || rightStream.isParallel());
}
}