From daa90069305cdf5c837c5cfd404fd88464956ee5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 4 Apr 2024 08:17:14 +0100 Subject: [PATCH] Apply snapshot `?after` filter inline (#107003) In `TransportGetSnapshotsAction` today we build a list of all candidate snapshots and then copy them into another list to apply the `?after` filter. With this commit we construct the final filtered list directly. --- .../get/TransportGetSnapshotsAction.java | 82 ++++++++++++------- 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index cb4942cc0efb8..190c4c565f1b7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -179,8 +179,18 @@ private class GetSnapshotsOperation { // results private final Map failuresByRepository = ConcurrentCollections.newConcurrentMap(); private final Queue> allSnapshotInfos = ConcurrentCollections.newQueue(); + + /** + * Accumulates number of snapshots that match the name/fromSortValue/slmPolicy predicates, to be returned in the response. + */ private final AtomicInteger totalCount = new AtomicInteger(); + /** + * Accumulates the number of snapshots that match the name/fromSortValue/slmPolicy/after predicates, for sizing the final result + * list. + */ + private final AtomicInteger resultsCount = new AtomicInteger(); + GetSnapshotsOperation( CancellableTask cancellableTask, ResolvedRepositories resolvedRepositories, @@ -261,7 +271,7 @@ void getMultipleReposSnapshotInfo(ActionListener listener) } }) - .addListener(listener.map(ignored -> buildResponse())); + .addListener(listener.map(ignored -> buildResponse()), executor, threadPool.getThreadContext()); } private boolean skipRepository(String repositoryName) { @@ -306,7 +316,7 @@ private void loadSnapshotInfos(String repo, @Nullable RepositoryData repositoryD } if (verbose) { - snapshots(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener); + loadSnapshotInfos(repo, toResolve.stream().map(Snapshot::getSnapshotId).toList(), listener); } else { assert fromSortValuePredicates.isMatchAll() : "filtering is not supported in non-verbose mode"; assert slmPolicyPredicate == SlmPolicyPredicate.MATCH_ALL_POLICIES : "filtering is not supported in non-verbose mode"; @@ -321,10 +331,11 @@ private void loadSnapshotInfos(String repo, @Nullable RepositoryData repositoryD } } - private void snapshots(String repositoryName, Collection snapshotIds, ActionListener listener) { + private void loadSnapshotInfos(String repositoryName, Collection snapshotIds, ActionListener listener) { if (cancellableTask.notifyIfCancelled(listener)) { return; } + final AtomicInteger repositoryTotalCount = new AtomicInteger(); final List snapshots = new ArrayList<>(snapshotIds.size()); final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); // first, look at the snapshots in progress @@ -337,7 +348,10 @@ private void snapshots(String repositoryName, Collection snapshotIds if (snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId())) { final SnapshotInfo snapshotInfo = SnapshotInfo.inProgress(entry); if (matchesPredicates(snapshotInfo)) { - snapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + repositoryTotalCount.incrementAndGet(); + if (afterPredicate.test(snapshotInfo)) { + snapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + } } } } @@ -372,7 +386,10 @@ private void snapshots(String repositoryName, Collection snapshotIds @Override public void onResponse(SnapshotInfo snapshotInfo) { if (matchesPredicates(snapshotInfo)) { - syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + repositoryTotalCount.incrementAndGet(); + if (afterPredicate.test(snapshotInfo)) { + syncSnapshots.add(snapshotInfo.maybeWithoutIndices(indices)); + } } refListener.onResponse(null); } @@ -398,11 +415,16 @@ public void onFailure(Exception e) { } }) - .addListener(listener.safeMap(v -> { - // no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here - applyAfterPredicateAndAdd(snapshots); - return null; - }), executor, threadPool.getThreadContext()); + // no need to synchronize access to snapshots: Repository#getSnapshotInfo fails fast but we're on the success path here + .andThenAccept(ignored -> addResults(repositoryTotalCount.get(), snapshots)) + + .addListener(listener); + } + + private void addResults(int repositoryTotalCount, List snapshots) { + totalCount.addAndGet(repositoryTotalCount); + resultsCount.addAndGet(snapshots.size()); + allSnapshotInfos.add(snapshots); } private void addSimpleSnapshotInfos( @@ -413,15 +435,19 @@ private void addSimpleSnapshotInfos( ) { if (repositoryData == null) { // only want current snapshots - applyAfterPredicateAndAdd(currentSnapshots); + addResults(currentSnapshots.size(), currentSnapshots.stream().filter(afterPredicate).toList()); return; } // else want non-current snapshots as well, which are found in the repository data - List snapshotInfos = new ArrayList<>(); + List snapshotInfos = new ArrayList<>(currentSnapshots.size() + toResolve.size()); + int repositoryTotalCount = 0; for (SnapshotInfo snapshotInfo : currentSnapshots) { assert snapshotInfo.startTime() == 0L && snapshotInfo.endTime() == 0L && snapshotInfo.totalShards() == 0L : snapshotInfo; if (toResolve.remove(snapshotInfo.snapshot())) { - snapshotInfos.add(snapshotInfo); + repositoryTotalCount += 1; + if (afterPredicate.test(snapshotInfo)) { + snapshotInfos.add(snapshotInfo); + } } } Map> snapshotsToIndices = new HashMap<>(); @@ -435,22 +461,19 @@ private void addSimpleSnapshotInfos( } } for (Snapshot snapshot : toResolve) { - snapshotInfos.add( - new SnapshotInfo( - snapshot, - snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), - Collections.emptyList(), - Collections.emptyList(), - repositoryData.getSnapshotState(snapshot.getSnapshotId()) - ) + final var snapshotInfo = new SnapshotInfo( + snapshot, + snapshotsToIndices.getOrDefault(snapshot.getSnapshotId(), Collections.emptyList()), + Collections.emptyList(), + Collections.emptyList(), + repositoryData.getSnapshotState(snapshot.getSnapshotId()) ); + repositoryTotalCount += 1; + if (afterPredicate.test(snapshotInfo)) { + snapshotInfos.add(snapshotInfo); + } } - applyAfterPredicateAndAdd(snapshotInfos); - } - - private void applyAfterPredicateAndAdd(List snapshotInfos) { - allSnapshotInfos.add(snapshotInfos.stream().filter(afterPredicate).toList()); - totalCount.addAndGet(snapshotInfos.size()); + addResults(repositoryTotalCount, snapshotInfos); } private GetSnapshotsResponse buildResponse() { @@ -463,11 +486,10 @@ private GetSnapshotsResponse buildResponse() { .sorted(sortBy.getSnapshotInfoComparator(order)) .skip(offset); final List snapshotInfos; - if (size == GetSnapshotsRequest.NO_LIMIT) { + if (size == GetSnapshotsRequest.NO_LIMIT || resultsCount.get() <= size) { snapshotInfos = resultsStream.toList(); } else { - final var allocateSize = Math.min(size, 1000); // ignore excessively-large sizes in request params - snapshotInfos = new ArrayList<>(allocateSize); + snapshotInfos = new ArrayList<>(size); for (var iterator = resultsStream.iterator(); iterator.hasNext();) { final var snapshotInfo = iterator.next(); if (snapshotInfos.size() < size) {