Skip to content

Commit

Permalink
Reduce amount of docs ingested for SegRep ITs (opensearch-project#10584)
Browse files Browse the repository at this point in the history
This change reduces the amount of docs we ingest inside SegmentReplicationIT. These tests
were often ingesting 1-200 docs where it was not required. Many only required a few so that
segments are created.
This speeds up tests when run with remote store through SegmentReplicationUsingRemoteStore.
Also add Testlogging annotation on the replication package for known flaky tests.

Signed-off-by: Marc Handalian <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
mch2 authored and shiv0408 committed Apr 25, 2024
1 parent 9346a1c commit 7a54d7e
Showing 1 changed file with 31 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;
Expand Down Expand Up @@ -238,7 +239,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = scaledRandomIntBetween(0, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand All @@ -255,7 +256,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
Expand All @@ -274,7 +275,7 @@ public void testIndexReopenClose() throws Exception {
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(100, 200);
final int initialDocCount = scaledRandomIntBetween(1, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand Down Expand Up @@ -309,7 +310,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception {
ensureGreen(INDEX_NAME);
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
final List<ActionFuture<SearchResponse>> pendingSearchResponse = new ArrayList<>();
final int searchCount = randomIntBetween(10, 20);
final int searchCount = randomIntBetween(1, 2);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

for (int i = 0; i < searchCount; i++) {
Expand Down Expand Up @@ -354,6 +355,7 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception {
waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica));
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testMultipleShards() throws Exception {
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
Expand All @@ -367,7 +369,7 @@ public void testMultipleShards() throws Exception {
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(1, 200);
final int initialDocCount = scaledRandomIntBetween(1, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand All @@ -384,7 +386,7 @@ public void testMultipleShards() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
Expand All @@ -403,8 +405,8 @@ public void testReplicationAfterForceMerge() throws Exception {
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = scaledRandomIntBetween(0, 10);
final int additionalDocCount = scaledRandomIntBetween(0, 10);
final int expectedHitCount = initialDocCount + additionalDocCount;
try (
BackgroundIndexer indexer = new BackgroundIndexer(
Expand Down Expand Up @@ -509,7 +511,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
connection.sendRequest(requestId, action, request, options);
}
);
final int docCount = scaledRandomIntBetween(10, 200);
final int docCount = scaledRandomIntBetween(1, 10);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
Expand Down Expand Up @@ -570,7 +572,7 @@ public void testCancellation() throws Exception {
}
);

final int docCount = scaledRandomIntBetween(0, 200);
final int docCount = scaledRandomIntBetween(0, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand Down Expand Up @@ -630,13 +632,14 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
verifyStoreContent();
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(1, 20);
final int initialDocCount = scaledRandomIntBetween(1, 5);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand All @@ -653,7 +656,7 @@ public void testDeleteOperations() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, nodeA, nodeB);

final int additionalDocCount = scaledRandomIntBetween(0, 20);
final int additionalDocCount = scaledRandomIntBetween(0, 2);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
Expand Down Expand Up @@ -682,14 +685,14 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
createIndex(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(10, 200);
final int initialDocCount = scaledRandomIntBetween(1, 10);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);

final int deletedDocCount = randomIntBetween(10, initialDocCount);
final int deletedDocCount = randomIntBetween(1, initialDocCount);
for (int i = 0; i < deletedDocCount; i++) {
client(primary).prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
}
Expand All @@ -710,7 +713,7 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
);

// add some docs to the xlog and drop primary.
final int additionalDocs = randomIntBetween(1, 50);
final int additionalDocs = randomIntBetween(1, 5);
for (int i = initialDocCount; i < initialDocCount + additionalDocs; i++) {
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
}
Expand Down Expand Up @@ -741,7 +744,7 @@ public void testUpdateOperations() throws Exception {
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = scaledRandomIntBetween(1, 5);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand All @@ -758,7 +761,7 @@ public void testUpdateOperations() throws Exception {
refresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, asList(primary, replica));

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int additionalDocCount = scaledRandomIntBetween(0, 5);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
Expand Down Expand Up @@ -793,7 +796,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
int initialDocCount = scaledRandomIntBetween(5, 10);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand Down Expand Up @@ -827,6 +830,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
}
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testReplicaHasDiffFilesThanPrimary() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
Expand All @@ -838,7 +842,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
IndexWriterConfig iwc = newIndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.APPEND);

// create a doc to index
int numDocs = 2 + random().nextInt(100);
int numDocs = 2 + random().nextInt(10);

List<Document> docs = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -867,7 +871,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
replicaShard.finalizeReplication(segmentInfos);
ensureYellow(INDEX_NAME);

final int docCount = scaledRandomIntBetween(10, 200);
final int docCount = scaledRandomIntBetween(10, 20);
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
// Refresh, this should trigger round of segment replication
Expand All @@ -887,7 +891,7 @@ public void testPressureServiceStats() throws Exception {
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
int initialDocCount = scaledRandomIntBetween(10, 20);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand Down Expand Up @@ -993,8 +997,8 @@ public void testScrollCreatedOnReplica() throws Exception {
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index 100 docs
for (int i = 0; i < 100; i++) {
// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
Expand Down Expand Up @@ -1030,7 +1034,7 @@ public void testScrollCreatedOnReplica() throws Exception {
// force call flush
flush(INDEX_NAME);

for (int i = 3; i < 50; i++) {
for (int i = 3; i < 5; i++) {
client().prepareDelete(INDEX_NAME, String.valueOf(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
if (randomBoolean()) {
Expand Down Expand Up @@ -1072,7 +1076,7 @@ public void testScrollCreatedOnReplica() throws Exception {
List.of(replicaShard.store().directory().listAll()).containsAll(snapshottedSegments)
)
);
assertEquals(100, scrollHits);
assertEquals(10, scrollHits);

}

Expand Down Expand Up @@ -1218,19 +1222,8 @@ public void testPitCreatedOnReplica() throws Exception {
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
refresh(INDEX_NAME);

client().prepareIndex(INDEX_NAME)
.setId("2")
.setSource("foo", randomInt())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
for (int i = 3; i < 100; i++) {
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
Expand Down Expand Up @@ -1280,7 +1273,7 @@ public void testPitCreatedOnReplica() throws Exception {
}

flush(INDEX_NAME);
for (int i = 101; i < 200; i++) {
for (int i = 11; i < 20; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource("foo", randomInt())
Expand Down

0 comments on commit 7a54d7e

Please sign in to comment.