From 41f6d9f49b2f82a3468699c1b1da91bcdcb06fcb Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Thu, 10 Aug 2023 21:25:42 +0530 Subject: [PATCH] [Remote Store] Fix couple of Remote Store flaky test and use bulk api for ingestion (#9190) --------- Signed-off-by: Gaurav Bafna (cherry picked from commit d06926c863c4a1ad0caf9e87f43fc7c8d6369b8c) Signed-off-by: Marc Handalian --- .../RemoteStoreBaseIntegTestCase.java | 40 ++++++++++++++----- .../opensearch/remotestore/RemoteStoreIT.java | 29 ++++++++------ .../ReplicaToPrimaryPromotionIT.java | 3 +- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index d339530ab438c..caac7c581fc51 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -9,6 +9,10 @@ package org.opensearch.remotestore; import org.junit.After; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; @@ -31,10 +35,10 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; -import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { @@ -74,13 +78,18 @@ protected Map indexData(int numberOfIterations, boolean invokeFlus indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed); refreshedOrFlushedOperations = totalOperations; int numberOfOperations = randomIntBetween(20, 50); - for (int j = 0; j < numberOfOperations; j++) { - IndexResponse response = indexSingleDoc(index); - maxSeqNo = response.getSeqNo(); - shardId = response.getShardId().id(); - indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo); + int numberOfBulk = randomIntBetween(1, 5); + for (int j = 0; j < numberOfBulk; j++) { + BulkResponse res = indexBulk(index, numberOfOperations); + for (BulkItemResponse singleResp : res.getItems()) { + indexingStats.put( + MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(), + singleResp.getResponse().getSeqNo() + ); + maxSeqNo = singleResp.getResponse().getSeqNo(); + } + totalOperations += numberOfOperations; } - totalOperations += numberOfOperations; } indexingStats.put(TOTAL_OPERATIONS, totalOperations); @@ -123,6 +132,18 @@ protected IndexResponse indexSingleDoc(String indexName) { .get(); } + protected BulkResponse indexBulk(String indexName, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + final IndexRequest request = client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) + .request(); + bulkRequest.add(request); + } + return client().bulk(bulkRequest).actionGet(); + } + public static Settings remoteStoreClusterSettings(String segmentRepoName) { return remoteStoreClusterSettings(segmentRepoName, segmentRepoName); } @@ -170,10 +191,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) { return remoteStoreIndexSettings(numberOfReplicas, 1); } - protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) { return Settings.builder() .put(remoteStoreIndexSettings(numberOfReplicas)) .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh)) .build(); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index ba80b7d06fd4b..b7bb0cc6608d0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.hamcrest.MatcherAssert; import org.junit.Before; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; @@ -15,7 +16,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.settings.Settings; -import org.opensearch.index.shard.RemoteStoreRefreshListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.comparesEqualTo; -import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -143,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception { verifyRemoteStoreCleanup(); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") public void testStaleCommitDeletionWithInvokeFlush() throws Exception { - internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, true, INDEX_NAME); String indexUUID = client().admin() @@ -158,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { // Delete is async. assertBusy(() -> { int actualFileCount = getFileCount(indexPath); - if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) { - assertEquals(numberOfIterations, actualFileCount); + if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) { + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } else { // As delete is async its possible that the file gets created before the deletion or after // deletion. - assertTrue(actualFileCount >= 10 || actualFileCount <= 11); + MatcherAssert.assertThat( + actualFileCount, + is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1)) + ); } }, 30, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658") public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { - internalCluster().startDataOnlyNodes(3); - createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l)); + internalCluster().startDataOnlyNodes(1); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); int numberOfIterations = randomIntBetween(5, 15); indexData(numberOfIterations, false, INDEX_NAME); String indexUUID = client().admin() @@ -180,6 +183,8 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { .get() .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata"); - assertEquals(numberOfIterations, getFileCount(indexPath)); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java index b68fd1f764a63..275197ec831d3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/ReplicaToPrimaryPromotionIT.java @@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception { assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130") public void testFailoverWhileIndexing() throws Exception { internalCluster().startNode(); internalCluster().startNode(); @@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception { .setSource("field", numAutoGenDocs.get()) .get(); - if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) { + if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) { numAutoGenDocs.incrementAndGet(); if (numAutoGenDocs.get() == docCount / 2) { if (random().nextInt(3) == 0) {