diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 641f714d33414..796f09cb9528f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -19,7 +19,6 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.SegmentReplicationShardStats; @@ -175,17 +174,6 @@ private IndexShard getIndexShard(ClusterState state, ShardRouting routing, Strin return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName); } - /** - * Fetch IndexShard by shardId, multiple shards per node allowed. - */ - protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) { - final Index index = resolveIndex(indexName); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); - final Optional id = indexService.shardIds().stream().filter(sid -> sid == shardId.id()).findFirst(); - return indexService.getShard(id.get()); - } - /** * Fetch IndexShard, assumes only a single shard per node. */ diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 21ab6e8a4f018..1d5da9370cce3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -107,6 +107,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful; @@ -130,7 +131,8 @@ public IndexStatsIT(Settings settings) { public static Collection parameters() { return Arrays.asList( new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() } ); } @@ -175,7 +177,7 @@ public void testFieldDataStats() throws InterruptedException { ensureGreen(); client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet(); client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); + refreshAndWaitForReplication(); indexRandomForConcurrentSearch("test"); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); @@ -299,7 +301,7 @@ public void testClearAllCaches() throws Exception { client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet(); client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); + refreshAndWaitForReplication(); indexRandomForConcurrentSearch("test"); NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet(); @@ -667,7 +669,7 @@ public void testSimpleStats() throws Exception { client().prepareIndex("test1").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); - refresh(); + refreshAndWaitForReplication(); NumShards test1 = getNumShards("test1"); long test1ExpectedWrites = 2 * test1.dataCopies; @@ -682,7 +684,13 @@ public void testSimpleStats() throws Exception { assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(0L)); assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false)); assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L)); - assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites)); + + // This assert should not be done on segrep enabled indices because we are asserting Indexing/Write operations count on + // all primary and replica shards. But in case of segrep, Indexing/Write operation don't happen on replica shards. So we can + // ignore this assert check for segrep enabled indices. + if (isSegmentReplicationEnabledForIndex("test1") == false && isSegmentReplicationEnabledForIndex("test2") == false) { + assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites)); + } assertThat(stats.getTotal().getStore(), notNullValue()); assertThat(stats.getTotal().getMerge(), notNullValue()); assertThat(stats.getTotal().getFlush(), notNullValue()); @@ -825,6 +833,7 @@ public void testMergeStats() { client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet(); + refreshAndWaitForReplication(); assertThat(stats.getTotal().getMerge(), notNullValue()); assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0L)); } @@ -851,7 +860,7 @@ public void testSegmentsStats() { client().admin().indices().prepareFlush().get(); client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet(); - client().admin().indices().prepareRefresh().get(); + refreshAndWaitForReplication(); stats = client().admin().indices().prepareStats().setSegments(true).get(); assertThat(stats.getTotal().getSegments(), notNullValue()); @@ -869,7 +878,7 @@ public void testAllFlags() throws Exception { client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet(); client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet(); - client().admin().indices().prepareRefresh().execute().actionGet(); + refreshAndWaitForReplication(); IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); Flag[] values = CommonStatsFlags.Flag.values(); for (Flag flag : values) { @@ -1453,6 +1462,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() { .get() .status() ); + refreshAndWaitForReplication(); ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0]; RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats(); assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats); diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java index 30d5af58df545..eb293aeb6d490 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java @@ -32,6 +32,8 @@ package org.opensearch.recovery; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.refresh.RefreshResponse; @@ -52,10 +54,11 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.DocsStats; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.search.sort.SortOrder; import org.opensearch.test.BackgroundIndexer; -import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import java.util.Arrays; import java.util.Collection; @@ -69,12 +72,26 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; +import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAllSuccessful; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout; -public class RecoveryWhileUnderLoadIT extends OpenSearchIntegTestCase { +public class RecoveryWhileUnderLoadIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public RecoveryWhileUnderLoadIT(Settings settings) { + super(settings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT).build() }, + new Object[] { Settings.builder().put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT).build() } + ); + } + private final Logger logger = LogManager.getLogger(RecoveryWhileUnderLoadIT.class); public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { @@ -150,7 +167,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasTest() throws Exception { logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); - refreshAndAssert(); + assertAfterRefreshAndWaitForReplication(); logger.info("--> verifying indexed content"); iterateAssertCount(numberOfShards, 10, indexer.getIds()); } @@ -211,7 +228,7 @@ public void testRecoverWhileUnderLoadAllocateReplicasRelocatePrimariesTest() thr logger.info("--> indexing threads stopped"); logger.info("--> refreshing the index"); - refreshAndAssert(); + assertAfterRefreshAndWaitForReplication(); logger.info("--> verifying indexed content"); iterateAssertCount(numberOfShards, 10, indexer.getIds()); } @@ -325,7 +342,7 @@ public void testRecoverWhileUnderLoadWithReducedAllowedNodes() throws Exception ); logger.info("--> refreshing the index"); - refreshAndAssert(); + assertAfterRefreshAndWaitForReplication(); logger.info("--> verifying indexed content"); iterateAssertCount(numberOfShards, 10, indexer.getIds()); } @@ -375,7 +392,7 @@ public void testRecoverWhileRelocating() throws Exception { ensureGreen(TimeValue.timeValueMinutes(5)); logger.info("--> refreshing the index"); - refreshAndAssert(); + assertAfterRefreshAndWaitForReplication(); logger.info("--> verifying indexed content"); iterateAssertCount(numShards, 10, indexer.getIds()); } @@ -474,10 +491,11 @@ private void logSearchResponse(int numberOfShards, long numberOfDocs, int iterat ); } - private void refreshAndAssert() throws Exception { + private void assertAfterRefreshAndWaitForReplication() throws Exception { assertBusy(() -> { RefreshResponse actionGet = client().admin().indices().prepareRefresh().get(); assertAllSuccessful(actionGet); }, 5, TimeUnit.MINUTES); + waitForReplication(); } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 0f9ab3aa3d64f..33d5669d33297 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -37,6 +37,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHits; import org.apache.lucene.tests.util.LuceneTestCase; @@ -92,6 +93,7 @@ import org.opensearch.common.Nullable; import org.opensearch.common.Priority; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.FeatureFlagSettings; @@ -114,6 +116,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -123,6 +126,7 @@ import org.opensearch.env.Environment; import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.MockEngineFactoryPlugin; @@ -131,10 +135,12 @@ import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.CompletionFieldMapper; import org.opensearch.index.mapper.MockFieldFilterPlugin; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.store.IndicesStore; import org.opensearch.monitor.os.OsInfo; import org.opensearch.node.NodeMocksPlugin; @@ -182,6 +188,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; @@ -1556,14 +1563,17 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma if (dummyDocuments) { indexRandomForMultipleSlices(indicesArray); } + if (forceRefresh) { + waitForReplication(); + } } /* - * This method ingests bogus documents for the given indices such that multiple slices - * are formed. This is useful for testing with the concurrent search use-case as it creates - * multiple slices based on segment count. - * @param indices the indices in which bogus documents should be ingested - * */ + * This method ingests bogus documents for the given indices such that multiple slices + * are formed. This is useful for testing with the concurrent search use-case as it creates + * multiple slices based on segment count. + * @param indices the indices in which bogus documents should be ingested + * */ protected void indexRandomForMultipleSlices(String... indices) throws InterruptedException { Set> bogusIds = new HashSet<>(); int refreshCount = randomIntBetween(2, 3); @@ -2357,4 +2367,96 @@ protected ClusterState getClusterState() { return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); } + /** + * Refreshes the indices in the cluster and waits until active/started replica shards + * are caught up with primary shard only when Segment Replication is enabled. + * This doesn't wait for inactive/non-started replica shards to become active/started. + */ + protected RefreshResponse refreshAndWaitForReplication(String... indices) { + RefreshResponse refreshResponse = refresh(indices); + waitForReplication(); + return refreshResponse; + } + + /** + * Waits until active/started replica shards are caught up with primary shard only when Segment Replication is enabled. + * This doesn't wait for inactive/non-started replica shards to become active/started. + */ + protected void waitForReplication(String... indices) { + if (indices.length == 0) { + indices = getClusterState().routingTable().indicesRouting().keySet().toArray(String[]::new); + } + try { + for (String index : indices) { + if (isSegmentReplicationEnabledForIndex(index)) { + if (isInternalCluster()) { + IndexRoutingTable indexRoutingTable = getClusterState().routingTable().index(index); + if (indexRoutingTable != null) { + assertBusy(() -> { + for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) { + final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); + if (primaryRouting.state().toString().equals("STARTED")) { + if (isSegmentReplicationEnabledForIndex(index)) { + final List replicaRouting = shardRoutingTable.replicaShards(); + final IndexShard primaryShard = getIndexShard(primaryRouting, index); + for (ShardRouting replica : replicaRouting) { + if (replica.state().toString().equals("STARTED")) { + IndexShard replicaShard = getIndexShard(replica, index); + assertEquals( + "replica shards haven't caught up with primary", + getLatestSegmentInfoVersion(primaryShard), + getLatestSegmentInfoVersion(replicaShard) + ); + } + } + } + } + } + }, 30, TimeUnit.SECONDS); + } + } else { + throw new IllegalStateException( + "Segment Replication is not supported for testing tests using External Test Cluster" + ); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Checks if Segment Replication is enabled on Index. + */ + protected boolean isSegmentReplicationEnabledForIndex(String index) { + return clusterService().state().getMetadata().isSegmentReplicationEnabled(index); + } + + protected IndexShard getIndexShard(ShardRouting routing, String indexName) { + return getIndexShard(getClusterState().nodes().get(routing.currentNodeId()).getName(), routing.shardId(), indexName); + } + + /** + * Fetch IndexShard by shardId, multiple shards per node allowed. + */ + protected IndexShard getIndexShard(String node, ShardId shardId, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexServiceSafe(index); + final Optional id = indexService.shardIds().stream().filter(sid -> sid.equals(shardId.id())).findFirst(); + return indexService.getShard(id.get()); + } + + /** + * Fetch latest segment info snapshot version of an index. + */ + protected long getLatestSegmentInfoVersion(IndexShard shard) { + try (final GatedCloseable snapshot = shard.getSegmentInfosSnapshot()) { + return snapshot.get().version; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } diff --git a/test/framework/src/main/java/org/opensearch/test/ParameterizedOpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/ParameterizedOpenSearchIntegTestCase.java index 88a44f87952f5..23316adf6a2d7 100644 --- a/test/framework/src/main/java/org/opensearch/test/ParameterizedOpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/ParameterizedOpenSearchIntegTestCase.java @@ -35,7 +35,7 @@ abstract class ParameterizedOpenSearchIntegTestCase extends OpenSearchIntegTestC // This method shouldn't be called in setupSuiteScopeCluster(). Only call this method inside single test. public void indexRandomForConcurrentSearch(String... indices) throws InterruptedException { - if (settings.get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()).equals("true")) { + if (CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settings)) { indexRandomForMultipleSlices(indices); } }