Skip to content

Commit

Permalink
Optimise clone operation for incremental full cluster snapshots (#16296)
Browse files Browse the repository at this point in the history
* Optimise clone operation for incremental full cluster snapshots

Signed-off-by: Ashish Singh <[email protected]>

* Add UTs

Signed-off-by: Ashish Singh <[email protected]>

* Add CHANGELOG

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Oct 14, 2024
1 parent 5d2d392 commit 2166b44
Show file tree
Hide file tree
Showing 3 changed files with 504 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove Identity FeatureFlag ([#16024](https://github.com/opensearch-project/OpenSearch/pull/16024))
- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https://github.com/opensearch-project/OpenSearch/pull/16154))
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))

- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))

### Deprecated

Expand Down
121 changes: 64 additions & 57 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());

private void runReadyClone(
// Package-private (rather than private) so that unit tests can call this method directly
void runReadyClone(
Snapshot target,
SnapshotId sourceSnapshot,
ShardSnapshotStatus shardStatusBefore,
Expand All @@ -1351,69 +1352,75 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
final String localNodeId = clusterService.localNode().getId();
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
if (remoteStoreIndexShallowCopy == false) {
executeClone(localNodeId, false);
} else {
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
sourceSnapshot,
repoShardId.index()
);
final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
executeClone(localNodeId, cloneRemoteStoreIndexShardSnapshot);
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
}
}

private void executeClone(String localNodeId, boolean cloneRemoteStoreIndexShardSnapshot) {
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
repoShardId.index()
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
getCloneCompletionListener(localNodeId)
);
final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
final SnapshotId targetSnapshot = target.getSnapshotId();
final ActionListener<String> listener = ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(
target,
repoShardId,
new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)
),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
sourceSnapshot,
targetSnapshot
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
getCloneCompletionListener(localNodeId)
);
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
targetSnapshot,
}
}
}

private ActionListener<String> getCloneCompletionListener(String localNodeId) {
return ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
listener
);
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
targetSnapshot,
repoShardId,
shardStatusBefore.generation(),
listener
);
}
}
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
target.getSnapshotId()
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
);
}
});
}
Expand Down
Loading

0 comments on commit 2166b44

Please sign in to comment.