Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support centralize snapshot creation #15569

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Star tree mapping changes ([#14605](https://github.com/opensearch-project/OpenSearch/pull/14605))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralized snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
Expand Down Expand Up @@ -424,6 +425,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
Expand All @@ -436,6 +438,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
Expand All @@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
Expand All @@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
Expand All @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
Expand Down Expand Up @@ -110,7 +118,10 @@ protected void clusterManagerOperation(
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
} else {
if (request.waitForCompletion()) {
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
Repository repository = repositoriesService.repository(request.repository());
boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());

if (request.waitForCompletion() || isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Priority;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -123,6 +124,29 @@
);
}

/**
* Finalizes the snapshot by delegating to {@code in}.
* <p>
* Every argument, including {@code repositoryUpdatePriority}, is forwarded unchanged; this override adds no
* behavior of its own.
*/
@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
// Pure pass-through: arguments are handed to the delegate in the same order they were received.
in.finalizeSnapshot(

Check warning on line 138 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L138

Added line #L138 was not covered by tests
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}

Check warning on line 148 in server/src/main/java/org/opensearch/repositories/FilterRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/FilterRepository.java#L148

Added line #L148 was not covered by tests

@Override
public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -175,6 +176,32 @@ void finalizeSnapshot(
ActionListener<RepositoryData> listener
);

/**
* Finalizes snapshotting process, letting the caller choose the priority of the resulting cluster state update.
* <p>
* This method is called on cluster-manager after all shards are snapshotted.
* <p>
* NOTE(review): in {@code BlobStoreRepository} the overload without {@code repositoryUpdatePriority} appears to
* delegate to this method with {@link Priority#NORMAL}; confirm that other implementations follow the same convention.
*
* @param shardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
*/
void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
);

/**
* Deletes snapshots
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -267,6 +268,8 @@

public static final Setting<Boolean> REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false);

public static final Setting<Boolean> SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
Expand Down Expand Up @@ -1072,6 +1075,7 @@
repositoryStateId,
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
repoMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure);
Expand Down Expand Up @@ -1101,39 +1105,46 @@
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
2
);
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
foundIndices,
rootBlobs,
newRepoData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(
snapshotIds,
repositoryData,
false,
remoteStoreLockManagerFactory,
writeMetaAndComputeDeletesStep
);
writeMetaAndComputeDeletesStep.whenComplete(
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
writeIndexGen(

Check warning on line 1108 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1108

Added line #L1108 was not covered by tests
updatedRepoData,
repositoryStateId,
repoMetaVersion,
Function.identity(),

Check warning on line 1112 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1112

Added line #L1112 was not covered by tests
Priority.NORMAL,
ActionListener.wrap(newRepoData -> {

Check warning on line 1114 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1114

Added line #L1114 was not covered by tests
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(newRepoData)),

Check warning on line 1117 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1116-L1117

Added lines #L1116 - L1117 were not covered by tests
2
);
cleanupUnlinkedRootAndIndicesBlobs(

Check warning on line 1120 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1120

Added line #L1120 was not covered by tests
snapshotIds,
deleteResults,
foundIndices,
rootBlobs,
newRepoData,
remoteStoreLockManagerFactory,
afterCleanupsListener
),
afterCleanupsListener::onFailure
);
}, listener::onFailure));
);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(

Check warning on line 1129 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1128-L1129

Added lines #L1128 - L1129 were not covered by tests
snapshotIds,
repositoryData,
false,
remoteStoreLockManagerFactory,
writeMetaAndComputeDeletesStep
);
writeMetaAndComputeDeletesStep.whenComplete(
deleteResults -> asyncCleanupUnlinkedShardLevelBlobs(

Check warning on line 1137 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1136-L1137

Added lines #L1136 - L1137 were not covered by tests
repositoryData,
snapshotIds,
deleteResults,
remoteStoreLockManagerFactory,
afterCleanupsListener
),
afterCleanupsListener::onFailure

Check warning on line 1144 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1144

Added line #L1144 was not covered by tests
);
}, listener::onFailure)

Check warning on line 1146 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1146

Added line #L1146 was not covered by tests
);
}
}

Expand Down Expand Up @@ -1583,6 +1594,7 @@
repositoryStateId,
repositoryMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(
v -> cleanupStaleBlobs(
Collections.emptyList(),
Expand Down Expand Up @@ -1787,6 +1799,29 @@
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
final ActionListener<RepositoryData> listener
) {
finalizeSnapshot(

Check warning on line 1803 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1803

Added line #L1803 was not covered by tests
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
Priority.NORMAL,
listener
);
}

Check warning on line 1813 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L1813

Added line #L1813 was not covered by tests

@Override
public void finalizeSnapshot(
final ShardGenerations shardGenerations,
final long repositoryStateId,
final Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
final ActionListener<RepositoryData> listener
) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
+ repositoryStateId
Expand Down Expand Up @@ -1834,6 +1869,7 @@
repositoryStateId,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
Expand Down Expand Up @@ -2367,17 +2403,19 @@
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
* pending and safe generation are set to the same value marking the end of the update of the repository data.
*
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param version version of the repository metadata to write
* @param stateFilter filter for the last cluster state update executed by this method
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener completion listener
*/
protected void writeIndexGen(
RepositoryData repositoryData,
long expectedGen,
Version version,
Function<ClusterState, ClusterState> stateFilter,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
assert isReadOnly() == false; // can not write to a read only repository
Expand All @@ -2402,7 +2440,7 @@
final StepListener<Long> setPendingStep = new StepListener<>();
clusterService.submitStateUpdateTask(
"set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {

private long newGen;

Expand Down Expand Up @@ -2540,7 +2578,7 @@
// Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask(
"set safe repository generation [" + metadata.name() + "][" + newGen + "]",
new ClusterStateUpdateTask() {
new ClusterStateUpdateTask(repositoryUpdatePriority) {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
Expand Down
Loading
Loading