Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add random replication strategy #12297

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,17 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(PrivateCustomPlugin.class);
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
}

@Before
public void indexData() throws Exception {
index("foo", "bar", "1", XContentFactory.jsonBuilder().startObject().field("foo", "foo").endObject());
index("fuu", "buu", "1", XContentFactory.jsonBuilder().startObject().field("fuu", "fuu").endObject());
index("baz", "baz", "1", XContentFactory.jsonBuilder().startObject().field("baz", "baz").endObject());
refresh();
refreshAndWaitForReplication();
}

public void testRoutingTable() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public static Collection<Object[]> parameters() {
);
}

@Override
protected boolean useRandomReplicationStrategy() {
return true;
}

// One of the primary purposes of the query cache is to cache aggs results
public void testCacheAggs() throws Exception {
Client client = client();
Expand Down Expand Up @@ -180,7 +185,7 @@ public void testQueryRewrite() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -250,7 +255,7 @@ public void testQueryRewriteMissingValues() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -316,7 +321,7 @@ public void testQueryRewriteDates() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -389,7 +394,7 @@ public void testQueryRewriteDatesWithNow() throws Exception {
.setFlush(true)
.get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index-1", "index-2", "index-3");

assertCacheState(client, "index-1", 0, 0);
Expand Down Expand Up @@ -460,7 +465,7 @@ public void testCanCache() throws Exception {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();
ensureSearchable("index");

assertCacheState(client, "index", 0, 0);
Expand Down Expand Up @@ -554,7 +559,7 @@ public void testCacheWithFilteredAlias() throws InterruptedException {
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
refresh();
refreshAndWaitForReplication();

indexRandomForConcurrentSearch("index");

Expand Down Expand Up @@ -661,7 +666,7 @@ public void testCacheWithInvalidation() throws Exception {
assertCacheState(client, "index", 1, 1);

// Explicit refresh would invalidate cache
refresh();
refreshAndWaitForReplication();
// Hit same query again
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
assertSearchResponse(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.monitor.os.OsInfo;
import org.opensearch.node.NodeMocksPlugin;
Expand Down Expand Up @@ -208,6 +209,7 @@
import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.XContentTestUtils.convertToMap;
import static org.opensearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -1887,9 +1889,25 @@ protected Settings nodeSettings(int nodeOrdinal) {
builder.put(TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING.getKey(), true);
builder.put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), true);
}

// Randomly set a replication strategy for the node. Replication Strategy can still be manually overridden by subclass if needed.
if (useRandomReplicationStrategy()) {
ReplicationType replicationType = randomBoolean() ? ReplicationType.DOCUMENT : ReplicationType.SEGMENT;
logger.info("Randomly using Replication Strategy as {}.", replicationType.toString());
builder.put(CLUSTER_REPLICATION_TYPE_SETTING.getKey(), replicationType);
}
return builder.build();
}

/**
* Used for selecting random replication strategy, either DOCUMENT or SEGMENT.
* This method must be overridden by subclass to use random replication strategy.
* Should be used only on test classes where replication strategy is not critical for tests.
*/
protected boolean useRandomReplicationStrategy() {
return false;
}

protected Path nodeConfigPath(int nodeOrdinal) {
return null;
}
Expand Down
Loading