Skip to content

Commit

Permalink
Extend existing IndexRecoveryIT for remote indexes (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#8505)

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
ashking94 authored and shiv0408 committed Apr 25, 2024
1 parent 9e3712f commit 1905aa0
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.IndexCommit;
import org.hamcrest.Matcher;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand Down Expand Up @@ -101,8 +102,8 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
Expand Down Expand Up @@ -577,21 +578,25 @@ public void testRerouteRecovery() throws Exception {
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
.get();
assertThat(statsResponse1.getNodes(), hasSize(2));
for (NodeStats nodeStats : statsResponse1.getNodes()) {
List<NodeStats> dataNodeStats = statsResponse1.getNodes()
.stream()
.filter(nodeStats -> nodeStats.getNode().isDataNode())
.collect(Collectors.toList());
assertThat(dataNodeStats, hasSize(2));
for (NodeStats nodeStats : dataNodeStats) {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
if (nodeStats.getNode().getName().equals(nodeA)) {
assertThat(
"node A throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeAThrottling)
getMatcherForThrottling(finalNodeAThrottling)
);
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat(
"node B throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeBThrottling)
getMatcherForThrottling(finalNodeBThrottling)
);
}
}
Expand Down Expand Up @@ -623,7 +628,7 @@ public void testRerouteRecovery() throws Exception {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), getMatcherForThrottling(0));
};
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
Expand All @@ -644,7 +649,8 @@ public void testRerouteRecovery() throws Exception {

logger.info("--> start node C");
String nodeC = internalCluster().startNode();
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
int nodeCount = internalCluster().getNodeNames().length;
assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(String.valueOf(nodeCount)).get().isTimedOut());

logger.info("--> slowing down recoveries");
slowDownRecovery(shardSize);
Expand Down Expand Up @@ -678,7 +684,7 @@ public void testRerouteRecovery() throws Exception {
assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeB, nodeC);
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());

if (randomBoolean()) {
if (randomBoolean() && shouldAssertOngoingRecoveryInRerouteRecovery()) {
// shutdown node with relocation source of replica shard and check if recovery continues
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureStableCluster(2);
Expand Down Expand Up @@ -722,6 +728,14 @@ public void testRerouteRecovery() throws Exception {
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}

protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
return false;
}

protected Matcher<Long> getMatcherForThrottling(long value) {
return greaterThan(value);
}

public void testSnapshotRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
Expand Down Expand Up @@ -824,7 +838,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
final int numDocs = numDocs();
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
Expand All @@ -846,6 +860,10 @@ private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
}

protected int numDocs() {
return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
}

public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
Expand Down Expand Up @@ -1384,10 +1402,10 @@ public void testHistoryRetention() throws Exception {
flush(indexName);
}

String firstNodeToStop = randomFrom(internalCluster().getNodeNames());
String firstNodeToStop = randomFrom(internalCluster().getDataNodeNames());
Settings firstNodeToStopDataPathSettings = internalCluster().dataPathSettings(firstNodeToStop);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(firstNodeToStop));
String secondNodeToStop = randomFrom(internalCluster().getNodeNames());
String secondNodeToStop = randomFrom(internalCluster().getDataNodeNames());
Settings secondNodeToStopDataPathSettings = internalCluster().dataPathSettings(secondNodeToStop);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.recovery.IndexRecoveryIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteIndexRecoveryIT extends IndexRecoveryIT {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";

protected Path absolutePath;

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
@Override
public void setUp() throws Exception {
super.setUp();
internalCluster().startClusterManagerOnlyNode();
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

@Override
protected Matcher<Long> getMatcherForThrottling(long value) {
return Matchers.greaterThanOrEqualTo(value);
}

@Override
protected int numDocs() {
return randomIntBetween(100, 200);
}

@Override
protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
return false;
}
}
14 changes: 9 additions & 5 deletions server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
translog.trimUnreferencedReaders();
// refresh the translog stats
translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen "
+ translog.currentFileGeneration()
+ " != min gen "
+ translog.getMinFileGeneration();
// When remote translog is enabled, the min file generation is dependent on the (N-1)
// lastRefreshedCheckpoint SeqNo - refer RemoteStoreRefreshListener. This leads to older generations not
// being trimmed and leading to current generation being higher than the min file generation.
assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled()
|| translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen "
+ translog.currentFileGeneration()
+ " != min gen "
+ translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1524,11 +1524,13 @@ public void assertSeqNos() throws Exception {
}
assertThat(replicaShardRouting + " seq_no_stats mismatch", seqNoStats, equalTo(primarySeqNoStats));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
replicaShardRouting + " global checkpoint syncs mismatch",
seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
);
if (primaryShard.isRemoteTranslogEnabled() == false) {
assertThat(
replicaShardRouting + " global checkpoint syncs mismatch",
seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
);
}
}
}
}
Expand Down Expand Up @@ -2155,6 +2157,10 @@ synchronized Set<String> allDataNodesButN(int count) {
return set;
}

public Set<String> getDataNodeNames() {
return allDataNodesButN(0);
}

/**
* Returns a set of nodes that have at least one shard of the given index.
*/
Expand Down

0 comments on commit 1905aa0

Please sign in to comment.