Skip to content

Commit

Permalink
Add Integration Tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 22, 2023
1 parent 0f3aba6 commit 83519fd
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand All @@ -23,15 +25,20 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

public void setup() {
internalCluster().startNodes(3);
}

public void testStatsResponseFromAllNodes() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() {
}

public void testStatsResponseAllShards() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() {
}

public void testStatsResponseFromLocalNode() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() {
}

public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
}

public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Index documents
Expand Down Expand Up @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() {
}

public void testStatsOnShardUnassigned() throws IOException {
setup();
// Scenario:
// - Create index with single primary and two replica shard
// - Index documents
Expand All @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException {
}

public void testStatsOnRemoteStoreRestore() throws IOException {
setup();
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
Expand Down Expand Up @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
}

public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
setup();
// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -581,6 +601,60 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
}, 5, TimeUnit.SECONDS);
}

public void testStatsCorrectnessOnFailover() {
Settings clusterSettings = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(nodeSettings(0))
.build();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings);
internalCluster().startDataOnlyNodes(2, clusterSettings);

// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);

// Index some docs and refresh
indexDocs();
refresh(INDEX_NAME);

String primaryNode = primaryNodeName(INDEX_NAME);
String replicaNode = replicaNodeName(INDEX_NAME);

// Start network disruption - primary node will be isolated
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);
logger.info("--> network disruption is started");
networkDisruption.startDisrupting();
ensureStableCluster(2, clusterManagerNode);

RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
assertEquals(0, segmentStats.refreshTimeLagMs);

networkDisruption.stopDisrupting();
// assertBusy(() -> assertEquals(ClusterHealthStatus.GREEN, ensureYellow(TimeValue.timeValueSeconds(1), INDEX_NAME)), 120,
// TimeUnit.SECONDS);
internalCluster().clearDisruptionScheme();
ensureStableCluster(3, clusterManagerNode);
ensureGreen(INDEX_NAME);
logger.info("Test completed");
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,13 @@ public ClusterHealthStatus ensureYellow(String... indices) {
return ensureColor(ClusterHealthStatus.YELLOW, TimeValue.timeValueSeconds(30), false, indices);
}

/**
* Ensures the cluster has a yellow state via the cluster health API.
*/
public ClusterHealthStatus ensureYellow(TimeValue timeout, String... indices) {
return ensureColor(ClusterHealthStatus.YELLOW, timeout, false, indices);
}

/**
* Ensures the cluster has a red state via the cluster health API.
*/
Expand Down

0 comments on commit 83519fd

Please sign in to comment.