diff --git a/CHANGELOG.md b/CHANGELOG.md index 197762e7a10c3..97ade9686e490 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Star tree mapping changes ([#14605](https://github.com/opensearch-project/OpenSearch/pull/14605)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) - [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381)) +- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343))) - Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630)) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 42f6b9a88a4a5..cecfb6012f730 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.BlobStoreException; @@ -424,6 +425,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { @@ -436,6 +438,7 @@ public void finalizeSnapshot( snapshotInfo, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, listener ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 2b5b04ad94193..42e44bd3f37c3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -10,6 +10,7 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; @@ -25,6 +26,7 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexService; @@ -33,8 +35,14 @@ import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotRestoreException; @@ -49,6 +57,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -782,4 +791,596 @@ public void testInvalidRestoreRequestScenarios() throws Exception { assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore")); } + public void testCreateSnapshotV2() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + indexDocuments(client, indexName1, 10); + indexDocuments(client, indexName2, 20); + + createIndex(indexName3, indexSettings); + indexDocuments(client, indexName3, 10); + + String snapshotName2 = "test-create-snapshot2"; + + // verify even if waitForCompletion is not true, the request executes in a sync manner + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .get(); + snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + } + + public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception { + + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot-v1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false) + ) + ); + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), equalTo(0L)); + + // enable shallow_snapshot_v2 + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + indexDocuments(client, indexName1, 10); + indexDocuments(client, indexName2, 20); + + createIndex(indexName3, indexSettings); + indexDocuments(client, indexName3, 10); + + String snapshotName2 = "test-create-snapshot-v2"; + + // verify even if waitForCompletion is not true, the request executes in a sync manner + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .get(); + snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + } + + public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + int concurrentSnapshots = 5; + + // Prepare threads for concurrent snapshot creation + List threads = new ArrayList<>(); + + for (int i = 0; i < concurrentSnapshots; i++) { + int snapshotIndex = i; + Thread thread = new Thread(() -> { + try { + String snapshotName = "snapshot-concurrent-" + snapshotIndex; + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + } catch (Exception e) {} + }); + threads.add(thread); + } + // start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Validate that only one snapshot has been created + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); + } + + public void testCreateSnapshotV2WithRedIndex() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + internalCluster().ensureAtLeastNumDataNodes(0); + ensureRed(indexName1); + ensureRed(indexName2); + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + } + + public void testCreateSnapshotV2WithIndexingLoad() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + Thread indexingThread = new Thread(() -> { + try { + for (int i = 0; i < 50; i++) { + internalCluster().client().prepareIndex("test-index-load").setSource("field", "value" + i).execute().actionGet(); + } + } catch (Exception e) { + fail("indexing failed due to exception: " + e.getMessage()); + } + }); + + // Start indexing + indexingThread.start(); + + // Wait for a bit to let some documents be indexed + Thread.sleep(1000); + + // Create a snapshot while indexing is ongoing + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + assertTrue(snapshotInfo.indices().contains("test-index-load")); + assertTrue(snapshotInfo.indices().contains(indexName1)); + assertTrue(snapshotInfo.indices().contains(indexName2)); + indexingThread.join(); + + } + + public void testCreateSnapshotV2WithShallowCopySettingDisabled() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + // Will create full copy snapshot if `REMOTE_STORE_INDEX_SHALLOW_COPY` is false but `SHALLOW_SNAPSHOT_V2` is true + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), equalTo(0L)); + + // Validate that snapshot is present in repository data + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertTrue(repositoryData.getSnapshotIds().contains(snapshotInfo.snapshotId())); + } + + public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception { + + internalCluster().startClusterManagerOnlyNodes(3, pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + ensureStableCluster(4, internalCluster().getClusterManagerName()); + + final SnapshotInfo[] snapshotInfo = new SnapshotInfo[1]; + final Boolean[] snapshotFailed = new Boolean[1]; + snapshotFailed[0] = false; + Thread snapshotThread = new Thread(() -> { + try { + // Start snapshot creation + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + snapshotInfo[0] = createSnapshotResponse.getSnapshotInfo(); + + } catch (Exception e) { + snapshotFailed[0] = true; + } + }); + snapshotThread.start(); + Thread.sleep(100); + + internalCluster().stopCurrentClusterManagerNode(); + + // Wait for the cluster to elect a new Cluster Manager and stabilize + ensureStableCluster(3, internalCluster().getClusterManagerName()); + + // Wait for the snapshot thread to complete + snapshotThread.join(); + + // Validate that the snapshot was created or handled gracefully + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + if (snapshotFailed[0]) { + assertFalse(repositoryData.getSnapshotIds().contains(snapshotInfo[0].snapshotId())); + } else { + assertTrue(repositoryData.getSnapshotIds().contains(snapshotInfo[0].snapshotId())); + } + } + + public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot-v1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false) + ) + ); + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + + for (int i = 0; i < 10; i++) { + createIndex("index" + i, indexSettings); + } + ensureStableCluster(3); + for (int i = 0; i < 10; i++) { + indexDocuments(client, "index" + i, 15); + } + + ensureStableCluster(3); + for (int i = 0; i < 10; i++) { + ensureGreen("index" + i); + } + final CreateSnapshotResponse[] snapshotV1Response = new CreateSnapshotResponse[1]; + // Create a separate thread to create the first snapshot + Thread createV1SnapshotThread = new Thread(() -> { + try { + snapshotV1Response[0] = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .get(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Create a separate thread to enable shallow_snapshot_v2 + Thread enableV2Thread = new Thread(() -> { + try { + + assertThrows( + IllegalStateException.class, + () -> client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + .get() + ); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + createV1SnapshotThread.start(); + + Thread.sleep(100); + + enableV2Thread.start(); + + enableV2Thread.join(); + createV1SnapshotThread.join(); + } + + private Settings pinnedTimestampSettings() { + Settings settings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .build(); + return settings; + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java index 0eb37703eb0f1..0bebe969b3f3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -127,6 +128,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { super.finalizeSnapshot( @@ -136,6 +138,7 @@ public void finalizeSnapshot( snapshotInfo, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, listener ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index 768a6578c75fb..54e275d6d8982 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -42,12 +42,16 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; + /** * Transport action for create snapshot operation * @@ -56,12 +60,15 @@ public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction { private final SnapshotsService snapshotsService; + private final RepositoriesService repositoriesService; + @Inject public TransportCreateSnapshotAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, SnapshotsService snapshotsService, + RepositoriesService repositoriesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver ) { @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction( indexNameExpressionResolver ); this.snapshotsService = snapshotsService; + this.repositoriesService = repositoriesService; } @Override @@ -110,7 +118,10 @@ protected void clusterManagerOperation( snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); } } else { - if (request.waitForCompletion()) { + Repository repository = repositoriesService.repository(request.repository()); + boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings()); + + if (request.waitForCompletion() || isSnapshotV2) { snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); } else { snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index 487d5febb82dd..cb1fd6a8694ab 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Priority; import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.core.action.ActionListener; @@ -123,6 +124,29 @@ public void finalizeSnapshot( ); } + @Override + public void finalizeSnapshot( + ShardGenerations shardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + Function stateTransformer, + Priority repositoryUpdatePriority, + ActionListener listener + ) { + in.finalizeSnapshot( + shardGenerations, + repositoryStateId, + clusterMetadata, + snapshotInfo, + repositoryMetaVersion, + stateTransformer, + repositoryUpdatePriority, + listener + ); + } + @Override public void deleteSnapshots( Collection snapshotIds, diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index a1570c83c5e95..29d4638f4bfbc 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.Priority; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.common.settings.Setting; @@ -175,6 +176,32 @@ void finalizeSnapshot( ActionListener listener ); + /** + * Finalizes snapshotting process + *

+ * This method is called on cluster-manager after all shards are snapshotted. + * + * @param shardGenerations updated shard generations + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began + * @param clusterMetadata cluster metadata + * @param snapshotInfo SnapshotInfo instance to write for this snapshot + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and + * is used to remove any state tracked for the in-progress snapshot from the cluster state + * @param repositoryUpdatePriority priority for the cluster state update task + * @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot + */ + void finalizeSnapshot( + ShardGenerations shardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + Function stateTransformer, + Priority repositoryUpdatePriority, + ActionListener listener + ); + /** * Deletes snapshots * diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f37fb8fe5d85c..5d5b0e91d8a6a 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -65,6 +65,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; +import org.opensearch.common.Priority; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; @@ -267,6 +268,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false); + public static final Setting SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false); + /** * Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. * For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation @@ -1072,6 +1075,7 @@ private void doDeleteShardSnapshots( repositoryStateId, repoMetaVersion, Function.identity(), + Priority.NORMAL, ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) ); }, listener::onFailure); @@ -1101,39 +1105,46 @@ private void doDeleteShardSnapshots( } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = new GroupedActionListener<>( - ActionListener.wrap(() -> listener.onResponse(newRepoData)), - 2 - ); - cleanupUnlinkedRootAndIndicesBlobs( - snapshotIds, - foundIndices, - rootBlobs, - newRepoData, - remoteStoreLockManagerFactory, - afterCleanupsListener - ); - final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes( - snapshotIds, - repositoryData, - false, - remoteStoreLockManagerFactory, - writeMetaAndComputeDeletesStep - ); - writeMetaAndComputeDeletesStep.whenComplete( - deleteResults -> asyncCleanupUnlinkedShardLevelBlobs( - repositoryData, + writeIndexGen( + updatedRepoData, + repositoryStateId, + repoMetaVersion, + Function.identity(), + Priority.NORMAL, + ActionListener.wrap(newRepoData -> { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = new GroupedActionListener<>( + ActionListener.wrap(() -> listener.onResponse(newRepoData)), + 2 + ); + cleanupUnlinkedRootAndIndicesBlobs( snapshotIds, - deleteResults, + foundIndices, + rootBlobs, + newRepoData, remoteStoreLockManagerFactory, afterCleanupsListener - ), - afterCleanupsListener::onFailure - ); - }, listener::onFailure)); + ); + final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes( + snapshotIds, + repositoryData, + false, + remoteStoreLockManagerFactory, + writeMetaAndComputeDeletesStep + ); + writeMetaAndComputeDeletesStep.whenComplete( + deleteResults -> asyncCleanupUnlinkedShardLevelBlobs( + repositoryData, + snapshotIds, + deleteResults, + remoteStoreLockManagerFactory, + afterCleanupsListener + ), + afterCleanupsListener::onFailure + ); + }, listener::onFailure) + ); } } @@ -1583,6 +1594,7 @@ public void cleanup( repositoryStateId, repositoryMetaVersion, Function.identity(), + Priority.NORMAL, ActionListener.wrap( v -> cleanupStaleBlobs( Collections.emptyList(), @@ -1787,6 +1799,29 @@ public void finalizeSnapshot( Version repositoryMetaVersion, Function stateTransformer, final ActionListener listener + ) { + finalizeSnapshot( + shardGenerations, + repositoryStateId, + clusterMetadata, + snapshotInfo, + repositoryMetaVersion, + stateTransformer, + Priority.NORMAL, + listener + ); + } + + @Override + public void finalizeSnapshot( + final ShardGenerations shardGenerations, + final long repositoryStateId, + final Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + Function stateTransformer, + Priority repositoryUpdatePriority, + final ActionListener listener ) { assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId @@ -1834,6 +1869,7 @@ public void finalizeSnapshot( repositoryStateId, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, ActionListener.wrap(newRepoData -> { if (writeShardGens) { cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); @@ -2367,10 +2403,11 @@ public boolean isSystemRepository() { * Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus * pending and safe generation are set to the same value marking the end of the update of the repository data. * - * @param repositoryData RepositoryData to write - * @param expectedGen expected repository generation at the start of the operation - * @param version version of the repository metadata to write - * @param stateFilter filter for the last cluster state update executed by this method + * @param repositoryData RepositoryData to write + * @param expectedGen expected repository generation at the start of the operation + * @param version version of the repository metadata to write + * @param stateFilter filter for the last cluster state update executed by this method + * @param repositoryUpdatePriority priority for the cluster state update task * @param listener completion listener */ protected void writeIndexGen( @@ -2378,6 +2415,7 @@ protected void writeIndexGen( long expectedGen, Version version, Function stateFilter, + Priority repositoryUpdatePriority, ActionListener listener ) { assert isReadOnly() == false; // can not write to a read only repository @@ -2402,7 +2440,7 @@ protected void writeIndexGen( final StepListener setPendingStep = new StepListener<>(); clusterService.submitStateUpdateTask( "set pending repository generation [" + metadata.name() + "][" + expectedGen + "]", - new ClusterStateUpdateTask() { + new ClusterStateUpdateTask(repositoryUpdatePriority) { private long newGen; @@ -2540,7 +2578,7 @@ public void onFailure(Exception e) { // Step 3: Update CS to reflect new repository generation. clusterService.submitStateUpdateTask( "set safe repository generation [" + metadata.name() + "][" + newGen + "]", - new ClusterStateUpdateTask() { + new ClusterStateUpdateTask(repositoryUpdatePriority) { @Override public ClusterState execute(ClusterState currentState) { final RepositoryMetadata meta = getRepoMetadata(currentState); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index ba4e53fc43f8a..7e1b0d9901050 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -102,6 +102,9 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; + + private static final String PINNED_TIMESTAMP = "pinned_timestamp"; + private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -125,6 +128,7 @@ public static final class SnapshotInfoBuilder { private Boolean includeGlobalState = null; private Boolean remoteStoreIndexShallowCopy = null; + private long pinnedTimestamp = 0L; private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -181,6 +185,10 @@ private void setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy) this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; } + private void setPinnedTimestamp(long pinnedTimestamp) { + this.pinnedTimestamp = pinnedTimestamp; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -220,7 +228,8 @@ public SnapshotInfo build() { shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } } @@ -275,6 +284,7 @@ int getSuccessfulShards() { SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy, new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) ); + SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP)); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -311,6 +321,7 @@ int getSuccessfulShards() { @Nullable private Boolean remoteStoreIndexShallowCopy; + private long pinnedTimestamp; @Nullable private final Map userMetadata; @@ -320,11 +331,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -342,7 +353,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata(), - entry.remoteStoreIndexShallowCopy() + entry.remoteStoreIndexShallowCopy(), + 0L ); } @@ -373,7 +385,41 @@ public SnapshotInfo( shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + 0 + ); + } + + public SnapshotInfo( + SnapshotId snapshotId, + List indices, + List dataStreams, + long startTime, + String reason, + long endTime, + int totalShards, + List shardFailures, + Boolean includeGlobalState, + Map userMetadata, + Boolean remoteStoreIndexShallowCopy, + long pinnedTimestamp + ) { + this( + snapshotId, + indices, + dataStreams, + snapshotState(reason, shardFailures), + reason, + Version.CURRENT, + startTime, + endTime, + totalShards, + totalShards - shardFailures.size(), + shardFailures, + includeGlobalState, + userMetadata, + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } @@ -391,7 +437,8 @@ public SnapshotInfo( List shardFailures, Boolean includeGlobalState, Map userMetadata, - Boolean remoteStoreIndexShallowCopy + Boolean remoteStoreIndexShallowCopy, + long pinnedTimestamp ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -407,6 +454,7 @@ public SnapshotInfo( this.includeGlobalState = includeGlobalState; this.userMetadata = userMetadata; this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + this.pinnedTimestamp = pinnedTimestamp; } /** @@ -437,6 +485,9 @@ public SnapshotInfo(final StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_9_0)) { remoteStoreIndexShallowCopy = in.readOptionalBoolean(); } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + pinnedTimestamp = in.readVLong(); + } } /** @@ -551,6 +602,10 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() { return remoteStoreIndexShallowCopy; } + public long getPinnedTimestamp() { + return pinnedTimestamp; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. @@ -618,6 +673,8 @@ public String toString() { + shardFailures + ", isRemoteStoreInteropEnabled=" + remoteStoreIndexShallowCopy + + ", pinnedTimestamp=" + + pinnedTimestamp + '}'; } @@ -657,6 +714,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (remoteStoreIndexShallowCopy != null) { builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); } + if (pinnedTimestamp != 0) { + builder.field(PINNED_TIMESTAMP, pinnedTimestamp); + } + builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -715,6 +776,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final if (remoteStoreIndexShallowCopy != null) { builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); } + if (pinnedTimestamp != 0) { + builder.field(PINNED_TIMESTAMP, pinnedTimestamp); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -763,6 +827,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr long endTime = 0; int totalShards = 0; int successfulShards = 0; + long pinnedTimestamp = 0; Boolean includeGlobalState = null; Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; @@ -804,6 +869,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr includeGlobalState = parser.booleanValue(); } else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) { remoteStoreIndexShallowCopy = parser.booleanValue(); + } else if (PINNED_TIMESTAMP.equals(currentFieldName)) { + pinnedTimestamp = parser.longValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -856,7 +923,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } @@ -892,6 +960,9 @@ public void writeTo(final StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalBoolean(remoteStoreIndexShallowCopy); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeVLong(pinnedTimestamp); + } } private static SnapshotState snapshotState(final String reason, final List shardFailures) { @@ -924,7 +995,8 @@ public boolean equals(Object o) { && Objects.equals(version, that.version) && Objects.equals(shardFailures, that.shardFailures) && Objects.equals(userMetadata, that.userMetadata) - && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy); + && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy) + && Objects.equals(pinnedTimestamp, that.pinnedTimestamp); } @Override @@ -944,7 +1016,8 @@ public int hashCode() { version, shardFailures, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 16012df2b9234..f3afbd53f7703 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -137,6 +137,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; import static org.opensearch.snapshots.SnapshotUtils.validateSnapshotsBackingAnyIndex; /** @@ -224,6 +225,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus Setting.Property.Dynamic ); + private static final String SNAPSHOT_PINNED_TIMESTAMP_DELIMITER = "__"; private volatile int maxConcurrentOperations; public SnapshotsService( @@ -413,10 +415,36 @@ public TimeValue timeout() { * @param listener snapshot completion listener */ public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { - createSnapshot( - request, - ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure) - ); + Repository repository = repositoriesService.repository(request.repository()); + + boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings()); + logger.debug("shallow_snapshot_v2 is set as [{}]", isSnapshotV2); + + boolean remoteStoreIndexShallowCopy = remoteStoreShallowCopyEnabled(repository); + if (remoteStoreIndexShallowCopy + && isSnapshotV2 + && request.indices().length == 0 + && clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { + createSnapshotV2(request, listener); + } else { + createSnapshot( + request, + ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure) + ); + } + } + + private boolean remoteStoreShallowCopyEnabled(Repository repository) { + boolean remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); + logger.debug("remote_store_index_shallow_copy setting is set as [{}]", remoteStoreIndexShallowCopy); + if (remoteStoreIndexShallowCopy + && clusterService.getClusterSettings().get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(CompatibilityMode.MIXED)) { + // don't allow shallow snapshots if compatibility mode is not strict + logger.warn("Shallow snapshots are not supported during migration. Falling back to full snapshot."); + remoteStoreIndexShallowCopy = false; + } + return remoteStoreIndexShallowCopy; + } /** @@ -424,6 +452,7 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis *

* This method is used by clients to start snapshot. It makes sure that there is no snapshots are currently running and * creates a snapshot record in cluster state metadata. + *

* * @param request snapshot request * @param listener snapshot creation listener @@ -449,12 +478,11 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList @Override public ClusterState execute(ClusterState currentState) { - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); - ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); - validate(repositoryName, snapshotName, currentState); final boolean concurrentOperationsAllowed = currentState.nodes().getMinNodeVersion().onOrAfter(FULL_CONCURRENCY_VERSION); + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY @@ -466,17 +494,7 @@ public ClusterState execute(ClusterState currentState) { "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]" ); } - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( - RepositoryCleanupInProgress.TYPE, - RepositoryCleanupInProgress.EMPTY - ); - if (repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" - ); - } + // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a // previous cluster-manager that we can simply ignore and remove from the cluster state because we would clean it up from // the @@ -484,7 +502,7 @@ public ClusterState execute(ClusterState currentState) { if (concurrentOperationsAllowed == false && runningSnapshots.stream().anyMatch(entry -> entry.state() != State.INIT)) { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); } - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); @@ -585,6 +603,184 @@ public TimeValue timeout() { }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } + /** + * Initializes the snapshotting process for clients when Snapshot v2 is enabled. This method is responsible for taking + * a shallow snapshot and pinning the snapshot timestamp.The entire process is executed on the cluster manager node. + * + * Unlike traditional snapshot operations, this method performs a synchronous snapshot execution and doesn't + * upload any shard metadata to the snapshot repository. + * The pinned timestamp is later reconciled with remote store segment and translog metadata files during the restore + * operation. + * + * @param request snapshot request + * @param listener snapshot creation listener + */ + public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { + long pinnedTimestamp = System.currentTimeMillis(); + final String repositoryName = request.repository(); + final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); + validate(repositoryName, snapshotName); + + final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Repository repository = repositoriesService.repository(repositoryName); + + if (repository.isReadOnly()) { + listener.onFailure( + new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") + ); + return; + } + + final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + ClusterState currentState = clusterService.state(); + final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); + try { + final StepListener repositoryDataListener = new StepListener<>(); + repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + + repositoryDataListener.whenComplete(repositoryData -> { + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); + + List indices = new ArrayList<>(currentState.metadata().indices().keySet()); + + final List dataStreams = indexNameExpressionResolver.dataStreamNames( + currentState, + request.indicesOptions(), + request.indices() + ); + + logger.trace("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); + + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + + final List indexIds = repositoryData.resolveNewIndices( + indices, + getInFlightIndexIds(runningSnapshots, repositoryName) + ); + final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); + final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( + currentState.metadata(), + currentState.routingTable(), + indexIds, + repositoryData + ); + + if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { + logger.debug("[{}] was aborted before starting", snapshot); + throw new SnapshotException(snapshot, "Aborted on initialization"); + } + final SnapshotInfo snapshotInfo = new SnapshotInfo( + snapshot.getSnapshotId(), + shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + dataStreams, + pinnedTimestamp, + null, + System.currentTimeMillis(), + shardGenerations.totalShards(), + Collections.emptyList(), + request.includeGlobalState(), + userMeta, + true, + pinnedTimestamp + ); + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager"); + } + final StepListener pinnedTimestampListener = new StepListener<>(); + pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure); + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot(currentState.metadata(), request.includeGlobalState(), false, dataStreams, indexIds), + snapshotInfo, + version, + state -> state, + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") + ); + return; + } + updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + listener.onFailure(e); + } + } + ); + + }, listener::onFailure); + } catch (Exception e) { + assert false : new AssertionError(e); + logger.error("Snapshot-v2 {} creation failed with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + } + } + + private void createSnapshotPreValidations( + ClusterState currentState, + RepositoryData repositoryData, + String repositoryName, + String snapshotName + ) { + Repository repository = repositoriesService.repository(repositoryName); + ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); + validate(repositoryName, snapshotName, currentState); + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( + RepositoryCleanupInProgress.TYPE, + RepositoryCleanupInProgress.EMPTY + ); + if (repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot snapshot-v2 while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" + ); + } + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + } + + private void updateSnapshotPinnedTimestamp( + RepositoryData repositoryData, + Snapshot snapshot, + long timestampToPin, + ActionListener listener + ) { + remoteStorePinnedTimestampService.pinTimestamp( + timestampToPin, + snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName()); + listener.onResponse(repositoryData); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + + } + } + ); + } + private static void ensureSnapshotNameNotRunning( List runningSnapshots, String repositoryName, @@ -1089,7 +1285,13 @@ protected void doRun() { repository.initializeSnapshot( snapshot.snapshot().getSnapshotId(), snapshot.indices(), - metadataForSnapshot(snapshot, clusterState.metadata()) + metadataForSnapshot( + clusterState.metadata(), + snapshot.includeGlobalState(), + snapshot.partial(), + snapshot.dataStreams(), + snapshot.indices() + ) ); } @@ -1303,15 +1505,21 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps return builder.build(); } - private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + private static Metadata metadataForSnapshot( + Metadata metadata, + boolean includeGlobalState, + boolean isPartial, + List dataStreamsList, + List indices + ) { final Metadata.Builder builder; - if (snapshot.includeGlobalState() == false) { + if (includeGlobalState == false) { // Remove global state from the cluster state builder = Metadata.builder(); - for (IndexId index : snapshot.indices()) { + for (IndexId index : indices) { final IndexMetadata indexMetadata = metadata.index(index.getName()); if (indexMetadata == null) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + assert isPartial : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; } else { builder.put(indexMetadata, false); } @@ -1321,12 +1529,10 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, } // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation Map dataStreams = new HashMap<>(); - for (String dataStreamName : snapshot.dataStreams()) { + for (String dataStreamName : dataStreamsList) { DataStream dataStream = metadata.dataStreams().get(dataStreamName); if (dataStream == null) { - assert snapshot.partial() : "Data stream [" - + dataStreamName - + "] was deleted during a snapshot but snapshot was not partial."; + assert isPartial : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; } else { dataStreams.put(dataStreamName, dataStream); } @@ -1914,7 +2120,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met shardFailures, entry.includeGlobalState(), entry.userMetadata(), - entry.remoteStoreIndexShallowCopy() + entry.remoteStoreIndexShallowCopy(), + 0 ); final StepListener metadataListener = new StepListener<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); @@ -1933,10 +2140,11 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met meta -> repo.finalizeSnapshot( shardGenerations, repositoryData.getGenId(), - metadataForSnapshot(entry, meta), + metadataForSnapshot(meta, entry.includeGlobalState(), entry.partial(), entry.dataStreams(), entry.indices()), snapshotInfo, entry.version(), state -> stateWithoutSnapshot(state, snapshot), + Priority.NORMAL, ActionListener.wrap(newRepoData -> { completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo)); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); @@ -3247,6 +3455,42 @@ private static Map shards( return Collections.unmodifiableMap(builder); } + private static ShardGenerations buildShardsGenerationFromRepositoryData( + Metadata metadata, + RoutingTable routingTable, + List indices, + RepositoryData repositoryData + ) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + + for (IndexId index : indices) { + final String indexName = index.getName(); + final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; + IndexMetadata indexMetadata = metadata.index(indexName); + + final IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + final ShardId shardId = indexRoutingTable.shard(i).shardId(); + final String shardRepoGeneration; + + if (isNewIndex) { + assert shardGenerations.getShardGen(index, shardId.getId()) == null : "Found shard generation for new index [" + + index + + "]"; + shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; + } else { + shardRepoGeneration = shardGenerations.getShardGen(index, shardId.id()); + } + builder.put(index, shardId.id(), shardRepoGeneration); + + } + + } + + return builder.build(); + } + /** * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the * indices-to-check set. diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 274a548fd98ab..2feb0d3ba9405 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -95,7 +95,8 @@ protected CreateSnapshotResponse createTestInstance() { shardFailures, globalState, SnapshotInfoTests.randomUserMetadata(), - false + false, + 0 ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 3ef143e36dab9..58af390d194d3 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -77,7 +77,8 @@ protected GetSnapshotsResponse createTestInstance() { shardFailures, randomBoolean(), SnapshotInfoTests.randomUserMetadata(), - false + false, + 0 ) ); } diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 1cc4e8aa7ebee..432a9c4bc4f56 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -679,6 +680,20 @@ public void finalizeSnapshot( listener.onResponse(null); } + @Override + public void finalizeSnapshot( + ShardGenerations shardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + Function stateTransformer, + Priority repositoryUpdatePriority, + ActionListener listener + ) { + listener.onResponse(null); + } + @Override public void deleteSnapshots( Collection snapshotIds, diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index e4e83f2453fa2..7fc987dcfa9bb 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -213,10 +214,12 @@ public void testSnapshotWithConflictingName() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index fa9cbabb3203b..6b550cbc60b29 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -39,6 +39,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -224,7 +225,7 @@ public void testRepositoryDataConcurrentModificationNotAllowed() { RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); final PlainActionFuture future1 = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(), future1); + repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(), Priority.NORMAL, future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created @@ -295,7 +296,7 @@ public void testFsRepositoryCompressDeprecatedIgnored() { private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { PlainActionFuture.get( - f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f) + f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), Priority.NORMAL, f) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 850a392c9619c..684a8dd36fccc 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -86,7 +86,8 @@ protected SnapshotInfo createTestInstance() { shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + 0 ); } @@ -114,7 +115,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); @@ -132,7 +134,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 2: return new SnapshotInfo( @@ -146,7 +149,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 3: return new SnapshotInfo( @@ -160,7 +164,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 4: return new SnapshotInfo( @@ -174,7 +179,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); @@ -200,7 +206,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { shardFailures, instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 6: return new SnapshotInfo( @@ -214,7 +221,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), Boolean.FALSE.equals(instance.includeGlobalState()), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 7: return new SnapshotInfo( @@ -228,7 +236,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 8: List dataStreams = randomValueOtherThan( @@ -246,7 +255,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 123456 ); case 9: return new SnapshotInfo( @@ -260,7 +270,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()) + Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()), + 123456 ); default: throw new IllegalArgumentException("invalid randomization case"); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 769dfeb37ff8d..e27223cea0778 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2379,6 +2379,7 @@ public void onFailure(final Exception e) { clusterService, threadPool, snapshotsService, + repositoriesService, actionFilters, indexNameExpressionResolver ) diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 43dde7281fb2d..06a486b3cb997 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.settings.ClusterSettings; @@ -233,10 +234,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f )); @@ -259,10 +262,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ) @@ -287,10 +292,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index 7e25ed97a7ea1..bd866b8d8173a 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Priority; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; @@ -134,6 +135,20 @@ public void finalizeSnapshot( listener.onResponse(null); } + @Override + public void finalizeSnapshot( + ShardGenerations shardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + Function stateTransformer, + Priority repositoryUpdatePriority, + ActionListener listener + ) { + listener.onResponse(null); + } + @Override public void deleteSnapshots( Collection snapshotIds, diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index d26fff5fe51ff..01ca3aed54e6f 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -298,23 +298,25 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito .stream() .noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) { final Map shardPathContents = shardContainer.listBlobs(); - - assertTrue( - shardPathContents.containsKey( - String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) - ) - || shardPathContents.containsKey( - String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + if (snapshotInfo.getPinnedTimestamp() == 0) { + assertTrue( + shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) ) - ); + || shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + ) + ); + + assertThat( + shardPathContents.keySet() + .stream() + .filter(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) + .count(), + lessThanOrEqualTo(2L) + ); + } - assertThat( - shardPathContents.keySet() - .stream() - .filter(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) - .count(), - lessThanOrEqualTo(2L) - ); } } } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index e690593820f2e..c0265393ca7bb 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.blobstore.BlobContainer; @@ -613,7 +614,8 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget( f -> repo.finalizeSnapshot( @@ -623,6 +625,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map