Skip to content

Commit

Permalink
[Segment Replication] Mute flaky tests (opensearch-project#5739)
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored and mch2 committed Jan 19, 2023
1 parent 93ea337 commit 4915d02
Showing 1 changed file with 87 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -55,15 +57,17 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchHits;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;
protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -91,6 +95,26 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void ingestDocs(int docCount) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(docCount);
waitForDocs(docCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();
}
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
Expand Down Expand Up @@ -266,6 +290,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
Expand Down Expand Up @@ -497,6 +522,7 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down Expand Up @@ -598,6 +624,61 @@ public void testDeleteOperations() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
final String replica = internalCluster().startNode(featureFlagSettings());

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
waitForReplicaUpdate();

assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

Set<String> ids = indexer.getIds();
String id = ids.toArray()[0].toString();
UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id)
.setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();
assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh());
assertEquals(2, updateResponse.getVersion());

refresh(INDEX_NAME);
waitForReplicaUpdate();

assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);

}
}

private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

Expand Down Expand Up @@ -689,7 +770,7 @@ public void testDropPrimaryDuringReplication() throws Exception {

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception
* @throws Exception if assertion fails
*/
private void waitForReplicaUpdate() throws Exception {
// wait until the replica has the latest segment generation.
Expand All @@ -706,7 +787,7 @@ private void waitForReplicaUpdate() throws Exception {
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
Expand Down

0 comments on commit 4915d02

Please sign in to comment.