diff --git a/CHANGELOG.md b/CHANGELOG.md index 4700a5ec185fb..c1c4a19f4697f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) +- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) +- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) - [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788)) ### Dependencies diff --git a/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java index df327bf4871c6..c9078fdeeea28 100644 --- a/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java +++ b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java @@ -188,10 +188,6 @@ public void testTermsValuesSource() throws Exception { } public void testSimpleDerivedFieldsQuery() { - assumeFalse( - "Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007", - internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) - ); SearchRequest searchRequest = new SearchRequest("test-df").source( SearchSourceBuilder.searchSource() .derivedField("result", "keyword", new Script("emit(params._source[\"field\"])")) @@ -204,10 +200,6 @@ public void testSimpleDerivedFieldsQuery() { } public void testSimpleDerivedFieldsAgg() { - assumeFalse( - "Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007", - internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) - ); SearchRequest searchRequest = new SearchRequest("test-df").source( SearchSourceBuilder.searchSource() .derivedField("result", "keyword", new Script("emit(params._source[\"field\"])")) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 01b75c0b915f2..b5c526451899e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.BlobStoreException; @@ -391,6 +392,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { super.finalizeSnapshot( @@ -400,6 +402,7 @@ public void finalizeSnapshot( snapshotInfo, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, listener ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index f8e5079b01a36..dc0654c623137 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -10,6 +10,7 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.delete.DeleteResponse; @@ -23,6 +24,7 @@ import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexService; @@ -31,7 +33,13 @@ import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryData; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotRestoreException; @@ -46,6 +54,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,6 +72,7 @@ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteRestoreSnapshotIT extends AbstractSnapshotIntegTestCase { @@ -746,4 +756,596 @@ public void testInvalidRestoreRequestScenarios() throws Exception { assertTrue(exception.getMessage().contains("cannot remove setting [index.remote_store.segment.repository]" + " on restore")); } + public void testCreateSnapshotV2() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + indexDocuments(client, indexName1, 10); + indexDocuments(client, indexName2, 20); + + createIndex(indexName3, indexSettings); + indexDocuments(client, indexName3, 10); + + String snapshotName2 = "test-create-snapshot2"; + + // verify even if waitForCompletion is not true, the request executes in a sync manner + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .get(); + snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + } + + public void testMixedSnapshotCreationWithV2RepositorySetting() throws Exception { + + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String indexName3 = "testindex3"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot-v1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false) + ) + ); + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), equalTo(0L)); + + // enable shallow_snapshot_v2 + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + indexDocuments(client, indexName1, 10); + indexDocuments(client, indexName2, 20); + + createIndex(indexName3, indexSettings); + indexDocuments(client, indexName3, 10); + + String snapshotName2 = "test-create-snapshot-v2"; + + // verify even if waitForCompletion is not true, the request executes in a sync manner + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName2) + .get(); + snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName2)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + + } + + public void testConcurrentSnapshotV2CreateOperation() throws InterruptedException, ExecutionException { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + int concurrentSnapshots = 5; + + // Prepare threads for concurrent snapshot creation + List threads = new ArrayList<>(); + + for (int i = 0; i < concurrentSnapshots; i++) { + int snapshotIndex = i; + Thread thread = new Thread(() -> { + try { + String snapshotName = "snapshot-concurrent-" + snapshotIndex; + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + } catch (Exception e) {} + }); + threads.add(thread); + } + // start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Validate that only one snapshot has been created + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); + } + + public void testCreateSnapshotV2WithRedIndex() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + internalCluster().ensureAtLeastNumDataNodes(0); + ensureRed(indexName1); + ensureRed(indexName2); + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + } + + public void testCreateSnapshotV2WithIndexingLoad() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + Thread indexingThread = new Thread(() -> { + try { + for (int i = 0; i < 50; i++) { + internalCluster().client().prepareIndex("test-index-load").setSource("field", "value" + i).execute().actionGet(); + } + } catch (Exception e) { + fail("indexing failed due to exception: " + e.getMessage()); + } + }); + + // Start indexing + indexingThread.start(); + + // Wait for a bit to let some documents be indexed + Thread.sleep(1000); + + // Create a snapshot while indexing is ongoing + CreateSnapshotResponse createSnapshotResponse2 = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + + SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName1)); + assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + assertTrue(snapshotInfo.indices().contains("test-index-load")); + assertTrue(snapshotInfo.indices().contains(indexName1)); + assertTrue(snapshotInfo.indices().contains(indexName2)); + indexingThread.join(); + + } + + public void testCreateSnapshotV2WithShallowCopySettingDisabled() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), false) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + // Will create full copy snapshot if `REMOTE_STORE_INDEX_SHALLOW_COPY` is false but `SHALLOW_SNAPSHOT_V2` is true + SnapshotInfo snapshotInfo = createSnapshot(snapshotRepoName, snapshotName1, Collections.emptyList()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.getPinnedTimestamp(), equalTo(0L)); + + // Validate that snapshot is present in repository data + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + assertTrue(repositoryData.getSnapshotIds().contains(snapshotInfo.snapshotId())); + } + + public void testClusterManagerFailoverDuringSnapshotCreation() throws Exception { + + internalCluster().startClusterManagerOnlyNodes(3, pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String indexName1 = "testindex1"; + String indexName2 = "testindex2"; + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + ); + + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + createIndex(indexName1, indexSettings); + + Settings indexSettings2 = getIndexSettings(15, 0).build(); + createIndex(indexName2, indexSettings2); + + final int numDocsInIndex1 = 10; + final int numDocsInIndex2 = 20; + indexDocuments(client, indexName1, numDocsInIndex1); + indexDocuments(client, indexName2, numDocsInIndex2); + ensureGreen(indexName1, indexName2); + + ensureStableCluster(4, internalCluster().getClusterManagerName()); + + final SnapshotInfo[] snapshotInfo = new SnapshotInfo[1]; + final Boolean[] snapshotFailed = new Boolean[1]; + snapshotFailed[0] = false; + Thread snapshotThread = new Thread(() -> { + try { + // Start snapshot creation + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .get(); + snapshotInfo[0] = createSnapshotResponse.getSnapshotInfo(); + + } catch (Exception e) { + snapshotFailed[0] = true; + } + }); + snapshotThread.start(); + Thread.sleep(100); + + internalCluster().stopCurrentClusterManagerNode(); + + // Wait for the cluster to elect a new Cluster Manager and stabilize + ensureStableCluster(3, internalCluster().getClusterManagerName()); + + // Wait for the snapshot thread to complete + snapshotThread.join(); + + // Validate that the snapshot was created or handled gracefully + Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); + PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); + repository.getRepositoryData(repositoryDataPlainActionFuture); + + RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); + if (snapshotFailed[0]) { + assertFalse(repositoryData.getSnapshotIds().contains(snapshotInfo[0].snapshotId())); + } else { + assertTrue(repositoryData.getSnapshotIds().contains(snapshotInfo[0].snapshotId())); + } + } + + public void testConcurrentV1SnapshotAndV2RepoSettingUpdate() throws Exception { + internalCluster().startClusterManagerOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + internalCluster().startDataOnlyNode(pinnedTimestampSettings()); + String snapshotRepoName = "test-create-snapshot-repo"; + String snapshotName1 = "test-create-snapshot-v1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + assertAcked( + client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), false) + ) + ); + Client client = client(); + Settings indexSettings = getIndexSettings(20, 0).build(); + + for (int i = 0; i < 10; i++) { + createIndex("index" + i, indexSettings); + } + ensureStableCluster(3); + for (int i = 0; i < 10; i++) { + indexDocuments(client, "index" + i, 15); + } + + ensureStableCluster(3); + for (int i = 0; i < 10; i++) { + ensureGreen("index" + i); + } + final CreateSnapshotResponse[] snapshotV1Response = new CreateSnapshotResponse[1]; + // Create a separate thread to create the first snapshot + Thread createV1SnapshotThread = new Thread(() -> { + try { + snapshotV1Response[0] = client().admin() + .cluster() + .prepareCreateSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(true) + .get(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + // Create a separate thread to enable shallow_snapshot_v2 + Thread enableV2Thread = new Thread(() -> { + try { + + assertThrows( + IllegalStateException.class, + () -> client().admin() + .cluster() + .preparePutRepository(snapshotRepoName) + .setType(FsRepository.TYPE) + .setSettings( + Settings.builder() + .put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1) + .put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean()) + .put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true) + .put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true) + ) + .get() + ); + + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + createV1SnapshotThread.start(); + + Thread.sleep(100); + + enableV2Thread.start(); + + enableV2Thread.join(); + createV1SnapshotThread.join(); + } + + private Settings pinnedTimestampSettings() { + Settings settings = Settings.builder() + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .build(); + return settings; + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java index 0eb37703eb0f1..0bebe969b3f3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -127,6 +128,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { super.finalizeSnapshot( @@ -136,6 +138,7 @@ public void finalizeSnapshot( snapshotInfo, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, listener ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index bb3bf014f213b..25e71d5598a98 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -42,12 +42,16 @@ import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; + /** * Transport action for create snapshot operation * @@ -56,12 +60,15 @@ public class TransportCreateSnapshotAction extends TransportClusterManagerNodeAction { private final SnapshotsService snapshotsService; + private final RepositoriesService repositoriesService; + @Inject public TransportCreateSnapshotAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, SnapshotsService snapshotsService, + RepositoriesService repositoriesService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver ) { @@ -75,6 +82,7 @@ public TransportCreateSnapshotAction( indexNameExpressionResolver ); this.snapshotsService = snapshotsService; + this.repositoriesService = repositoriesService; } @Override @@ -103,7 +111,9 @@ protected void clusterManagerOperation( ClusterState state, final ActionListener listener ) { - if (request.waitForCompletion()) { + Repository repository = repositoriesService.repository(request.repository()); + boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings()); + if (request.waitForCompletion() || isSnapshotV2) { snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); } else { snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); diff --git a/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java b/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java index dea389bb6a0ff..94b8c6181de4e 100644 --- a/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java +++ b/server/src/main/java/org/opensearch/index/engine/TranslogLeafReader.java @@ -264,13 +264,13 @@ public CacheHelper getReaderCacheHelper() { } @Override - public FloatVectorValues getFloatVectorValues(String field) throws IOException { - return getFloatVectorValues(field); + public FloatVectorValues getFloatVectorValues(String field) { + throw new UnsupportedOperationException(); } @Override - public ByteVectorValues getByteVectorValues(String field) throws IOException { - return getByteVectorValues(field); + public ByteVectorValues getByteVectorValues(String field) { + throw new UnsupportedOperationException(); } @Override diff --git a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java index e230e37e6d826..fe81f19d74b21 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java +++ b/server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java @@ -159,10 +159,9 @@ public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName, S @Override public Query termQuery(Object value, QueryShardContext context) { Query query = typeFieldMapper.mappedFieldType.termQuery(value, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -176,10 +175,9 @@ public Query termQuery(Object value, QueryShardContext context) { @Override public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext context) { Query query = typeFieldMapper.mappedFieldType.termQueryCaseInsensitive(value, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -195,10 +193,9 @@ public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext @Override public Query termsQuery(List values, @Nullable QueryShardContext context) { Query query = typeFieldMapper.mappedFieldType.termsQuery(values, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -230,10 +227,9 @@ public Query rangeQuery( parser, context ); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); return new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -251,10 +247,9 @@ public Query fuzzyQuery( QueryShardContext context ) { Query query = typeFieldMapper.mappedFieldType.fuzzyQuery(value, fuzziness, prefixLength, maxExpansions, transpositions, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -289,10 +284,9 @@ public Query fuzzyQuery( method, context ); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -316,10 +310,9 @@ public Query prefixQuery( QueryShardContext context ) { Query query = typeFieldMapper.mappedFieldType.prefixQuery(value, method, caseInsensitive, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -343,10 +336,9 @@ public Query wildcardQuery( QueryShardContext context ) { Query query = typeFieldMapper.mappedFieldType.wildcardQuery(value, method, caseInsensitive, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -365,10 +357,9 @@ public Query wildcardQuery( @Override public Query normalizedWildcardQuery(String value, @Nullable MultiTermQuery.RewriteMethod method, QueryShardContext context) { Query query = typeFieldMapper.mappedFieldType.normalizedWildcardQuery(value, method, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -394,10 +385,9 @@ public Query regexpQuery( QueryShardContext context ) { Query query = typeFieldMapper.mappedFieldType.regexpQuery(value, syntaxFlags, matchFlags, maxDeterminizedStates, method, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -416,10 +406,9 @@ public Query regexpQuery( @Override public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context) throws IOException { Query query = typeFieldMapper.mappedFieldType.phraseQuery(stream, slop, enablePositionIncrements, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -441,10 +430,9 @@ public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionInc public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context) throws IOException { Query query = typeFieldMapper.mappedFieldType.multiPhraseQuery(stream, slop, enablePositionIncrements, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -465,10 +453,9 @@ public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositi @Override public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions, QueryShardContext context) throws IOException { Query query = typeFieldMapper.mappedFieldType.phrasePrefixQuery(stream, slop, maxExpansions, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -493,10 +480,9 @@ public SpanQuery spanPrefixQuery(String value, SpanMultiTermQueryWrapper.SpanRew @Override public Query distanceFeatureQuery(Object origin, String pivot, float boost, QueryShardContext context) { Query query = typeFieldMapper.mappedFieldType.distanceFeatureQuery(origin, pivot, boost, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); return new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, @@ -507,10 +493,9 @@ public Query distanceFeatureQuery(Object origin, String pivot, float boost, Quer @Override public Query geoShapeQuery(Geometry shape, String fieldName, ShapeRelation relation, QueryShardContext context) { Query query = ((GeoShapeQueryable) (typeFieldMapper.mappedFieldType)).geoShapeQuery(shape, fieldName, relation, context); - DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null); return new DerivedFieldQuery( query, - valueFetcher, + () -> valueFetcher(context, context.lookup(), null), context.lookup(), getIndexAnalyzer(), indexableFieldGenerator, diff --git a/server/src/main/java/org/opensearch/index/query/DerivedFieldQuery.java b/server/src/main/java/org/opensearch/index/query/DerivedFieldQuery.java index db943bdef0a12..dcc02726cb0ef 100644 --- a/server/src/main/java/org/opensearch/index/query/DerivedFieldQuery.java +++ b/server/src/main/java/org/opensearch/index/query/DerivedFieldQuery.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; +import java.util.function.Supplier; /** * DerivedFieldQuery used for querying derived fields. It contains the logic to execute an input lucene query against @@ -37,7 +38,7 @@ */ public final class DerivedFieldQuery extends Query { private final Query query; - private final DerivedFieldValueFetcher valueFetcher; + private final Supplier valueFetcherSupplier; private final SearchLookup searchLookup; private final Analyzer indexAnalyzer; private final boolean ignoreMalformed; @@ -46,20 +47,19 @@ public final class DerivedFieldQuery extends Query { /** * @param query lucene query to be executed against the derived field - * @param valueFetcher DerivedFieldValueFetcher ValueFetcher to fetch the value of a derived field from _source - * using LeafSearchLookup + * @param valueFetcherSupplier Supplier of a DerivedFieldValueFetcher that will be reconstructed per leaf * @param searchLookup SearchLookup to get the LeafSearchLookup look used by valueFetcher to fetch the _source */ public DerivedFieldQuery( Query query, - DerivedFieldValueFetcher valueFetcher, + Supplier valueFetcherSupplier, SearchLookup searchLookup, Analyzer indexAnalyzer, Function indexableFieldGenerator, boolean ignoreMalformed ) { this.query = query; - this.valueFetcher = valueFetcher; + this.valueFetcherSupplier = valueFetcherSupplier; this.searchLookup = searchLookup; this.indexAnalyzer = indexAnalyzer; this.indexableFieldGenerator = indexableFieldGenerator; @@ -77,7 +77,15 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException { if (rewritten == query) { return this; } - return new DerivedFieldQuery(rewritten, valueFetcher, searchLookup, indexAnalyzer, indexableFieldGenerator, ignoreMalformed); + ; + return new DerivedFieldQuery( + rewritten, + valueFetcherSupplier, + searchLookup, + indexAnalyzer, + indexableFieldGenerator, + ignoreMalformed + ); } @Override @@ -88,6 +96,11 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo public Scorer scorer(LeafReaderContext context) { DocIdSetIterator approximation; approximation = DocIdSetIterator.all(context.reader().maxDoc()); + + // Create a new ValueFetcher per thread. + // ValueFetcher.setNextReader creates a DerivedFieldScript and internally SourceLookup and these objects are not + // thread safe. + final DerivedFieldValueFetcher valueFetcher = valueFetcherSupplier.get(); valueFetcher.setNextReader(context); LeafSearchLookup leafSearchLookup = searchLookup.getLeafSearchLookup(context); TwoPhaseIterator twoPhase = new TwoPhaseIterator(approximation) { diff --git a/server/src/main/java/org/opensearch/repositories/FilterRepository.java b/server/src/main/java/org/opensearch/repositories/FilterRepository.java index d700a92ed4bad..114cd0260fcca 100644 --- a/server/src/main/java/org/opensearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/opensearch/repositories/FilterRepository.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Priority; import org.opensearch.common.lifecycle.Lifecycle; import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.core.action.ActionListener; @@ -104,6 +105,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { in.finalizeSnapshot( @@ -113,6 +115,7 @@ public void finalizeSnapshot( snapshotInfo, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, listener ); } diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index ed30aad7b4dd2..637503d3f54df 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.Priority; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lifecycle.LifecycleComponent; import org.opensearch.common.settings.Setting; @@ -150,6 +151,7 @@ default Repository create(RepositoryMetadata metadata, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 3e6a75565891f..e18706824d39d 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -65,6 +65,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; +import org.opensearch.common.Priority; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; @@ -266,6 +267,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting REMOTE_STORE_INDEX_SHALLOW_COPY = Setting.boolSetting("remote_store_index_shallow_copy", false); + public static final Setting SHALLOW_SNAPSHOT_V2 = Setting.boolSetting("shallow_snapshot_v2", false); + /** * Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. * For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation @@ -1046,6 +1049,7 @@ private void doDeleteShardSnapshots( repositoryStateId, repoMetaVersion, Function.identity(), + Priority.NORMAL, ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) ); }, listener::onFailure); @@ -1520,6 +1524,7 @@ public void cleanup( repositoryStateId, repositoryMetaVersion, Function.identity(), + Priority.NORMAL, ActionListener.wrap( v -> cleanupStaleBlobs( Collections.emptyList(), @@ -1723,6 +1728,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, final ActionListener listener ) { assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" @@ -1759,6 +1765,7 @@ public void finalizeSnapshot( repositoryStateId, repositoryMetaVersion, stateTransformer, + repositoryUpdatePriority, ActionListener.wrap(newRepoData -> { cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); listener.onResponse(newRepoData); @@ -2280,10 +2287,11 @@ public boolean isSystemRepository() { * Lastly, the {@link RepositoryMetadata} entry for this repository is updated to the new generation {@code P + 1} and thus * pending and safe generation are set to the same value marking the end of the update of the repository data. * - * @param repositoryData RepositoryData to write - * @param expectedGen expected repository generation at the start of the operation - * @param version version of the repository metadata to write - * @param stateFilter filter for the last cluster state update executed by this method + * @param repositoryData RepositoryData to write + * @param expectedGen expected repository generation at the start of the operation + * @param version version of the repository metadata to write + * @param stateFilter filter for the last cluster state update executed by this method + * @param repositoryUpdatePriority priority for the cluster state update task * @param listener completion listener */ protected void writeIndexGen( @@ -2291,6 +2299,7 @@ protected void writeIndexGen( long expectedGen, Version version, Function stateFilter, + Priority repositoryUpdatePriority, ActionListener listener ) { assert isReadOnly() == false; // can not write to a read only repository @@ -2315,7 +2324,7 @@ protected void writeIndexGen( final StepListener setPendingStep = new StepListener<>(); clusterService.submitStateUpdateTask( "set pending repository generation [" + metadata.name() + "][" + expectedGen + "]", - new ClusterStateUpdateTask() { + new ClusterStateUpdateTask(repositoryUpdatePriority) { private long newGen; @@ -2453,7 +2462,7 @@ public void onFailure(Exception e) { // Step 3: Update CS to reflect new repository generation. clusterService.submitStateUpdateTask( "set safe repository generation [" + metadata.name() + "][" + newGen + "]", - new ClusterStateUpdateTask() { + new ClusterStateUpdateTask(repositoryUpdatePriority) { @Override public ClusterState execute(ClusterState currentState) { final RepositoryMetadata meta = getRepoMetadata(currentState); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java index 191b872cdd563..7558c4456109e 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotInfo.java @@ -98,6 +98,9 @@ public final class SnapshotInfo implements Comparable, ToXContent, private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; private static final String REMOTE_STORE_INDEX_SHALLOW_COPY = "remote_store_index_shallow_copy"; + + private static final String PINNED_TIMESTAMP = "pinned_timestamp"; + private static final String USER_METADATA = "metadata"; private static final Comparator COMPARATOR = Comparator.comparing(SnapshotInfo::startTime) @@ -121,6 +124,7 @@ public static final class SnapshotInfoBuilder { private Boolean includeGlobalState = null; private Boolean remoteStoreIndexShallowCopy = null; + private long pinnedTimestamp = 0L; private Map userMetadata = null; private int version = -1; private List shardFailures = null; @@ -177,6 +181,10 @@ private void setRemoteStoreIndexShallowCopy(Boolean remoteStoreIndexShallowCopy) this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; } + private void setPinnedTimestamp(long pinnedTimestamp) { + this.pinnedTimestamp = pinnedTimestamp; + } + private void setShardFailures(List shardFailures) { this.shardFailures = shardFailures; } @@ -216,7 +224,8 @@ public SnapshotInfo build() { shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } } @@ -271,6 +280,7 @@ int getSuccessfulShards() { SnapshotInfoBuilder::setRemoteStoreIndexShallowCopy, new ParseField(REMOTE_STORE_INDEX_SHALLOW_COPY) ); + SNAPSHOT_INFO_PARSER.declareLong(SnapshotInfoBuilder::setPinnedTimestamp, new ParseField(PINNED_TIMESTAMP)); SNAPSHOT_INFO_PARSER.declareObjectArray( SnapshotInfoBuilder::setShardFailures, SnapshotShardFailure.SNAPSHOT_SHARD_FAILURE_PARSER, @@ -307,6 +317,7 @@ int getSuccessfulShards() { @Nullable private Boolean remoteStoreIndexShallowCopy; + private long pinnedTimestamp; @Nullable private final Map userMetadata; @@ -316,11 +327,11 @@ int getSuccessfulShards() { private final List shardFailures; public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state) { - this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); + this(snapshotId, indices, dataStreams, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); } public SnapshotInfo(SnapshotId snapshotId, List indices, List dataStreams, SnapshotState state, Version version) { - this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null); + this(snapshotId, indices, dataStreams, state, null, version, 0L, 0L, 0, 0, Collections.emptyList(), null, null, null, 0); } public SnapshotInfo(SnapshotsInProgress.Entry entry) { @@ -338,7 +349,8 @@ public SnapshotInfo(SnapshotsInProgress.Entry entry) { Collections.emptyList(), entry.includeGlobalState(), entry.userMetadata(), - entry.remoteStoreIndexShallowCopy() + entry.remoteStoreIndexShallowCopy(), + 0L ); } @@ -353,7 +365,8 @@ public SnapshotInfo( List shardFailures, Boolean includeGlobalState, Map userMetadata, - Boolean remoteStoreIndexShallowCopy + Boolean remoteStoreIndexShallowCopy, + long pinnedTimestamp ) { this( snapshotId, @@ -369,7 +382,8 @@ public SnapshotInfo( shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } @@ -387,7 +401,8 @@ public SnapshotInfo( List shardFailures, Boolean includeGlobalState, Map userMetadata, - Boolean remoteStoreIndexShallowCopy + Boolean remoteStoreIndexShallowCopy, + long pinnedTimestamp ) { this.snapshotId = Objects.requireNonNull(snapshotId); this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices)); @@ -403,6 +418,7 @@ public SnapshotInfo( this.includeGlobalState = includeGlobalState; this.userMetadata = userMetadata; this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy; + this.pinnedTimestamp = pinnedTimestamp; } /** @@ -425,6 +441,9 @@ public SnapshotInfo(final StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_9_0)) { remoteStoreIndexShallowCopy = in.readOptionalBoolean(); } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + pinnedTimestamp = in.readVLong(); + } } /** @@ -539,6 +558,10 @@ public Boolean isRemoteStoreIndexShallowCopyEnabled() { return remoteStoreIndexShallowCopy; } + public long getPinnedTimestamp() { + return pinnedTimestamp; + } + /** * Returns shard failures; an empty list will be returned if there were no shard * failures, or if {@link #state()} returns {@code null}. @@ -606,6 +629,8 @@ public String toString() { + shardFailures + ", isRemoteStoreInteropEnabled=" + remoteStoreIndexShallowCopy + + ", pinnedTimestamp=" + + pinnedTimestamp + '}'; } @@ -641,6 +666,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (remoteStoreIndexShallowCopy != null) { builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); } + if (pinnedTimestamp != 0) { + builder.field(PINNED_TIMESTAMP, pinnedTimestamp); + } + builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -699,6 +728,9 @@ private XContentBuilder toXContentInternal(final XContentBuilder builder, final if (remoteStoreIndexShallowCopy != null) { builder.field(REMOTE_STORE_INDEX_SHALLOW_COPY, remoteStoreIndexShallowCopy); } + if (pinnedTimestamp != 0) { + builder.field(PINNED_TIMESTAMP, pinnedTimestamp); + } builder.startArray(INDICES); for (String index : indices) { builder.value(index); @@ -747,6 +779,7 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr long endTime = 0; int totalShards = 0; int successfulShards = 0; + long pinnedTimestamp = 0; Boolean includeGlobalState = null; Boolean remoteStoreIndexShallowCopy = null; Map userMetadata = null; @@ -788,6 +821,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr includeGlobalState = parser.booleanValue(); } else if (REMOTE_STORE_INDEX_SHALLOW_COPY.equals(currentFieldName)) { remoteStoreIndexShallowCopy = parser.booleanValue(); + } else if (PINNED_TIMESTAMP.equals(currentFieldName)) { + pinnedTimestamp = parser.longValue(); } } else if (token == XContentParser.Token.START_ARRAY) { if (DATA_STREAMS.equals(currentFieldName)) { @@ -840,7 +875,8 @@ public static SnapshotInfo fromXContentInternal(final XContentParser parser) thr shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } @@ -872,6 +908,9 @@ public void writeTo(final StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalBoolean(remoteStoreIndexShallowCopy); } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeVLong(pinnedTimestamp); + } } private static SnapshotState snapshotState(final String reason, final List shardFailures) { @@ -904,7 +943,8 @@ public boolean equals(Object o) { && Objects.equals(version, that.version) && Objects.equals(shardFailures, that.shardFailures) && Objects.equals(userMetadata, that.userMetadata) - && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy); + && Objects.equals(remoteStoreIndexShallowCopy, that.remoteStoreIndexShallowCopy) + && Objects.equals(pinnedTimestamp, that.pinnedTimestamp); } @Override @@ -924,7 +964,8 @@ public int hashCode() { version, shardFailures, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + pinnedTimestamp ); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 5e49208465dbb..b7fea116a12b7 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -136,6 +136,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.repositories.blobstore.BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY; +import static org.opensearch.repositories.blobstore.BlobStoreRepository.SHALLOW_SNAPSHOT_V2; import static org.opensearch.snapshots.SnapshotUtils.validateSnapshotsBackingAnyIndex; /** @@ -202,6 +203,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus Setting.Property.Dynamic ); + private static final String SNAPSHOT_PINNED_TIMESTAMP_DELIMITER = ":"; private volatile int maxConcurrentOperations; public SnapshotsService( @@ -251,10 +253,36 @@ public SnapshotsService( * @param listener snapshot completion listener */ public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { - createSnapshot( - request, - ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure) - ); + Repository repository = repositoriesService.repository(request.repository()); + + boolean isSnapshotV2 = SHALLOW_SNAPSHOT_V2.get(repository.getMetadata().settings()); + logger.debug("shallow_snapshot_v2 is set as [{}]", isSnapshotV2); + + boolean remoteStoreIndexShallowCopy = remoteStoreShallowCopyEnabled(repository); + if (remoteStoreIndexShallowCopy + && isSnapshotV2 + && request.indices().length == 0 + && clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.CURRENT)) { + createSnapshotV2(request, listener); + } else { + createSnapshot( + request, + ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure) + ); + } + } + + private boolean remoteStoreShallowCopyEnabled(Repository repository) { + boolean remoteStoreIndexShallowCopy = REMOTE_STORE_INDEX_SHALLOW_COPY.get(repository.getMetadata().settings()); + logger.debug("remote_store_index_shallow_copy setting is set as [{}]", remoteStoreIndexShallowCopy); + if (remoteStoreIndexShallowCopy + && clusterService.getClusterSettings().get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(CompatibilityMode.MIXED)) { + // don't allow shallow snapshots if compatibility mode is not strict + logger.warn("Shallow snapshots are not supported during migration. Falling back to full snapshot."); + remoteStoreIndexShallowCopy = false; + } + return remoteStoreIndexShallowCopy; + } /** @@ -262,6 +290,7 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis *

* This method is used by clients to start snapshot. It makes sure that there is no snapshots are currently running and * creates a snapshot record in cluster state metadata. + *

* * @param request snapshot request * @param listener snapshot creation listener @@ -287,27 +316,13 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList @Override public ClusterState execute(ClusterState currentState) { - ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); final List runningSnapshots = snapshots.entries(); - ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); - validate(repositoryName, snapshotName, currentState); final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY ); - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( - RepositoryCleanupInProgress.TYPE, - RepositoryCleanupInProgress.EMPTY - ); - if (repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException( - repositoryName, - snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" - ); - } - ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); @@ -407,6 +422,184 @@ public TimeValue timeout() { }, "create_snapshot [" + snapshotName + ']', listener::onFailure); } + /** + * Initializes the snapshotting process for clients when Snapshot v2 is enabled. This method is responsible for taking + * a shallow snapshot and pinning the snapshot timestamp.The entire process is executed on the cluster manager node. + * + * Unlike traditional snapshot operations, this method performs a synchronous snapshot execution and doesn't + * upload any shard metadata to the snapshot repository. + * The pinned timestamp is later reconciled with remote store segment and translog metadata files during the restore + * operation. + * + * @param request snapshot request + * @param listener snapshot creation listener + */ + public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { + long pinnedTimestamp = System.currentTimeMillis(); + final String repositoryName = request.repository(); + final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); + validate(repositoryName, snapshotName); + + final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Repository repository = repositoriesService.repository(repositoryName); + + if (repository.isReadOnly()) { + listener.onFailure( + new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") + ); + return; + } + + final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + ClusterState currentState = clusterService.state(); + final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); + try { + final StepListener repositoryDataListener = new StepListener<>(); + repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + + repositoryDataListener.whenComplete(repositoryData -> { + createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); + + List indices = new ArrayList<>(currentState.metadata().indices().keySet()); + + final List dataStreams = indexNameExpressionResolver.dataStreamNames( + currentState, + request.indicesOptions(), + request.indices() + ); + + logger.trace("[{}][{}] creating snapshot-v2 for indices [{}]", repositoryName, snapshotName, indices); + + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + + final List indexIds = repositoryData.resolveNewIndices( + indices, + getInFlightIndexIds(runningSnapshots, repositoryName) + ); + final Version version = minCompatibleVersion(currentState.nodes().getMinNodeVersion(), repositoryData, null); + final ShardGenerations shardGenerations = buildShardsGenerationFromRepositoryData( + currentState.metadata(), + currentState.routingTable(), + indexIds, + repositoryData + ); + + if (repositoryData.getGenId() == RepositoryData.UNKNOWN_REPO_GEN) { + logger.debug("[{}] was aborted before starting", snapshot); + throw new SnapshotException(snapshot, "Aborted on initialization"); + } + final SnapshotInfo snapshotInfo = new SnapshotInfo( + snapshot.getSnapshotId(), + shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + dataStreams, + pinnedTimestamp, + null, + System.currentTimeMillis(), + shardGenerations.totalShards(), + Collections.emptyList(), + request.includeGlobalState(), + userMeta, + true, + pinnedTimestamp + ); + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager"); + } + final StepListener pinnedTimestampListener = new StepListener<>(); + pinnedTimestampListener.whenComplete(repoData -> { listener.onResponse(snapshotInfo); }, listener::onFailure); + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot(currentState.metadata(), request.includeGlobalState(), false, dataStreams, indexIds), + snapshotInfo, + version, + state -> state, + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (!clusterService.state().nodes().isLocalNodeElectedClusterManager()) { + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") + ); + return; + } + updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to upload files to snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + listener.onFailure(e); + } + } + ); + + }, listener::onFailure); + } catch (Exception e) { + assert false : new AssertionError(e); + logger.error("Snapshot-v2 {} creation failed with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + } + } + + private void createSnapshotPreValidations( + ClusterState currentState, + RepositoryData repositoryData, + String repositoryName, + String snapshotName + ) { + Repository repository = repositoriesService.repository(repositoryName); + ensureSnapshotNameAvailableInRepo(repositoryData, snapshotName, repository); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); + final List runningSnapshots = snapshots.entries(); + ensureSnapshotNameNotRunning(runningSnapshots, repositoryName, snapshotName); + validate(repositoryName, snapshotName, currentState); + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom( + RepositoryCleanupInProgress.TYPE, + RepositoryCleanupInProgress.EMPTY + ); + if (repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + snapshotName, + "cannot snapshot-v2 while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]" + ); + } + ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); + } + + private void updateSnapshotPinnedTimestamp( + RepositoryData repositoryData, + Snapshot snapshot, + long timestampToPin, + ActionListener listener + ) { + remoteStorePinnedTimestampService.pinTimestamp( + timestampToPin, + snapshot.getRepository() + SNAPSHOT_PINNED_TIMESTAMP_DELIMITER + snapshot.getSnapshotId().getUUID(), + new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName()); + listener.onResponse(repositoryData); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e); + listener.onFailure(e); + + } + } + ); + } + private static void ensureSnapshotNameNotRunning( List runningSnapshots, String repositoryName, @@ -903,15 +1096,21 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps return builder.build(); } - private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, Metadata metadata) { + private static Metadata metadataForSnapshot( + Metadata metadata, + boolean includeGlobalState, + boolean isPartial, + List dataStreamsList, + List indices + ) { final Metadata.Builder builder; - if (snapshot.includeGlobalState() == false) { + if (includeGlobalState == false) { // Remove global state from the cluster state builder = Metadata.builder(); - for (IndexId index : snapshot.indices()) { + for (IndexId index : indices) { final IndexMetadata indexMetadata = metadata.index(index.getName()); if (indexMetadata == null) { - assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; + assert isPartial : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; } else { builder.put(indexMetadata, false); } @@ -921,12 +1120,10 @@ private static Metadata metadataForSnapshot(SnapshotsInProgress.Entry snapshot, } // Only keep those data streams in the metadata that were actually requested by the initial snapshot create operation Map dataStreams = new HashMap<>(); - for (String dataStreamName : snapshot.dataStreams()) { + for (String dataStreamName : dataStreamsList) { DataStream dataStream = metadata.dataStreams().get(dataStreamName); if (dataStream == null) { - assert snapshot.partial() : "Data stream [" - + dataStreamName - + "] was deleted during a snapshot but snapshot was not partial."; + assert isPartial : "Data stream [" + dataStreamName + "] was deleted during a snapshot but snapshot was not partial."; } else { dataStreams.put(dataStreamName, dataStream); } @@ -1474,7 +1671,8 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met shardFailures, entry.includeGlobalState(), entry.userMetadata(), - entry.remoteStoreIndexShallowCopy() + entry.remoteStoreIndexShallowCopy(), + 0 ); final StepListener metadataListener = new StepListener<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); @@ -1493,10 +1691,11 @@ private void finalizeSnapshotEntry(SnapshotsInProgress.Entry entry, Metadata met meta -> repo.finalizeSnapshot( shardGenerations, repositoryData.getGenId(), - metadataForSnapshot(entry, meta), + metadataForSnapshot(meta, entry.includeGlobalState(), entry.partial(), entry.dataStreams(), entry.indices()), snapshotInfo, entry.version(), state -> stateWithoutSnapshot(state, snapshot), + Priority.NORMAL, ActionListener.wrap(newRepoData -> { completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo)); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); @@ -2673,6 +2872,42 @@ private static Map shards( return Collections.unmodifiableMap(builder); } + private static ShardGenerations buildShardsGenerationFromRepositoryData( + Metadata metadata, + RoutingTable routingTable, + List indices, + RepositoryData repositoryData + ) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + + for (IndexId index : indices) { + final String indexName = index.getName(); + final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; + IndexMetadata indexMetadata = metadata.index(indexName); + + final IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + final ShardId shardId = indexRoutingTable.shard(i).shardId(); + final String shardRepoGeneration; + + if (isNewIndex) { + assert shardGenerations.getShardGen(index, shardId.getId()) == null : "Found shard generation for new index [" + + index + + "]"; + shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; + } else { + shardRepoGeneration = shardGenerations.getShardGen(index, shardId.id()); + } + builder.put(index, shardId.id(), shardRepoGeneration); + + } + + } + + return builder.build(); + } + /** * Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the * indices-to-check set. diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java index 274a548fd98ab..2feb0d3ba9405 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/create/CreateSnapshotResponseTests.java @@ -95,7 +95,8 @@ protected CreateSnapshotResponse createTestInstance() { shardFailures, globalState, SnapshotInfoTests.randomUserMetadata(), - false + false, + 0 ) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java index 3ef143e36dab9..58af390d194d3 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsResponseTests.java @@ -77,7 +77,8 @@ protected GetSnapshotsResponse createTestInstance() { shardFailures, randomBoolean(), SnapshotInfoTests.randomUserMetadata(), - false + false, + 0 ) ); } diff --git a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java index b9bdfca3509e3..c744f2592e24f 100644 --- a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldMapperQueryTests.java @@ -15,6 +15,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.queryparser.classic.ParseException; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; @@ -24,9 +25,12 @@ import org.opensearch.common.lucene.Lucene; import org.opensearch.core.index.Index; import org.opensearch.geometry.Rectangle; +import org.opensearch.index.query.MatchPhrasePrefixQueryBuilder; +import org.opensearch.index.query.MultiMatchQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.search.QueryStringQueryParser; import org.opensearch.script.DerivedFieldScript; import java.io.IOException; @@ -435,7 +439,7 @@ public void execute() { } } - public void testObjectDerivedFields() throws IOException { + public void testObjectDerivedFields() throws IOException, ParseException { MapperService mapperService = createMapperService(topMapping(b -> { b.startObject("properties"); { @@ -545,6 +549,17 @@ public void execute() { topDocs = searcher.search(query, 10); assertEquals(0, topDocs.totalHits.value); + query = new MatchPhrasePrefixQueryBuilder("object_field.text_field", "document number").toQuery(queryShardContext); + topDocs = searcher.search(query, 10); + assertEquals(0, topDocs.totalHits.value); + + // Multi Phrase Query + query = QueryBuilders.multiMatchQuery("GET", "object_field.nested_field.sub_field_1", "object_field.keyword_field") + .type(MultiMatchQueryBuilder.Type.PHRASE) + .toQuery(queryShardContext); + topDocs = searcher.search(query, 10); + assertEquals(7, topDocs.totalHits.value); + // Range queries of types - date, long and double query = QueryBuilders.rangeQuery("object_field.date_field").from("2024-03-20T14:20:50").toQuery(queryShardContext); topDocs = searcher.search(query, 10); @@ -567,6 +582,11 @@ public void execute() { topDocs = searcher.search(query, 10); assertEquals(7, topDocs.totalHits.value); + QueryStringQueryParser queryParser = new QueryStringQueryParser(queryShardContext, "object_field.keyword_field"); + queryParser.parse("GE?"); + topDocs = searcher.search(query, 10); + assertEquals(7, topDocs.totalHits.value); + // Regexp Query query = QueryBuilders.regexpQuery("object_field.keyword_field", ".*let.*").toQuery(queryShardContext); topDocs = searcher.search(query, 10); diff --git a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldTypeTests.java b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldTypeTests.java index fe9db24f494ad..7da8c9eb1efa0 100644 --- a/server/src/test/java/org/opensearch/index/mapper/DerivedFieldTypeTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/DerivedFieldTypeTests.java @@ -17,6 +17,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.memory.MemoryIndex; +import org.apache.lucene.queries.spans.SpanMultiTermQueryWrapper; import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchException; import org.opensearch.common.collect.Tuple; @@ -59,6 +60,7 @@ public void testBooleanType() { assertTrue(dft.getFieldMapper() instanceof BooleanFieldMapper); assertTrue(dft.getIndexableFieldGenerator().apply(true) instanceof Field); assertTrue(dft.getIndexableFieldGenerator().apply(false) instanceof Field); + assertEquals("derived", dft.typeName()); } public void testDateType() { @@ -159,6 +161,22 @@ public void testGetAggregationScript_ip() throws IOException { assertEquals(new BytesRef(InetAddressPoint.encode(InetAddresses.forString((String) expected.get(0)))), result.get(0)); } + public void testDerivedFieldValueFetcherDoesNotSupportCustomFormats() { + DerivedFieldType dft = createDerivedFieldType("boolean"); + expectThrows( + IllegalArgumentException.class, + () -> dft.valueFetcher(mock(QueryShardContext.class), mock(SearchLookup.class), "yyyy-MM-dd") + ); + } + + public void testSpanPrefixQueryNotSupported() { + DerivedFieldType dft = createDerivedFieldType("boolean"); + expectThrows( + IllegalArgumentException.class, + () -> dft.spanPrefixQuery("value", mock(SpanMultiTermQueryWrapper.SpanRewriteMethod.class), mock(QueryShardContext.class)) + ); + } + private static LeafSearchLookup mockValueFetcherForAggs(QueryShardContext mockContext, DerivedFieldType dft, List expected) { SearchLookup searchLookup = mock(SearchLookup.class); LeafSearchLookup leafLookup = mock(LeafSearchLookup.class); diff --git a/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java b/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java index ecad1291bed19..bed2d22125810 100644 --- a/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java +++ b/server/src/test/java/org/opensearch/index/query/DerivedFieldQueryTests.java @@ -88,7 +88,7 @@ public void execute() { // Create DerivedFieldQuery DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( new TermQuery(new Term("ip_from_raw_request", "247.37.0.0")), - valueFetcher, + () -> valueFetcher, searchLookup, Lucene.STANDARD_ANALYZER, indexableFieldFunction, @@ -157,7 +157,7 @@ public void execute() { // Create DerivedFieldQuery DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery( new TermQuery(new Term("ip_from_raw_request", "247.37.0.0")), - valueFetcher, + () -> valueFetcher, searchLookup, Lucene.STANDARD_ANALYZER, badIndexableFieldFunction, @@ -169,7 +169,7 @@ public void execute() { // set ignore_malformed as true, query should pass derivedFieldQuery = new DerivedFieldQuery( new TermQuery(new Term("ip_from_raw_request", "247.37.0.0")), - valueFetcher, + () -> valueFetcher, searchLookup, Lucene.STANDARD_ANALYZER, badIndexableFieldFunction, diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index 43ebb86fd5342..cb0a36c870d07 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; @@ -669,6 +670,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index e4e83f2453fa2..7fc987dcfa9bb 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -213,10 +214,12 @@ public void testSnapshotWithConflictingName() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index bd47507da4863..eabac37bf3434 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -39,6 +39,7 @@ import org.opensearch.client.Client; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -224,7 +225,7 @@ public void testRepositoryDataConcurrentModificationNotAllowed() { RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); final PlainActionFuture future1 = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(), future1); + repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(), Priority.NORMAL, future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created @@ -295,7 +296,7 @@ public void testFsRepositoryCompressDeprecatedIgnored() { private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { PlainActionFuture.get( - f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f) + f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), Priority.NORMAL, f) ); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java index 850a392c9619c..684a8dd36fccc 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotInfoTests.java @@ -86,7 +86,8 @@ protected SnapshotInfo createTestInstance() { shardFailures, includeGlobalState, userMetadata, - remoteStoreIndexShallowCopy + remoteStoreIndexShallowCopy, + 0 ); } @@ -114,7 +115,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 1: int indicesSize = randomValueOtherThan(instance.indices().size(), () -> randomIntBetween(1, 10)); @@ -132,7 +134,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 2: return new SnapshotInfo( @@ -146,7 +149,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 3: return new SnapshotInfo( @@ -160,7 +164,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 4: return new SnapshotInfo( @@ -174,7 +179,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 5: int totalShards = randomValueOtherThan(instance.totalShards(), () -> randomIntBetween(0, 100)); @@ -200,7 +206,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { shardFailures, instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 6: return new SnapshotInfo( @@ -214,7 +221,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), Boolean.FALSE.equals(instance.includeGlobalState()), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 7: return new SnapshotInfo( @@ -228,7 +236,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), randomValueOtherThan(instance.userMetadata(), SnapshotInfoTests::randomUserMetadata), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 0 ); case 8: List dataStreams = randomValueOtherThan( @@ -246,7 +255,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - instance.isRemoteStoreIndexShallowCopyEnabled() + instance.isRemoteStoreIndexShallowCopyEnabled(), + 123456 ); case 9: return new SnapshotInfo( @@ -260,7 +270,8 @@ protected SnapshotInfo mutateInstance(SnapshotInfo instance) { instance.shardFailures(), instance.includeGlobalState(), instance.userMetadata(), - Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()) + Boolean.FALSE.equals(instance.isRemoteStoreIndexShallowCopyEnabled()), + 123456 ); default: throw new IllegalArgumentException("invalid randomization case"); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 769dfeb37ff8d..e27223cea0778 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2379,6 +2379,7 @@ public void onFailure(final Exception e) { clusterService, threadPool, snapshotsService, + repositoriesService, actionFilters, indexNameExpressionResolver ) diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 43dde7281fb2d..06a486b3cb997 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -36,6 +36,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.settings.ClusterSettings; @@ -233,10 +234,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f )); @@ -259,10 +262,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ) @@ -287,10 +292,12 @@ public void testOverwriteSnapshotInfoBlob() throws Exception { Collections.emptyList(), true, Collections.emptyMap(), - false + false, + 0 ), Version.CURRENT, Function.identity(), + Priority.NORMAL, f ) ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java index be2f895301396..1ca1a6969ab2d 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java @@ -39,6 +39,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Priority; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; @@ -125,6 +126,7 @@ public void finalizeSnapshot( SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function stateTransformer, + Priority repositoryUpdatePriority, ActionListener listener ) { listener.onResponse(null); diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java index 32f445bf24a41..187c30be0044e 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/BlobStoreTestUtil.java @@ -298,23 +298,25 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito .stream() .noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) { final Map shardPathContents = shardContainer.listBlobs(); - - assertTrue( - shardPathContents.containsKey( - String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) - ) - || shardPathContents.containsKey( - String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + if (snapshotInfo.getPinnedTimestamp() == 0) { + assertTrue( + shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SHALLOW_SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) ) - ); + || shardPathContents.containsKey( + String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()) + ) + ); + + assertThat( + shardPathContents.keySet() + .stream() + .filter(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) + .count(), + lessThanOrEqualTo(2L) + ); + } - assertThat( - shardPathContents.keySet() - .stream() - .filter(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) - .count(), - lessThanOrEqualTo(2L) - ); } } } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index ce76914882150..ec9cd5b64353e 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.blobstore.BlobContainer; @@ -612,7 +613,8 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget( f -> repo.finalizeSnapshot( @@ -622,6 +624,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map