Simpler shard results collection in SnapshotsDeletion (elastic#100767)
No need for a `GroupedActionListener` to build a list of all the shard
results per index, which are then copied to another list, stored in
another per-index `GroupedActionListener`, copied again, and finally
concatenated into the final list. This commit computes the resulting
list directly.
DaveCTurner authored Oct 13, 2023
1 parent 6d96998 commit b517541
Showing 1 changed file with 41 additions and 43 deletions.
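
The change in miniature: previously each shard reported its result to a per-index GroupedActionListener, the per-index lists were collected by a second GroupedActionListener, and the lot was flattened into the final list; now each shard appends its result directly to a single synchronized list. A minimal sketch of the new accumulation pattern, using a generic stand-in type instead of the real ShardSnapshotMetaDeleteResult:

    import java.util.ArrayList;
    import java.util.List;

    class ResultCollector<T> {
        private final List<T> results = new ArrayList<>();

        // Every writer synchronizes here, mirroring addShardDeleteResult in the diff below.
        synchronized void add(T result) {
            results.add(result);
        }

        // Intended to be called only after all writers have completed (see the
        // happens-before note in the diff), so the read needs no lock.
        List<T> copyOfResults() {
            return List.copyOf(results);
        }
    }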
@@ -1013,6 +1013,21 @@ private record ShardSnapshotMetaDeleteResult(
Collection<String> blobsToDelete
) {}

/**
* <p>
* Shard-level results, see {@link ShardSnapshotMetaDeleteResult}.
* </p>
* <p>
* Writes to this list are all synchronized (via {@link #addShardDeleteResult}), and happen-before it is read so the reads need
* no further synchronization
* </p>
*/
private final List<ShardSnapshotMetaDeleteResult> shardDeleteResults = new ArrayList<>();

private synchronized void addShardDeleteResult(ShardSnapshotMetaDeleteResult shardDeleteResult) {
shardDeleteResults.add(shardDeleteResult);
}

// ---------------------------------------------------------------------------------------------------------------------------------
// The overall flow of execution
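
The Javadoc above rests on a standard memory-model argument: writes are serialized by the object's monitor, and the eventual reader only runs after every writer has signalled completion, which establishes the happens-before edge. A self-contained illustration of the same discipline, with a CountDownLatch standing in for the listener machinery of the real code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SynchronizedAccumulator {
        private static final List<Integer> results = new ArrayList<>();

        private static synchronized void addResult(int value) {
            results.add(value); // all writes serialized under the monitor
        }

        public static void main(String[] args) throws InterruptedException {
            final int tasks = 8;
            final CountDownLatch done = new CountDownLatch(tasks);
            final ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                pool.execute(() -> {
                    addResult(id);
                    done.countDown(); // completion signal: happens-before await() returning
                });
            }
            done.await(); // after this, reading `results` without a lock is safe
            System.out.println(results.size()); // always prints 8
            pool.shutdown();
        }
    }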

@@ -1026,8 +1041,7 @@ void runDelete(SnapshotDeleteListener listener) {

private void runWithUniqueShardMetadataNaming(SnapshotDeleteListener listener) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep =
new ListenableFuture<>();
final ListenableFuture<Void> writeShardMetaDataAndComputeDeletesStep = new ListenableFuture<>();
writeUpdatedShardMetadataAndComputeDeletes(writeShardMetaDataAndComputeDeletesStep);
// Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
// 1. Remove the snapshots from the list of existing snapshots
Expand All @@ -1037,7 +1051,7 @@ private void runWithUniqueShardMetadataNaming(SnapshotDeleteListener listener) {
// index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
// written if all shard paths have been successfully updated.
final ListenableFuture<RepositoryData> writeUpdatedRepoDataStep = new ListenableFuture<>();
writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(shardDeleteResults -> {
writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(ignored -> {
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
@@ -1057,7 +1071,7 @@ private void runWithLegacyNumericShardMetadataNaming(SnapshotDeleteListener list
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
try (var refs = new RefCountingRunnable(listener::onDone)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener().map(ignored -> null));
cleanupUnlinkedShardLevelBlobs(writeShardMetaDataAndComputeDeletesStep.result(), refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(shardDeleteResults, refs.acquireListener());
}
}, listener::onFailure));
}
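
Since the results now live in the shared field, the first step's future carries no payload: its type becomes Void, and the later steps read shardDeleteResults directly. A rough stand-in for this two-step chaining, using java.util.concurrent.CompletableFuture rather than Elasticsearch's ListenableFuture (illustrative only; names are invented):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CopyOnWriteArrayList;

    class DeletionFlowSketch {
        // Stand-in for the synchronized shardDeleteResults field.
        private final List<String> shardDeleteResults = new CopyOnWriteArrayList<>();

        void run() {
            // Step 1 only signals completion; it publishes results via the field.
            CompletableFuture<Void> writeShardMetadataStep = CompletableFuture.runAsync(
                () -> shardDeleteResults.add("shard-0:new-generation")
            );
            // Step 2 runs once step 1 completes and reads the field directly.
            writeShardMetadataStep.thenRun(
                () -> System.out.println("cleaning up " + shardDeleteResults)
            );
        }
    }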
@@ -1082,7 +1096,7 @@ private void runWithLegacyNumericShardMetadataNaming(SnapshotDeleteListener list
ActionRunnable.wrap(
refs.acquireListener(),
l0 -> writeUpdatedShardMetadataAndComputeDeletes(
l0.delegateFailure((l, shardDeleteResults) -> cleanupUnlinkedShardLevelBlobs(shardDeleteResults, l))
l0.delegateFailure((l, ignored) -> cleanupUnlinkedShardLevelBlobs(shardDeleteResults, l))
)
)
);
@@ -1122,26 +1136,11 @@ void runCleanup(ActionListener<RepositoryCleanupResult> listener) {
// ---------------------------------------------------------------------------------------------------------------------------------
// Updating the shard-level metadata and accumulating results

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
private void writeUpdatedShardMetadataAndComputeDeletes(
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
) {

final List<IndexId> indices = originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

if (indices.isEmpty()) {
onAllShardsCompleted.onResponse(Collections.emptyList());
return;
}

// Listener that flattens out the delete results for each index
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetadataListener = new GroupedActionListener<>(
indices.size(),
onAllShardsCompleted.map(res -> res.stream().flatMap(Collection::stream).toList())
);

for (IndexId indexId : indices) {
new IndexSnapshotsDeletion(indexId).run(deleteIndexMetadataListener);
private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) {
try (var listeners = new RefCountingListener(listener)) {
for (IndexId indexId : originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds)) {
new IndexSnapshotsDeletion(indexId).run(listeners.acquire());
}
}
}
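
RefCountingListener here replaces both layers of GroupedActionListener: the try-with-resources block holds one reference open while the loop runs, each acquire() adds another, and the wrapped listener completes exactly once after all of them are released; when the index list is empty, closing the block completes the listener immediately, which is why the old isEmpty() early return could be dropped. A minimal stand-in for the ref-counting idea (not the real Elasticsearch class):

    import java.util.concurrent.atomic.AtomicInteger;

    class RefCounterSketch implements AutoCloseable {
        private final AtomicInteger refs = new AtomicInteger(1); // the "self" reference
        private final Runnable onDone;

        RefCounterSketch(Runnable onDone) {
            this.onDone = onDone;
        }

        Runnable acquire() {
            refs.incrementAndGet();
            return this::release; // each completion releases one reference
        }

        @Override
        public void close() {
            release(); // drops the "self" reference taken in the constructor
        }

        private void release() {
            if (refs.decrementAndGet() == 0) {
                onDone.run(); // fires exactly once, after close() and all completions
            }
        }
    }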

@@ -1163,8 +1162,8 @@ private synchronized void updateShardCount(int newShardCount) {
shardCount = Math.max(shardCount, newShardCount);
}

void run(ActionListener<Collection<ShardSnapshotMetaDeleteResult>> shardResultsListener) {
determineShardCount(shardResultsListener.delegateFailureAndWrap((l, v) -> processShards(l)));
void run(ActionListener<Void> listener) {
determineShardCount(listener.delegateFailureAndWrap((l, v) -> processShards(l)));
}

// -----------------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -1203,14 +1202,14 @@ private void getOneShardCount(String indexMetaGeneration) {
}
}

private void processShards(ActionListener<Collection<ShardSnapshotMetaDeleteResult>> listener) {
private void processShards(ActionListener<Void> listener) {
final Set<SnapshotId> survivingSnapshots = snapshotsWithIndex.stream()
.filter(id -> snapshotIds.contains(id) == false)
.collect(Collectors.toSet());
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(shardCount, listener);
for (int shardId = 0; shardId < shardCount; shardId++) {
snapshotExecutor.execute(new ShardSnapshotsDeletion(shardId, survivingSnapshots, allShardsListener));
try (var listeners = new RefCountingListener(listener)) {
for (int shardId = 0; shardId < shardCount; shardId++) {
snapshotExecutor.execute(new ShardSnapshotsDeletion(shardId, survivingSnapshots, listeners.acquire()));
}
}
}

@@ -1221,20 +1220,16 @@ private class ShardSnapshotsDeletion extends AbstractRunnable {

private final int shardId;
private final Set<SnapshotId> survivingSnapshots;
private final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener;
private final ActionListener<Void> listener;

// Computed at the start of doRun(), after forking, rather than in the constructor. TODO make these local variables perhaps?
private BlobContainer shardContainer;
private Set<String> originalShardBlobs;

ShardSnapshotsDeletion(
int shardId,
Set<SnapshotId> survivingSnapshots,
ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener
) {
ShardSnapshotsDeletion(int shardId, Set<SnapshotId> survivingSnapshots, ActionListener<Void> listener) {
this.shardId = shardId;
this.survivingSnapshots = survivingSnapshots;
this.allShardsListener = allShardsListener;
this.listener = listener;
}

@Override
@@ -1258,7 +1253,7 @@ protected void doRun() throws Exception {
newGen = tuple.v2() + 1;
blobStoreIndexShardSnapshots = tuple.v1();
}
allShardsListener.onResponse(
addShardDeleteResult(
deleteFromShardSnapshotMeta(blobStoreIndexShardSnapshots.withRetainedSnapshots(survivingSnapshots), newGen)
);
}
@@ -1342,13 +1337,16 @@ private static List<String> unusedBlobs(

@Override
public void onFailure(Exception ex) {
// TODO: Should we fail the delete here? See https://github.com/elastic/elasticsearch/issues/100569.
logger.warn(
() -> format("%s failed to delete shard data for shard [%s][%s]", snapshotIds, indexId.getName(), shardId),
ex
);
// Just passing null here to count down the listener instead of failing it, the stale data left behind
// here will be retried in the next delete or repository cleanup
allShardsListener.onResponse(null);
}

@Override
public void onAfter() {
listener.onResponse(null);
}
}
}
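
Completing the listener moves out of the individual branches and into onAfter: AbstractRunnable invokes doRun(), routes any thrown exception to onFailure(), and then calls onAfter() on both paths, so the acquired listener is always counted down and the old trick of passing null on failure disappears. A compact sketch of that contract (a simplified stand-in, not the real Elasticsearch class):

    // Simplified sketch of the AbstractRunnable execution contract assumed above.
    abstract class SketchRunnable implements Runnable {

        protected abstract void doRun() throws Exception;

        protected abstract void onFailure(Exception e);

        protected void onAfter() {} // runs on both the success and the failure path

        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e); // the failure is reported...
            } finally {
                onAfter();    // ...and completion is signalled regardless
            }
        }
    }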
