Skip to content

Commit

Permalink
Add cluster setting to dynamically configure filter rewrite optimizat…
Browse files Browse the repository at this point in the history
…ion (opensearch-project#13179)

---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Apr 19, 2024
1 parent d91edf0 commit cadf974
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 6 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.12.x]
### Added
- Constant Keyword Field ([#12285](https://github.com/opensearch-project/OpenSearch/pull/12285))
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
- Add cluster primary balance constraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject resize index requests (i.e., split, shrink and clone) while DocRep to SegRep migration is in progress ([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))

### Dependencies

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
Expand All @@ -34,6 +35,7 @@

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand Down Expand Up @@ -93,7 +95,7 @@ public void testMinDocCountOnDateHistogram() throws Exception {
final SearchResponse allResponse = client().prepareSearch("idx")
.setSize(0)
.setQuery(QUERY)
.addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0))
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
.get();

final Histogram allHisto = allResponse.getAggregations().get("histo");
Expand All @@ -104,4 +106,36 @@ public void testMinDocCountOnDateHistogram() throws Exception {
assertEquals(entry.getValue(), results.get(entry.getKey()));
}
}

/**
 * Runs the same daily date histogram twice, first with the cluster's current
 * {@code MAX_AGGREGATION_REWRITE_FILTERS} value and then with the setting forced to {@code 0},
 * and asserts that both runs produce equal histograms.
 *
 * The transient setting is cleared in a {@code finally} block so that a failing assertion
 * cannot leave the override in place for tests that run afterwards.
 */
public void testDisableOptimizationGivesSameResults() throws Exception {
    SearchResponse response = client().prepareSearch("idx")
        .setSize(0)
        .addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
        .get();

    final Histogram allHisto1 = response.getAggregations().get("histo");

    try {
        final ClusterUpdateSettingsResponse updateSettingResponse = client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().put(MAX_AGGREGATION_REWRITE_FILTERS.getKey(), 0))
            .get();

        // Expected value first, so a failure message reports expected/actual the right way round.
        assertEquals("0", updateSettingResponse.getTransientSettings().get(MAX_AGGREGATION_REWRITE_FILTERS.getKey()));

        response = client().prepareSearch("idx")
            .setSize(0)
            .addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
            .get();

        final Histogram allHisto2 = response.getAggregations().get("histo");

        assertEquals(allHisto1, allHisto2);
    } finally {
        // Always remove the transient override, even when an assertion above failed.
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setTransientSettings(Settings.builder().putNull(MAX_AGGREGATION_REWRITE_FILTERS.getKey()))
            .get();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;

/**
* The main search context used during search phase
Expand Down Expand Up @@ -185,6 +186,7 @@ final class DefaultSearchContext extends SearchContext {
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final boolean concurrentSearchSettingsEnabled;
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
private final int maxAggRewriteFilters;

DefaultSearchContext(
ReaderContext readerContext,
Expand Down Expand Up @@ -238,6 +240,8 @@ final class DefaultSearchContext extends SearchContext {
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
}

@Override
Expand Down Expand Up @@ -977,4 +981,15 @@ public int getTargetMaxSliceCount() {
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
}

/**
 * Returns the filter-rewrite bucket limit held by this search context.
 *
 * The value is stored in a final field that is assigned once in the constructor from
 * {@code evaluateFilterRewriteSetting()}, so it does not change for the lifetime of this context.
 *
 * @return the maximum number of aggregation rewrite filters for this context
 */
@Override
public int maxAggRewriteFilters() {
return maxAggRewriteFilters;
}

/**
 * Resolves the filter-rewrite bucket limit for this context.
 *
 * @return the current cluster value of {@code MAX_AGGREGATION_REWRITE_FILTERS}, or {@code 0}
 *         when no cluster service is available
 */
private int evaluateFilterRewriteSetting() {
    if (clusterService == null) {
        // No cluster settings to consult; fall back to 0.
        return 0;
    }
    return clusterService.getClusterSettings().get(MAX_AGGREGATION_REWRITE_FILTERS);
}
}
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

/**
 * Upper bound on the number of filters (buckets) the aggregation filter-rewrite optimization
 * may create. A value of 0 means the rewrite filters optimization in aggregations is disabled.
 * Declared with the {@code Dynamic} and {@code NodeScope} properties.
 */
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
"search.max_aggregation_rewrite_filters",
72, // default value
0, // minimum allowed value
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ private FastFilterRewriteHelper() {}

private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class);

private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;

// Initialize the wrapper map for unwrapping the query
Expand Down Expand Up @@ -191,8 +190,9 @@ private static Weight[] createFilterForAggregations(
int bucketCount = 0;
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
bucketCount++;
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
int maxNumFilterBuckets = context.maxAggRewriteFilters();
if (bucketCount > maxNumFilterBuckets) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
return null;
}
// Below rounding is needed as the interval could return in
Expand Down Expand Up @@ -272,6 +272,8 @@ public void setAggregationType(AggregationType aggregationType) {
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
if (context.maxAggRewriteFilters() == 0) return false;

boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength);
logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId());
this.rewriteable = rewriteable;
Expand Down Expand Up @@ -588,8 +590,9 @@ private static long[][] createRangesFromAgg(
int bucketCount = 0;
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
bucketCount++;
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
int maxNumFilterBuckets = context.maxAggRewriteFilters();
if (bucketCount > maxNumFilterBuckets) {
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
return null;
}
// Below rounding is needed as the interval could return in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,8 @@ public String toString() {
public abstract boolean shouldUseTimeSeriesDescSortOptimization();

public abstract int getTargetMaxSliceCount();

/**
 * Returns the maximum number of filters the aggregation filter-rewrite optimization may build
 * for this context.
 *
 * This base implementation always returns {@code 0}; subclasses may override it to supply a
 * configured limit.
 *
 * @return the filter-rewrite bucket limit, {@code 0} in this default implementation
 */
public int maxAggRewriteFilters() {
return 0;
}
}

0 comments on commit cadf974

Please sign in to comment.