Skip to content

Commit

Permalink
Address comments on PR.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed Jan 16, 2024
1 parent cde361f commit 21d766a
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void indexData() throws Exception {
index("fuu", "buu", "1", XContentFactory.jsonBuilder().startObject().field("fuu", "fuu").endObject());
index("baz", "baz", "1", XContentFactory.jsonBuilder().startObject().field("baz", "baz").endObject());
refresh();
verifyReplicasCaughtUpWithPrimary();
}

public void testRoutingTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public void testWritesRejected() throws Exception {
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

/**
Expand Down Expand Up @@ -186,7 +185,6 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testBelowReplicaLimit() throws Exception {
Expand Down Expand Up @@ -219,7 +217,6 @@ public void testBelowReplicaLimit() throws Exception {
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

public void testFailStaleReplica() throws Exception {
Expand Down Expand Up @@ -328,7 +325,6 @@ public void testBulkWritesRejected() throws Exception {
// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
waitForSearchableDocs(totalDocs * 2L, replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}

private BulkResponse executeBulkRequest(List<String> nodes, int docsPerBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
Expand All @@ -23,8 +22,6 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
Expand All @@ -35,7 +32,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -129,45 +125,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exce
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final int primaryDocCount = getDocCountFromShard(primaryShard);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
primarySegmentMetadata,
replicaShard.getSegmentMetadataMap()
);
final int replicaDocCount = getDocCountFromShard(replicaShard);
assertEquals("Doc counts should match", primaryDocCount, replicaDocCount);
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
fail(
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
);
}
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
replicaShard.store().readLastCommittedSegmentsInfo();
}
}
}
}, 1, TimeUnit.MINUTES);
}

/**
* Fetch IndexShard, assumes only a single shard per node.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
waitForSearchableDocs(4, nodeC, replica);
verifyStoreContent();
}

public void testRestartPrimary() throws Exception {
Expand All @@ -191,7 +190,6 @@ public void testRestartPrimary() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, replica, primary);
verifyStoreContent();
}

public void testCancelPrimaryAllocation() throws Exception {
Expand Down Expand Up @@ -222,7 +220,6 @@ public void testCancelPrimaryAllocation() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount, replica, primary);
verifyStoreContent();
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
Expand Down Expand Up @@ -265,7 +262,6 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

ensureGreen(INDEX_NAME);
verifyStoreContent();
}
}

Expand Down Expand Up @@ -300,7 +296,7 @@ public void testIndexReopenClose() throws Exception {

ensureGreen(INDEX_NAME);
waitForSearchableDocs(initialDocCount, primary, replica);
verifyStoreContent();
waitForReplicasToCatchUpWithPrimary();
}

public void testScrollWithConcurrentIndexAndSearch() throws Exception {
Expand Down Expand Up @@ -347,11 +343,10 @@ public void testScrollWithConcurrentIndexAndSearch() throws Exception {
client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();

assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
refresh();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
assertTrue(pendingSearchResponse.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
verifyStoreContent();
waitForSearchableDocs(INDEX_NAME, 2 * searchCount, List.of(primary, replica));
}

Expand Down Expand Up @@ -395,7 +390,6 @@ public void testMultipleShards() throws Exception {
waitForSearchableDocs(expectedHitCount, nodeA, nodeB);

ensureGreen(INDEX_NAME);
verifyStoreContent();
}
}

Expand Down Expand Up @@ -433,7 +427,6 @@ public void testReplicationAfterForceMerge() throws Exception {
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);
verifyStoreContent();
}
}

Expand Down Expand Up @@ -629,7 +622,6 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
waitForSearchableDocs(3, primaryNode, replicaNode);
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
verifyStoreContent();
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
Expand Down Expand Up @@ -671,7 +663,6 @@ public void testDeleteOperations() throws Exception {

refresh(INDEX_NAME);
waitForSearchableDocs(expectedHitCount - 1, nodeA, nodeB);
verifyStoreContent();
}
}

Expand Down Expand Up @@ -778,7 +769,6 @@ public void testUpdateOperations() throws Exception {

refresh(INDEX_NAME);

verifyStoreContent();
assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
}
Expand Down Expand Up @@ -826,7 +816,6 @@ public void testDropPrimaryDuringReplication() throws Exception {

flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(initialDocCount + 1, dataNodes);
verifyStoreContent();
}
}

Expand Down Expand Up @@ -879,7 +868,6 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}
ensureGreen(INDEX_NAME);
waitForSearchableDocs(docCount, primaryNode, replicaNode);
verifyStoreContent();
final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}
Expand Down Expand Up @@ -1212,7 +1200,6 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
getIndexShard(replica, INDEX_NAME).getLatestReplicationCheckpoint().getSegmentInfosVersion()
);
});
verifyStoreContent();
waitForSearchableDocs(finalDocCount, primary, replica);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public void testPrimaryRelocation() throws Exception {
flushAndRefresh(INDEX_NAME);
logger.info("--> verify count again {}", 2 * initialDocCount);
waitForSearchableDocs(2 * initialDocCount, newPrimary, replica);
verifyStoreContent();
}

/**
Expand Down Expand Up @@ -212,7 +211,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
}, 1, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(2 * initialDocCount, oldPrimary, replica);
verifyStoreContent();
}

/**
Expand Down Expand Up @@ -286,7 +284,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
}, 1, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(totalDocCount, newPrimary, replica);
verifyStoreContent();
}

/**
Expand Down Expand Up @@ -397,7 +394,6 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
}, 2, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
}

/**
Expand Down Expand Up @@ -430,7 +426,6 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {
ensureGreen(INDEX_NAME);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(20, primary, replica);
verifyStoreContent();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void testBasicReplication() throws Exception {
}
refresh();
ensureGreen(INDEX_NAME);
verifyStoreContent();
}

public void testDropRandomNodeDuringReplication() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ public void testFieldDataStats() throws InterruptedException {
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "value1", "field2", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2", "field2", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refresh();
indexRandomForConcurrentSearch("test");
verifyReplicasCaughtUpWithPrimary();

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
assertThat(
Expand Down Expand Up @@ -306,9 +305,8 @@ public void testClearAllCaches() throws Exception {
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex("test").setId("1").setSource("field", "value1").execute().actionGet();
client().prepareIndex("test").setId("2").setSource("field", "value2").execute().actionGet();
client().admin().indices().prepareRefresh().execute().actionGet();
refresh();
indexRandomForConcurrentSearch("test");
verifyReplicasCaughtUpWithPrimary();

NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats("data:true").setIndices(true).execute().actionGet();
assertThat(
Expand Down Expand Up @@ -613,7 +611,6 @@ public void testNonThrottleStats() throws Exception {
client().prepareIndex("test").setId("" + termUpto).setSource("field" + (i % 10), sb.toString()).get();
}
refresh();
verifyReplicasCaughtUpWithPrimary();
stats = client().admin().indices().prepareStats().execute().actionGet();
// nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();

Expand Down Expand Up @@ -653,7 +650,6 @@ public void testThrottleStats() throws Exception {
}
}
refresh();
verifyReplicasCaughtUpWithPrimary();
stats = client().admin().indices().prepareStats().execute().actionGet();
// nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0;
Expand Down Expand Up @@ -833,7 +829,7 @@ public void testMergeStats() {
client().admin().indices().prepareFlush().execute().actionGet();
}
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
verifyReplicasCaughtUpWithPrimary();
waitForReplicasToCatchUpWithPrimary();
stats = client().admin().indices().prepareStats().setMerge(true).execute().actionGet();

assertThat(stats.getTotal().getMerge(), notNullValue());
Expand Down Expand Up @@ -862,8 +858,7 @@ public void testSegmentsStats() {

client().admin().indices().prepareFlush().get();
client().admin().indices().prepareForceMerge().setMaxNumSegments(1).execute().actionGet();
client().admin().indices().prepareRefresh().get();
verifyReplicasCaughtUpWithPrimary();
refresh();
stats = client().admin().indices().prepareStats().setSegments(true).get();

assertThat(stats.getTotal().getSegments(), notNullValue());
Expand All @@ -881,8 +876,7 @@ public void testAllFlags() throws Exception {
client().prepareIndex("test_index").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test_index_2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();

client().admin().indices().prepareRefresh().execute().actionGet();
verifyReplicasCaughtUpWithPrimary();
refresh();
IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
Flag[] values = CommonStatsFlags.Flag.values();
for (Flag flag : values) {
Expand Down Expand Up @@ -1009,7 +1003,6 @@ public void testMultiIndex() throws Exception {
client().prepareIndex("test1").setId(Integer.toString(2)).setSource("field", "value").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("field", "value").execute().actionGet();
refresh();
verifyReplicasCaughtUpWithPrimary();

int numShards1 = getNumShards("test1").totalNumShards;
int numShards2 = getNumShards("test2").totalNumShards;
Expand Down Expand Up @@ -1053,7 +1046,6 @@ public void testCompletionFieldsParam() throws Exception {
.setSource("{\"bar\":\"bar\",\"baz\":\"baz\"}", MediaTypeRegistry.JSON)
.get();
refresh();
verifyReplicasCaughtUpWithPrimary();

IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
IndicesStatsResponse stats = builder.execute().actionGet();
Expand Down Expand Up @@ -1096,7 +1088,6 @@ public void testGroupsParam() throws Exception {

client().prepareIndex("test1").setId(Integer.toString(1)).setSource("foo", "bar").execute().actionGet();
refresh();
verifyReplicasCaughtUpWithPrimary();

client().prepareSearch("_all").setStats("bar", "baz").execute().actionGet();

Expand Down Expand Up @@ -1260,7 +1251,6 @@ public void testFilterCacheStats() throws Exception {
);
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
refresh();
verifyReplicasCaughtUpWithPrimary();
ensureGreen();

IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
Expand Down Expand Up @@ -1400,7 +1390,6 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
while (!stop.get()) {
final String id = Integer.toString(idGenerator.incrementAndGet());
final IndexResponse response = client().prepareIndex("test").setId(id).setSource("{}", MediaTypeRegistry.JSON).get();
verifyReplicasCaughtUpWithPrimary();
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
}
});
Expand Down Expand Up @@ -1471,7 +1460,7 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
.get()
.status()
);
verifyReplicasCaughtUpWithPrimary();
waitForReplicasToCatchUpWithPrimary();
ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0];
RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats();
assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats);
Expand Down
Loading

0 comments on commit 21d766a

Please sign in to comment.