Skip to content

Commit

Permalink
Apply snapshot ?after filter inline (#107003)
Browse files Browse the repository at this point in the history
In `TransportGetSnapshotsAction` today we build a list of all candidate
snapshots and then copy them into another list to apply the `?after`
filter. With this commit we construct the final filtered list directly.
  • Loading branch information
DaveCTurner authored Apr 4, 2024
1 parent 9a2f8a8 commit daa9006
Showing 1 changed file with 52 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,18 @@ private class GetSnapshotsOperation {
// results
private final Map<String, ElasticsearchException> failuresByRepository = ConcurrentCollections.newConcurrentMap();
private final Queue<List<SnapshotInfo>> allSnapshotInfos = ConcurrentCollections.newQueue();

/**
* Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response.
*/
private final AtomicInteger totalCount = new AtomicInteger();

/**
* Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result
* list.
*/
private final AtomicInteger resultsCount = new AtomicInteger();

GetSnapshotsOperation(
CancellableTask cancellableTask,
ResolvedRepositories resolvedRepositories,
Expand Down Expand Up @@ -261,7 +271,7 @@ void getMultipleReposSnapshotInfo(ActionListener<GetSnapshotsResponse> listener)
}
})

.addListener(listener.map(ignored -> buildResponse()));
.addListener(listener.map(ignored -> buildResponse()), executor, threadPool.getThreadContext());
}

private boolean skipRepository(String repositoryName) {
Expand Down Expand Up @@ -306,7 +316,7 @@ private void loadSnapshotInfos(String repo, @Nullable RepositoryData repositoryD
}

if (verbose) {
snapshots(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener);
loadSnapshotInfos(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener);
} else {
assert fromSortValuePredicates.isMatchAll() : "filtering is not supported in non-verbose mode";
assert slmPolicyPredicate == SlmPolicyPredicate.MATCH_ALL_POLICIES : "filtering is not supported in non-verbose mode";
Expand All @@ -321,10 +331,11 @@ private void loadSnapshotInfos(String repo, @Nullable RepositoryData repositoryD
}
}

private void snapshots(String repositoryName, Collection<SnapshotId> snapshotIds, ActionListener<Void> listener) {
private void loadSnapshotInfos(String repositoryName, Collection<SnapshotId> snapshotIds, ActionListener<Void> listener) {
if (cancellableTask.notifyIfCancelled(listener)) {
return;
}
final AtomicInteger repositoryTotalCount = new AtomicInteger();
final List<SnapshotInfo> snapshots = new ArrayList<>(snapshotIds.size());
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
Expand All @@ -337,7 +348,10 @@ private void snapshots(String repositoryName, Collection<SnapshotId> snapshotIds
if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) {
final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry);
if (matchesPredicates(snapshotInfo)) {
snapshots.add(snapshotInfo.maybeWithoutIndices(indices));
repositoryTotalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
snapshots.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
}
}
Expand Down Expand Up @@ -372,7 +386,10 @@ private void snapshots(String repositoryName, Collection<SnapshotId> snapshotIds
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
if (matchesPredicates(snapshotInfo)) {
syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
repositoryTotalCount.incrementAndGet();
if (afterPredicate.test(snapshotInfo)) {
syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices));
}
}
refListener.onResponse(null);
}
Expand All @@ -398,11 +415,16 @@ public void onFailure(Exception e) {
}
})

.addListener(listener.safeMap(v -> {
// no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here
applyAfterPredicateAndAdd(snapshots);
return null;
}), executor, threadPool.getThreadContext());
// no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here
.andThenAccept(ignored -> addResults(repositoryTotalCount.get(), snapshots))

.addListener(listener);
}

private void addResults(int repositoryTotalCount, List<SnapshotInfo> snapshots) {
totalCount.addAndGet(repositoryTotalCount);
resultsCount.addAndGet(snapshots.size());
allSnapshotInfos.add(snapshots);
}

private void addSimpleSnapshotInfos(
Expand All @@ -413,15 +435,19 @@ private void addSimpleSnapshotInfos(
) {
if (repositoryData == null) {
// only want current snapshots
applyAfterPredicateAndAdd(currentSnapshots);
addResults(currentSnapshots.size(), currentSnapshots.stream().filter(afterPredicate).toList());
return;
} // else want non-current snapshots as well, which are found in the repository data

List<SnapshotInfo> snapshotInfos = new ArrayList<>();
List<SnapshotInfo> snapshotInfos = new ArrayList<>(currentSnapshots.size() + toResolve.size());
int repositoryTotalCount = 0;
for (SnapshotInfo snapshotInfo : currentSnapshots) {
assert snapshotInfo.startTime() == 0L && snapshotInfo.endTime() == 0L && snapshotInfo.totalShards() == 0L : snapshotInfo;
if (toResolve.remove(snapshotInfo.snapshot())) {
snapshotInfos.add(snapshotInfo);
repositoryTotalCount += 1;
if (afterPredicate.test(snapshotInfo)) {
snapshotInfos.add(snapshotInfo);
}
}
}
Map<SnapshotId, List<String>> snapshotsToIndices = new HashMap<>();
Expand All @@ -435,22 +461,19 @@ private void addSimpleSnapshotInfos(
}
}
for (Snapshot snapshot : toResolve) {
snapshotInfos.add(
new SnapshotInfo(
snapshot,
snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()),
Collections.emptyList(),
Collections.emptyList(),
repositoryData.getSnapshotState(snapshot.getSnapshotId())
)
final var snapshotInfo = new SnapshotInfo(
snapshot,
snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()),
Collections.emptyList(),
Collections.emptyList(),
repositoryData.getSnapshotState(snapshot.getSnapshotId())
);
repositoryTotalCount += 1;
if (afterPredicate.test(snapshotInfo)) {
snapshotInfos.add(snapshotInfo);
}
}
applyAfterPredicateAndAdd(snapshotInfos);
}

private void applyAfterPredicateAndAdd(List<SnapshotInfo> snapshotInfos) {
allSnapshotInfos.add(snapshotInfos.stream().filter(afterPredicate).toList());
totalCount.addAndGet(snapshotInfos.size());
addResults(repositoryTotalCount, snapshotInfos);
}

private GetSnapshotsResponse buildResponse() {
Expand All @@ -463,11 +486,10 @@ private GetSnapshotsResponse buildResponse() {
.sorted(sortBy.getSnapshotInfoComparator(order))
.skip(offset);
final List<SnapshotInfo> snapshotInfos;
if (size == GetSnapshotsRequest.NO_LIMIT) {
if (size == GetSnapshotsRequest.NO_LIMIT || resultsCount.get() <= size) {
snapshotInfos = resultsStream.toList();
} else {
final var allocateSize = Math.min(size, 1000); // ignore excessively-large sizes in request params
snapshotInfos = new ArrayList<>(allocateSize);
snapshotInfos = new ArrayList<>(size);
for (var iterator = resultsStream.iterator(); iterator.hasNext();) {
final var snapshotInfo = iterator.next();
if (snapshotInfos.size() < size) {
Expand Down

0 comments on commit daa9006

Please sign in to comment.