Extend existing IndexRecoveryIT for remote indexes #8505

Merged · 7 commits · Jul 10, 2023

File: org/opensearch/indices/recovery/IndexRecoveryIT.java
@@ -34,6 +34,7 @@
 import org.apache.lucene.analysis.TokenStream;
 import org.apache.lucene.index.IndexCommit;
+import org.hamcrest.Matcher;
 import org.opensearch.OpenSearchException;
 import org.opensearch.Version;
 import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -101,8 +102,8 @@
 import org.opensearch.indices.IndicesService;
 import org.opensearch.indices.NodeIndicesStats;
 import org.opensearch.indices.analysis.AnalysisModule;
-import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.indices.recovery.RecoveryState.Stage;
+import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
 import org.opensearch.node.NodeClosedException;
 import org.opensearch.node.RecoverySettingsChunkSizePlugin;
 import org.opensearch.plugins.AnalysisPlugin;
@@ -577,21 +578,25 @@ public void testRerouteRecovery() throws Exception {
             .clear()
             .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery))
             .get();
-        assertThat(statsResponse1.getNodes(), hasSize(2));
-        for (NodeStats nodeStats : statsResponse1.getNodes()) {
+        List<NodeStats> dataNodeStats = statsResponse1.getNodes()
+            .stream()
+            .filter(nodeStats -> nodeStats.getNode().isDataNode())
+            .collect(Collectors.toList());
+        assertThat(dataNodeStats, hasSize(2));
+        for (NodeStats nodeStats : dataNodeStats) {
             final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
             if (nodeStats.getNode().getName().equals(nodeA)) {
                 assertThat(
                     "node A throttling should increase",
                     recoveryStats.throttleTime().millis(),
-                    greaterThan(finalNodeAThrottling)
+                    getMatcherForThrottling(finalNodeAThrottling)
                 );
             }
             if (nodeStats.getNode().getName().equals(nodeB)) {
                 assertThat(
                     "node B throttling should increase",
                     recoveryStats.throttleTime().millis(),
-                    greaterThan(finalNodeBThrottling)
+                    getMatcherForThrottling(finalNodeBThrottling)
                 );
             }
         }
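
Note: filtering the stats response down to data nodes is what keeps the hasSize(2) assertion valid once a dedicated cluster-manager-only node joins the cluster (as the remote-store subclass below does in setUp()); that node also returns NodeStats but never participates in shard recovery.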
@@ -623,7 +628,7 @@ public void testRerouteRecovery() throws Exception {
             final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
             assertThat(recoveryStats.currentAsSource(), equalTo(0));
             assertThat(recoveryStats.currentAsTarget(), equalTo(0));
-            assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
+            assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), getMatcherForThrottling(0));
         };
         // we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
         // is decremented, which may happen after the recovery was done.
@@ -644,7 +649,8 @@ public void testRerouteRecovery() throws Exception {

         logger.info("--> start node C");
         String nodeC = internalCluster().startNode();
-        assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").get().isTimedOut());
+        int nodeCount = internalCluster().getNodeNames().length;
+        assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes(String.valueOf(nodeCount)).get().isTimedOut());

         logger.info("--> slowing down recoveries");
         slowDownRecovery(shardSize);
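
Note: the hard-coded setWaitForNodes("3") assumed the cluster held only the data nodes the test starts itself. A subclass that adds nodes in setUp(), as RemoteIndexRecoveryIT below does with a dedicated cluster-manager-only node, would leave the health call waiting forever, so the expected count is now derived from the live cluster: cluster-manager plus nodeA, nodeB, and nodeC gives 4, for example.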
@@ -678,7 +684,7 @@ public void testRerouteRecovery() throws Exception {
         assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, PeerRecoverySource.INSTANCE, false, nodeB, nodeC);
         validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());

-        if (randomBoolean()) {
+        if (randomBoolean() && shouldAssertOngoingRecoveryInRerouteRecovery()) {
             // shutdown node with relocation source of replica shard and check if recovery continues
             internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
             ensureStableCluster(2);
@@ -722,6 +728,14 @@ public void testRerouteRecovery() throws Exception {
         validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
     }

+    protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
+        return true;
+    }
+
+    protected Matcher<Long> getMatcherForThrottling(long value) {
+        return greaterThan(value);
+    }
+
     public void testSnapshotRecovery() throws Exception {
         logger.info("--> start node A");
         String nodeA = internalCluster().startNode();
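
Note: the two new protected hooks are template methods: the base suite keeps its strict defaults (the relocation-source-shutdown branch enabled, throttle time strictly greater than before), and a subclass can relax them without duplicating the tests. RemoteIndexRecoveryIT below does exactly that, e.g.:

    @Override
    protected Matcher<Long> getMatcherForThrottling(long value) {
        // remote-store recoveries may move too little data over the wire to trip the throttle
        return Matchers.greaterThanOrEqualTo(value);
    }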
@@ -824,7 +838,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
         ensureGreen();

         logger.info("--> indexing sample data");
-        final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
+        final int numDocs = numDocs();
         final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

         for (int i = 0; i < numDocs; i++) {
@@ -846,6 +860,10 @@ private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
         assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
     }

+    protected int numDocs() {
+        return between(MIN_DOC_COUNT, MAX_DOC_COUNT);
+    }
+
     public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
         final String indexName = "test";
         final Settings nodeSettings = Settings.builder()
@@ -1384,10 +1402,10 @@ public void testHistoryRetention() throws Exception {
             flush(indexName);
         }

-        String firstNodeToStop = randomFrom(internalCluster().getNodeNames());
+        String firstNodeToStop = randomFrom(internalCluster().getDataNodeNames());
         Settings firstNodeToStopDataPathSettings = internalCluster().dataPathSettings(firstNodeToStop);
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(firstNodeToStop));
-        String secondNodeToStop = randomFrom(internalCluster().getNodeNames());
+        String secondNodeToStop = randomFrom(internalCluster().getDataNodeNames());
         Settings secondNodeToStopDataPathSettings = internalCluster().dataPathSettings(secondNodeToStop);
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop));

File: org/opensearch/remotestore/RemoteIndexRecoveryIT.java (new file)
@@ -0,0 +1,88 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.recovery.IndexRecoveryIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteIndexRecoveryIT extends IndexRecoveryIT {

    protected static final String REPOSITORY_NAME = "test-remote-store-repo";

protected Path absolutePath;

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
@Override
public void setUp() throws Exception {
super.setUp();
internalCluster().startClusterManagerOnlyNode();
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

@Override
protected Matcher<Long> getMatcherForThrottling(long value) {
return Matchers.greaterThanOrEqualTo(value);
}

@Override
protected int numDocs() {
return randomIntBetween(100, 200);
}

@Override
protected boolean shouldAssertOngoingRecoveryInRerouteRecovery() {
return false;
}
}
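
Note: because every test in IndexRecoveryIT is inherited, this one class re-runs the whole recovery suite against remote-store-backed, segment-replicated indexes. Assuming the standard OpenSearch Gradle layout (module and task names are not shown in this diff), the new suite can be run in isolation with something like:

    ./gradlew ':server:internalClusterTest' --tests 'org.opensearch.remotestore.RemoteIndexRecoveryIT'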

File: NoOpEngine.java (org.opensearch.index.engine; file name inferred from the trimUnreferencedTranslogFiles method)
@@ -209,11 +209,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
                     translog.trimUnreferencedReaders();
                     // refresh the translog stats
                     translogStats = translog.stats();
-                    assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
-                        + " current gen "
-                        + translog.currentFileGeneration()
-                        + " != min gen "
-                        + translog.getMinFileGeneration();
+                    // When the remote translog store is enabled, the min file generation depends on the (N-1)
+                    // lastRefreshedCheckpoint seqNo - see RemoteStoreRefreshListener. Older generations are therefore
+                    // not trimmed, and the current generation can be higher than the min file generation.
+                    assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled()
+                        || translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+                            + " current gen "
+                            + translog.currentFileGeneration()
+                            + " != min gen "
+                            + translog.getMinFileGeneration();
                 }
             }
         } catch (final Exception e) {
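
Note: the assertion is weakened rather than dropped, so the trimming invariant still holds for local translogs. A worked example of why strict equality can fail with a remote translog store (numbers illustrative): if the last refresh uploaded segments covering operations up to seq no 99, any generation still holding ops >= 100 must be retained for upload, so getMinFileGeneration() may stay at 5 while currentFileGeneration() has advanced to 8.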

File: org/opensearch/test/InternalTestCluster.java
@@ -1524,11 +1524,13 @@ public void assertSeqNos() throws Exception {
                 }
                 assertThat(replicaShardRouting + " seq_no_stats mismatch", seqNoStats, equalTo(primarySeqNoStats));
                 // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
-                assertThat(
-                    replicaShardRouting + " global checkpoint syncs mismatch",
-                    seqNoStats.getGlobalCheckpoint(),
-                    equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
-                );
+                if (primaryShard.isRemoteTranslogEnabled() == false) {
+                    assertThat(
+                        replicaShardRouting + " global checkpoint syncs mismatch",
+                        seqNoStats.getGlobalCheckpoint(),
+                        equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
+                    );
+                }
             }
         }
     }
@@ -2155,6 +2157,10 @@ synchronized Set<String> allDataNodesButN(int count) {
         return set;
     }

+    public Set<String> getDataNodeNames() {
+        return allDataNodesButN(0);
+    }
+
     /**
      * Returns a set of nodes that have at least one shard of the given index.
      */
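
Note: getDataNodeNames() is the helper behind the testHistoryRetention change above; it reuses allDataNodesButN(0), so cluster-manager-only nodes are excluded whenever a test picks a node to stop. Typical call site, as in IndexRecoveryIT:

    String nodeToStop = randomFrom(internalCluster().getDataNodeNames());
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToStop));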