[SnapshotV2] Support centralize snapshot creation #15124

Merged
merged 35 commits into from
Aug 28, 2024
Changes from 33 commits
9142a25
Initial Commit to support centralize snapshot creation and implicit l…
Aug 6, 2024
d9bbc65
Fix deserialization error
Aug 6, 2024
f72702f
Fix gradle spotless check
Aug 6, 2024
8e85231
Fix listener
Aug 6, 2024
8a507ad
Merge branch 'main' into snapshot-pinned-timestamp
Aug 7, 2024
2d404e8
Fix test
Aug 7, 2024
90c860c
Fix snapshot generation
Aug 8, 2024
193da65
Modify cluster setting name
Aug 14, 2024
fe2aaaf
Add more tests
Aug 14, 2024
14c08ae
Merge branch 'main' into snapshot-pinned-timestamp
Aug 15, 2024
ec17028
Merge branch 'main' into snapshot-pinned-timestamp
Aug 20, 2024
6504169
Uncomment pin timestamp code
Aug 20, 2024
626c2fa
Modify log messages
Aug 21, 2024
be65f6d
Add spotless check failure fix
Aug 21, 2024
62452ee
Fix completion listener for snapshot v2
Aug 21, 2024
00031ec
Elevate cluster state update priority for repository metadata update …
Aug 21, 2024
0c636ef
Add more integ tests
Aug 22, 2024
623f994
Add priority as IMMEDIATE for cluster state repo update task only for…
Aug 23, 2024
2e4795b
Fix build error
Aug 23, 2024
a6090a7
Fix spotless error
Aug 23, 2024
b5d012f
Add repository setting for snapshot v2
Aug 23, 2024
1394d8c
Merge branch 'main' into snapshot-pinned-timestamp
Aug 23, 2024
80bf6cc
Address review comments
Aug 26, 2024
b0cbc08
Add integ test to verify snapshot creation if shallow copy repo setti…
Aug 26, 2024
38af0f6
Fix spotless violation error
Aug 26, 2024
73376a8
Address review comment
Aug 26, 2024
39b57e3
Address review comments
Aug 26, 2024
e1eecbd
Add min version check for backward compatibility
Aug 26, 2024
983d2b5
address review comments
Aug 27, 2024
a423475
add integ test for master failover scenario
Aug 27, 2024
207b03d
Add more integ tests
Aug 27, 2024
872e136
refactor code
Aug 27, 2024
1ef061a
add changelog
Aug 27, 2024
0289d67
Merge branch 'main' into snapshot-pinned-timestamp
anshu1106 Aug 28, 2024
fb48a2d
Add pinned timestamp setting in integ tests
Aug 28, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
@@ -42,6 +42,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.BlobStoreException;
@@ -391,6 +392,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
@@ -400,6 +402,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}

Large diffs are not rendered by default.

@@ -36,6 +36,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -127,6 +128,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
@@ -136,6 +138,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
@@ -42,12 +42,16 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.snapshots.SnapshotsService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2;

/**
* Transport action for create snapshot operation
*
@@ -56,12 +60,15 @@
public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
private final SnapshotsService snapshotsService;

private final RepositoriesService repositoriesService;

@Inject
public TransportCreateSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
RepositoriesService repositoriesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
@@ -75,6 +82,7 @@ public TransportCreateSnapshotAction(
indexNameExpressionResolver
);
this.snapshotsService = snapshotsService;
this.repositoriesService = repositoriesService;
}

@Override
@@ -103,7 +111,9 @@ protected void clusterManagerOperation(
ClusterState state,
final ActionListener<CreateSnapshotResponse> listener
) {
- if (request.waitForCompletion()) {
+ Repository repository = repositoriesService.repository(request.repository());
+ boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings());
+ if (request.waitForCompletion() || isSnapshotV2) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
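The branch in the hunk above routes a request to `executeSnapshot` when either the caller asked to wait for completion or the target repository has `shallow_snapshot_v2` enabled, and to `createSnapshot` otherwise. The following is a minimal, standalone sketch of that decision only. `SnapshotRoutingSketch`, `Path`, and `choosePath` are hypothetical names, a plain `Map` stands in for repository settings, and `Boolean.parseBoolean` is a simplification that may be more lenient than the real setting parser. It is not the OpenSearch implementation.

```java
import java.util.Map;

/** Hypothetical stand-in model of the routing decision; not the OpenSearch classes. */
final class SnapshotRoutingSketch {

    /** Key of the repository setting added in this change (default: false). */
    static final String SHALLOW_SNAPSHOT_V2 = "shallow_snapshot_v2";

    /** Hypothetical labels for the two service calls seen in the diff. */
    enum Path { EXECUTE_SNAPSHOT, CREATE_SNAPSHOT }

    /** Reads the boolean repository setting, falling back to false when it is absent. */
    static boolean isSnapshotV2(Map<String, String> repositorySettings) {
        return Boolean.parseBoolean(repositorySettings.getOrDefault(SHALLOW_SNAPSHOT_V2, "false"));
    }

    /** Mirrors the condition `request.waitForCompletion() || isSnapshotV2`. */
    static Path choosePath(boolean waitForCompletion, Map<String, String> repositorySettings) {
        return (waitForCompletion || isSnapshotV2(repositorySettings))
            ? Path.EXECUTE_SNAPSHOT
            : Path.CREATE_SNAPSHOT;
    }

    public static void main(String[] args) {
        System.out.println(choosePath(false, Map.of()));                            // CREATE_SNAPSHOT
        System.out.println(choosePath(false, Map.of(SHALLOW_SNAPSHOT_V2, "true"))); // EXECUTE_SNAPSHOT
        System.out.println(choosePath(true, Map.of()));                             // EXECUTE_SNAPSHOT
    }
}
```

In this model a v2 repository always takes the `executeSnapshot` path, so the response carries the snapshot result even when the request did not set `wait_for_completion`.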
@@ -39,6 +39,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Priority;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.core.action.ActionListener;
@@ -104,6 +105,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
in.finalizeSnapshot(
@@ -113,6 +115,7 @@ public void finalizeSnapshot(
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
listener
);
}
@@ -41,6 +41,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.common.settings.Setting;
@@ -150,6 +151,7 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param repositoryUpdatePriority priority for the cluster state update task
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
*/
void finalizeSnapshot(
@@ -159,6 +161,7 @@ void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
);

@@ -65,6 +65,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
@@ -266,6 +267,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

public static final Setting<Boolean> REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false);

public static final Setting<Boolean> SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
@@ -1046,6 +1049,7 @@ private void doDeleteShardSnapshots(
repositoryStateId,
repoMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure);
@@ -1520,6 +1524,7 @@ public void cleanup(
repositoryStateId,
repositoryMetaVersion,
Function.identity(),
Priority.NORMAL,
ActionListener.wrap(
v -> cleanupStaleBlobs(
Collections.emptyList(),
@@ -1723,6 +1728,7 @@ public void finalizeSnapshot(
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
Priority repositoryUpdatePriority,
final ActionListener<RepositoryData> listener
) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received ["
@@ -1759,6 +1765,7 @@ public void finalizeSnapshot(
repositoryStateId,
repositoryMetaVersion,
stateTransformer,
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
listener.onResponse(newRepoData);
@@ -2280,17 +2287,19 @@ public boolean isSystemRepository() {
* Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus
* pending and safe generation are set to the same value marking the end of the update of the repository data.
*
- * @param repositoryData RepositoryData to write
- * @param expectedGen expected repository generation at the start of the operation
- * @param version version of the repository metadata to write
- * @param stateFilter filter for the last cluster state update executed by this method
+ * @param repositoryData RepositoryData to write
+ * @param expectedGen expected repository generation at the start of the operation
+ * @param version version of the repository metadata to write
+ * @param stateFilter filter for the last cluster state update executed by this method
+ * @param repositoryUpdatePriority priority for the cluster state update task
* @param listener completion listener
*/
protected void writeIndexGen(
RepositoryData repositoryData,
long expectedGen,
Version version,
Function<ClusterState, ClusterState> stateFilter,
Priority repositoryUpdatePriority,
ActionListener<RepositoryData> listener
) {
assert isReadOnly() == false; // can not write to a read only repository
@@ -2315,7 +2324,7 @@ protected void writeIndexGen(
final StepListener<Long> setPendingStep = new StepListener<>();
clusterService.submitStateUpdateTask(
"set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
- new ClusterStateUpdateTask() {
+ new ClusterStateUpdateTask(repositoryUpdatePriority) {

private long newGen;

@@ -2453,7 +2462,7 @@ public void onFailure(Exception e) {
// Step 3: Update CS to reflect new repository generation.
clusterService.submitStateUpdateTask(
"set safe repository generation [" + metadata.name() + "][" + newGen + "]",
- new ClusterStateUpdateTask() {
+ new ClusterStateUpdateTask(repositoryUpdatePriority) {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
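The two `writeIndexGen` hunks above pass `repositoryUpdatePriority` into the `ClusterStateUpdateTask` constructor, so the caller now decides how urgently the "set pending" and "set safe" repository generation updates are scheduled. The sketch below illustrates the general effect of a higher priority on ordering, assuming pending tasks are drained most-urgent-first with submission order as the tie-breaker. `PriorityOrderingSketch` and its members are hypothetical. The enum only mirrors the constant names of `org.opensearch.common.Priority`, and the queue is a plain `java.util.PriorityQueue`, not the cluster-manager's actual executor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of priority-ordered task draining; not OpenSearch code. */
final class PriorityOrderingSketch {

    /** Mirrors the constant names of org.opensearch.common.Priority, most urgent first. */
    enum Priority { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }

    private static final class Task {
        final String source;
        final Priority priority;
        final long insertionOrder;

        Task(String source, Priority priority, long insertionOrder) {
            this.source = source;
            this.priority = priority;
            this.insertionOrder = insertionOrder;
        }
    }

    // Order by priority first, then by submission order so equal priorities stay FIFO.
    private final PriorityQueue<Task> queue = new PriorityQueue<>(
        Comparator.comparing((Task t) -> t.priority).thenComparingLong(t -> t.insertionOrder)
    );
    private long nextInsertionOrder = 0;

    void submit(String source, Priority priority) {
        queue.add(new Task(source, priority, nextInsertionOrder++));
    }

    /** Removes all queued tasks and returns their sources in execution order. */
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().source);
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityOrderingSketch tasks = new PriorityOrderingSketch();
        tasks.submit("put-mapping", Priority.NORMAL);
        tasks.submit("set pending repository generation", Priority.IMMEDIATE);
        tasks.submit("reroute", Priority.HIGH);
        tasks.submit("create-index", Priority.NORMAL);
        // prints [set pending repository generation, reroute, put-mapping, create-index]
        System.out.println(tasks.drain());
    }
}
```

Under these assumptions, a repository generation update submitted with `IMMEDIATE` is picked up ahead of already-queued `NORMAL` tasks, while the delete and cleanup call sites in this diff keep `Priority.NORMAL`.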