From 470dea6ac1563a0c445b6b427ebe672db3f0da97 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 7 Jul 2023 02:04:51 +0530 Subject: [PATCH 1/7] Extend existing IndexRecoveryIT for remote indexes Signed-off-by: Ashish Singh --- .../indices/recovery/IndexRecoveryIT.java | 47 ++++++++++-- .../remotestore/RemoteIndexRecoveryIT.java | 75 +++++++++++++++++++ .../opensearch/index/engine/NoOpEngine.java | 11 +-- .../opensearch/test/InternalTestCluster.java | 12 +-- .../test/OpenSearchIntegTestCase.java | 1 - 5 files changed, 129 insertions(+), 17 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index d04c31c0d6e24..78416bfcb285d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -34,6 +34,7 @@ import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.index.IndexCommit; +import org.hamcrest.Matcher; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; @@ -101,8 +102,8 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.NodeIndicesStats; import org.opensearch.indices.analysis.AnalysisModule; -import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.recovery.RecoveryState.Stage; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.node.NodeClosedException; import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.AnalysisPlugin; @@ -231,7 +232,7 @@ private void assertRecoveryState( int shardId, RecoverySource type, boolean primary, - Stage stage, + RecoveryState.Stage stage, String sourceNode, String targetNode ) { @@ -287,6 +288,7 @@ private void restoreRecoverySpeed() { public void testGatewayRecovery() throws Exception { logger.info("--> start nodes"); String node = internalCluster().startNode(); + afterFirstStartNode(); createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT); @@ -309,9 +311,14 @@ public void testGatewayRecovery() throws Exception { validateIndexRecoveryState(recoveryState.getIndex()); } + protected void afterFirstStartNode() { + // No-op + } + public void testGatewayRecoveryTestActiveOnly() throws Exception { logger.info("--> start nodes"); internalCluster().startNode(); + afterFirstStartNode(); createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT); @@ -328,6 +335,7 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception { public void testReplicaRecovery() throws Exception { final String nodeA = internalCluster().startNode(); + afterFirstStartNode(); createIndex( INDEX_NAME, Settings.builder() @@ -399,6 +407,7 @@ public void testReplicaRecovery() throws Exception { public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); + afterFirstStartNode(); logger.info("--> create index on node: {}", nodeA); createIndex( @@ -497,6 +506,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); + afterFirstStartNode(); logger.info("--> create index on node: {}", nodeA); ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats() @@ -584,14 +594,14 @@ public void testRerouteRecovery() throws Exception { assertThat( "node A throttling should increase", recoveryStats.throttleTime().millis(), - greaterThan(finalNodeAThrottling) + getMatcherForThrottling(finalNodeAThrottling) ); } if (nodeStats.getNode().getName().equals(nodeB)) { assertThat( "node B throttling should increase", recoveryStats.throttleTime().millis(), - greaterThan(finalNodeBThrottling) + getMatcherForThrottling(finalNodeBThrottling) ); } } @@ -623,7 +633,7 @@ public void testRerouteRecovery() throws Exception { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); assertThat(recoveryStats.currentAsSource(), equalTo(0)); assertThat(recoveryStats.currentAsTarget(), equalTo(0)); - assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L)); + assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), getMatcherForThrottling(0)); }; // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget // is decremented, which may happen after the recovery was done. @@ -722,9 +732,14 @@ public void testRerouteRecovery() throws Exception { validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } + protected Matcher getMatcherForThrottling(long value) { + return greaterThan(value); + } + public void testSnapshotRecovery() throws Exception { logger.info("--> start node A"); String nodeA = internalCluster().startNode(); + afterFirstStartNode(); logger.info("--> create repository"); assertAcked( @@ -824,7 +839,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount, ensureGreen(); logger.info("--> indexing sample data"); - final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT); + final int numDocs = numDocs(); final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { @@ -846,6 +861,10 @@ private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) { assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f)); } + protected int numDocs() { + return between(MIN_DOC_COUNT, MAX_DOC_COUNT); + } + public void testTransientErrorsDuringRecoveryAreRetried() throws Exception { final String indexName = "test"; final Settings nodeSettings = Settings.builder() @@ -855,6 +874,7 @@ public void testTransientErrorsDuringRecoveryAreRetried() throws Exception { .build(); // start a cluster-manager node internalCluster().startNode(nodeSettings); + afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1057,6 +1077,7 @@ public void testDisconnectsWhileRecovering() throws Exception { .build(); // start a cluster-manager node internalCluster().startNode(nodeSettings); + afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1214,6 +1235,7 @@ public void testDisconnectsDuringRecovery() throws Exception { TimeValue disconnectAfterDelay = TimeValue.timeValueMillis(randomIntBetween(0, 100)); // start a cluster-manager node String clusterManagerNodeName = internalCluster().startClusterManagerOnlyNode(nodeSettings); + afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1359,6 +1381,7 @@ public void sendRequest( public void testHistoryRetention() throws Exception { internalCluster().startNodes(3); + afterFirstStartNode(); final String indexName = "test"; client().admin() @@ -1425,6 +1448,7 @@ public void testHistoryRetention() throws Exception { public void testDoNotInfinitelyWaitForMapping() { internalCluster().ensureAtLeastNumDataNodes(3); + afterFirstStartNode(); createIndex( "test", Settings.builder() @@ -1471,6 +1495,7 @@ public void testDoNotInfinitelyWaitForMapping() { public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { String indexName = "test"; internalCluster().startNodes(2); + afterFirstStartNode(); String nodeWithPrimary = internalCluster().startDataOnlyNode(); assertAcked( client().admin() @@ -1535,6 +1560,7 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); List nodes = randomSubsetOf( 2, StreamSupport.stream(Spliterators.spliterator(clusterService().state().nodes().getDataNodes().values(), 0), false) @@ -1642,6 +1668,7 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -1713,6 +1740,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -1798,6 +1826,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); String indexName = "test-index"; final Settings.Builder settings = Settings.builder() @@ -1943,6 +1972,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testDoesNotCopyOperationsInSafeCommit() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -2027,6 +2057,7 @@ public TokenStream create(TokenStream tokenStream) { public void testRepeatedRecovery() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); + afterFirstStartNode(); // Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the // node that held it previously, in case that node hasn't completely cleared it up. @@ -2091,6 +2122,7 @@ public void testRepeatedRecovery() throws Exception { public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception { internalCluster().startClusterManagerOnlyNode(Settings.EMPTY); final List dataNodes = internalCluster().startDataOnlyNodes(2); + afterFirstStartNode(); final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes)); final String indexName = "test"; assertAcked( @@ -2131,6 +2163,7 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception { public void testPeerRecoveryTrimsLocalTranslog() throws Exception { internalCluster().startNode(); + afterFirstStartNode(); List dataNodes = internalCluster().startDataOnlyNodes(2); String indexName = "test-index"; createIndex( @@ -2190,6 +2223,7 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception { public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { internalCluster().startClusterManagerOnlyNode(); + afterFirstStartNode(); assertAcked( client().admin() .indices() @@ -2210,6 +2244,7 @@ public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception { internalCluster().startNode(); + afterFirstStartNode(); List dataNodes = internalCluster().startDataOnlyNodes(2); String indexName = "test-index"; createIndex( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java new file mode 100644 index 0000000000000..43d2ad6354f28 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.junit.After; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexSettings; +import org.opensearch.indices.recovery.IndexRecoveryIT; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteIndexRecoveryIT extends IndexRecoveryIT { + + protected static final String REPOSITORY_NAME = "test-remore-store-repo"; + + protected Path absolutePath; + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Override + protected void afterFirstStartNode() { + absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s") + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } + + @Override + protected Matcher getMatcherForThrottling(long value) { + return Matchers.greaterThanOrEqualTo(value); + } + + @Override + protected int numDocs() { + return randomIntBetween(100, 200); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 2b126e627bd3d..6ca1c3b46253e 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -209,11 +209,12 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { translog.trimUnreferencedReaders(); // refresh the translog stats translogStats = translog.stats(); - assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " - + " current gen " - + translog.currentFileGeneration() - + " != min gen " - + translog.getMinFileGeneration(); + assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled() + || translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + + " current gen " + + translog.currentFileGeneration() + + " != min gen " + + translog.getMinFileGeneration(); } } } catch (final Exception e) { diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 49d8b64bc71cd..adb44c5b7594a 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -1524,11 +1524,13 @@ public void assertSeqNos() throws Exception { } assertThat(replicaShardRouting + " seq_no_stats mismatch", seqNoStats, equalTo(primarySeqNoStats)); // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard - assertThat( - replicaShardRouting + " global checkpoint syncs mismatch", - seqNoStats.getGlobalCheckpoint(), - equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())) - ); + if (primaryShard.isRemoteTranslogEnabled() == false) { + assertThat( + replicaShardRouting + " global checkpoint syncs mismatch", + seqNoStats.getGlobalCheckpoint(), + equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())) + ); + } } } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index fec45219ace81..cd89d6d70839d 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2481,5 +2481,4 @@ protected String replicaNodeName(String indexName) { protected ClusterState getClusterState() { return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); } - } From c42dc89cc1ae5d44bfe917a361ac6f99a0d8fd3a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Fri, 7 Jul 2023 12:23:35 +0530 Subject: [PATCH 2/7] Fix flaky test & address PR comments Signed-off-by: Ashish Singh --- .../indices/recovery/IndexRecoveryIT.java | 16 ++++++++++++++++ .../remotestore/RemoteIndexRecoveryIT.java | 10 ++++++++++ .../org/opensearch/index/engine/NoOpEngine.java | 3 +++ 3 files changed, 29 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 78416bfcb285d..ab232a2dd4bae 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -693,6 +693,18 @@ public void testRerouteRecovery() throws Exception { internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); ensureStableCluster(2); + if (indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery()) { + logger.info("--> indexing sample data"); + final int numDocs = numDocs(); + final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + + for (int i = 0; i < numDocs; i++) { + docs[i] = client().prepareIndex(INDEX_NAME) + .setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat()); + } + indexRandom(true, docs); + } + response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); @@ -732,6 +744,10 @@ public void testRerouteRecovery() throws Exception { validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } + protected boolean indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery() { + return false; + } + protected Matcher getMatcherForThrottling(long value) { return greaterThan(value); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 43d2ad6354f28..231156b4adca4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -72,4 +72,14 @@ protected Matcher getMatcherForThrottling(long value) { protected int numDocs() { return randomIntBetween(100, 200); } + + @Override + public void testRerouteRecovery() throws Exception { + super.testRerouteRecovery(); + } + + @Override + protected boolean indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery() { + return true; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index 6ca1c3b46253e..5c548df1cbb60 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -209,6 +209,9 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { translog.trimUnreferencedReaders(); // refresh the translog stats translogStats = translog.stats(); + // When remote translog is enabled, the min file generation is dependent on the (N-1) + // lastRefreshedCheckpoint SeqNo - refer RemoteStoreRefreshListener. This leads to older generations not + // being trimmed and leading to current generation being higher than the min file generation. assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled() || translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + " current gen " From 4b85af36a04153391910cc94b9915f7b9c8ca63a Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 8 Jul 2023 09:07:03 +0530 Subject: [PATCH 3/7] Enable segment replication experimental flag Signed-off-by: Ashish Singh --- .../org/opensearch/remotestore/RemoteIndexRecoveryIT.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 231156b4adca4..1cf9dafecb0ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -33,7 +33,11 @@ public class RemoteIndexRecoveryIT extends IndexRecoveryIT { @Override protected Settings featureFlagSettings() { - return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); } @Override From 59fe10ac539b9baf612e43597cc91d442fdc0729 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sat, 8 Jul 2023 15:46:48 +0530 Subject: [PATCH 4/7] Fix flaky behavior Signed-off-by: Ashish Singh --- .../indices/recovery/IndexRecoveryIT.java | 16 ++-------------- .../remotestore/RemoteIndexRecoveryIT.java | 9 ++------- 2 files changed, 4 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index ab232a2dd4bae..8b3f61f9e866d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -688,23 +688,11 @@ public void testRerouteRecovery() throws Exception { assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeB, nodeC); validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); - if (randomBoolean()) { + if (randomBoolean() && shouldAssertOngoingRecoveryInRerouteRecovery()) { // shutdown node with relocation source of replica shard and check if recovery continues internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); ensureStableCluster(2); - if (indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery()) { - logger.info("--> indexing sample data"); - final int numDocs = numDocs(); - final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; - - for (int i = 0; i < numDocs; i++) { - docs[i] = client().prepareIndex(INDEX_NAME) - .setSource("foo-int", randomInt(), "foo-string", randomAlphaOfLength(32), "foo-float", randomFloat()); - } - indexRandom(true, docs); - } - response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet(); recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); @@ -744,7 +732,7 @@ public void testRerouteRecovery() throws Exception { validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); } - protected boolean indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery() { + protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() { return false; } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 1cf9dafecb0ef..63d8d0b23a6b1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -78,12 +78,7 @@ protected int numDocs() { } @Override - public void testRerouteRecovery() throws Exception { - super.testRerouteRecovery(); - } - - @Override - protected boolean indexFurtherInRerouteRecoveryBeforeAssertOngoingRecovery() { - return true; + protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() { + return false; } } From e5c7177ae90ac1f2e6c89d049da0e0d3ff04464f Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 10 Jul 2023 13:24:03 +0530 Subject: [PATCH 5/7] Incorporate feedback Signed-off-by: Ashish Singh --- .../indices/recovery/IndexRecoveryIT.java | 43 +++++-------------- .../remotestore/RemoteIndexRecoveryIT.java | 6 ++- .../opensearch/test/InternalTestCluster.java | 4 ++ .../test/OpenSearchIntegTestCase.java | 1 + 4 files changed, 21 insertions(+), 33 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 8b3f61f9e866d..72b9b32236371 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -232,7 +232,7 @@ private void assertRecoveryState( int shardId, RecoverySource type, boolean primary, - RecoveryState.Stage stage, + Stage stage, String sourceNode, String targetNode ) { @@ -288,7 +288,6 @@ private void restoreRecoverySpeed() { public void testGatewayRecovery() throws Exception { logger.info("--> start nodes"); String node = internalCluster().startNode(); - afterFirstStartNode(); createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT); @@ -311,14 +310,9 @@ public void testGatewayRecovery() throws Exception { validateIndexRecoveryState(recoveryState.getIndex()); } - protected void afterFirstStartNode() { - // No-op - } - public void testGatewayRecoveryTestActiveOnly() throws Exception { logger.info("--> start nodes"); internalCluster().startNode(); - afterFirstStartNode(); createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT); @@ -335,7 +329,6 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception { public void testReplicaRecovery() throws Exception { final String nodeA = internalCluster().startNode(); - afterFirstStartNode(); createIndex( INDEX_NAME, Settings.builder() @@ -407,7 +400,6 @@ public void testReplicaRecovery() throws Exception { public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); - afterFirstStartNode(); logger.info("--> create index on node: {}", nodeA); createIndex( @@ -506,7 +498,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testRerouteRecovery() throws Exception { logger.info("--> start node A"); final String nodeA = internalCluster().startNode(); - afterFirstStartNode(); logger.info("--> create index on node: {}", nodeA); ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats() @@ -587,8 +578,12 @@ public void testRerouteRecovery() throws Exception { .clear() .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)) .get(); - assertThat(statsResponse1.getNodes(), hasSize(2)); - for (NodeStats nodeStats : statsResponse1.getNodes()) { + List dataNodeStats = statsResponse1.getNodes() + .stream() + .filter(nodeStats -> nodeStats.getNode().isDataNode()) + .collect(Collectors.toList()); + assertThat(dataNodeStats, hasSize(2)); + for (NodeStats nodeStats : dataNodeStats) { final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats(); if (nodeStats.getNode().getName().equals(nodeA)) { assertThat( @@ -654,7 +649,8 @@ public void testRerouteRecovery() throws Exception { logger.info("--> start node C"); String nodeC = internalCluster().startNode(); - assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut()); + int nodeCount = internalCluster().getNodeNames().length; + assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(String.valueOf(nodeCount)).get().isTimedOut()); logger.info("--> slowing down recoveries"); slowDownRecovery(shardSize); @@ -743,7 +739,6 @@ protected Matcher getMatcherForThrottling(long value) { public void testSnapshotRecovery() throws Exception { logger.info("--> start node A"); String nodeA = internalCluster().startNode(); - afterFirstStartNode(); logger.info("--> create repository"); assertAcked( @@ -878,7 +873,6 @@ public void testTransientErrorsDuringRecoveryAreRetried() throws Exception { .build(); // start a cluster-manager node internalCluster().startNode(nodeSettings); - afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1081,7 +1075,6 @@ public void testDisconnectsWhileRecovering() throws Exception { .build(); // start a cluster-manager node internalCluster().startNode(nodeSettings); - afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1239,7 +1232,6 @@ public void testDisconnectsDuringRecovery() throws Exception { TimeValue disconnectAfterDelay = TimeValue.timeValueMillis(randomIntBetween(0, 100)); // start a cluster-manager node String clusterManagerNodeName = internalCluster().startClusterManagerOnlyNode(nodeSettings); - afterFirstStartNode(); final String blueNodeName = internalCluster().startNode( Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build() @@ -1385,7 +1377,6 @@ public void sendRequest( public void testHistoryRetention() throws Exception { internalCluster().startNodes(3); - afterFirstStartNode(); final String indexName = "test"; client().admin() @@ -1411,10 +1402,10 @@ public void testHistoryRetention() throws Exception { flush(indexName); } - String firstNodeToStop = randomFrom(internalCluster().getNodeNames()); + String firstNodeToStop = randomFrom(internalCluster().getDataNodeNames()); Settings firstNodeToStopDataPathSettings = internalCluster().dataPathSettings(firstNodeToStop); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(firstNodeToStop)); - String secondNodeToStop = randomFrom(internalCluster().getNodeNames()); + String secondNodeToStop = randomFrom(internalCluster().getDataNodeNames()); Settings secondNodeToStopDataPathSettings = internalCluster().dataPathSettings(secondNodeToStop); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop)); @@ -1452,7 +1443,6 @@ public void testHistoryRetention() throws Exception { public void testDoNotInfinitelyWaitForMapping() { internalCluster().ensureAtLeastNumDataNodes(3); - afterFirstStartNode(); createIndex( "test", Settings.builder() @@ -1499,7 +1489,6 @@ public void testDoNotInfinitelyWaitForMapping() { public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { String indexName = "test"; internalCluster().startNodes(2); - afterFirstStartNode(); String nodeWithPrimary = internalCluster().startDataOnlyNode(); assertAcked( client().admin() @@ -1564,7 +1553,6 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception { public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); List nodes = randomSubsetOf( 2, StreamSupport.stream(Spliterators.spliterator(clusterService().state().nodes().getDataNodes().values(), 0), false) @@ -1672,7 +1660,6 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception { public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -1744,7 +1731,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -1830,7 +1816,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); String indexName = "test-index"; final Settings.Builder settings = Settings.builder() @@ -1976,7 +1961,6 @@ public Settings onNodeStopped(String nodeName) throws Exception { public void testDoesNotCopyOperationsInSafeCommit() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); String indexName = "test-index"; createIndex( @@ -2061,7 +2045,6 @@ public TokenStream create(TokenStream tokenStream) { public void testRepeatedRecovery() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); - afterFirstStartNode(); // Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the // node that held it previously, in case that node hasn't completely cleared it up. @@ -2126,7 +2109,6 @@ public void testRepeatedRecovery() throws Exception { public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception { internalCluster().startClusterManagerOnlyNode(Settings.EMPTY); final List dataNodes = internalCluster().startDataOnlyNodes(2); - afterFirstStartNode(); final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes)); final String indexName = "test"; assertAcked( @@ -2167,7 +2149,6 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception { public void testPeerRecoveryTrimsLocalTranslog() throws Exception { internalCluster().startNode(); - afterFirstStartNode(); List dataNodes = internalCluster().startDataOnlyNodes(2); String indexName = "test-index"; createIndex( @@ -2227,7 +2208,6 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception { public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { internalCluster().startClusterManagerOnlyNode(); - afterFirstStartNode(); assertAcked( client().admin() .indices() @@ -2248,7 +2228,6 @@ public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception { internalCluster().startNode(); - afterFirstStartNode(); List dataNodes = internalCluster().startDataOnlyNodes(2); String indexName = "test-index"; createIndex( diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java index 63d8d0b23a6b1..11c9993ac7874 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteIndexRecoveryIT.java @@ -11,6 +11,7 @@ import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.After; +import org.junit.Before; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; @@ -40,8 +41,11 @@ protected Settings featureFlagSettings() { .build(); } + @Before @Override - protected void afterFirstStartNode() { + public void setUp() throws Exception { + super.setUp(); + internalCluster().startClusterManagerOnlyNode(); absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index adb44c5b7594a..3f7bb71b27681 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2157,6 +2157,10 @@ synchronized Set allDataNodesButN(int count) { return set; } + public Set getDataNodeNames() { + return allDataNodesButN(0); + } + /** * Returns a set of nodes that have at least one shard of the given index. */ diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index cd89d6d70839d..fec45219ace81 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2481,4 +2481,5 @@ protected String replicaNodeName(String indexName) { protected ClusterState getClusterState() { return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState(); } + } From 24827ef27f059b7334b0f730082c87e290c49215 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 10 Jul 2023 13:35:09 +0530 Subject: [PATCH 6/7] Empty-Commit Signed-off-by: Ashish Singh From d1cd82336feb0bdde9e73a0d4fca3e9d01ca21a2 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 10 Jul 2023 16:34:13 +0530 Subject: [PATCH 7/7] Empty-Commit Signed-off-by: Ashish Singh