Skip to content

Commit

Permalink
Fix: reset the filter built at segment level for date histogram optim…
Browse files Browse the repository at this point in the history
…ization (opensearch-project#12267)


---------

Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn authored and Peter Alfonsi committed Mar 1, 2024
1 parent b30a341 commit 0023e61
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedDynamicSettingsOpenSearchIntegTestCase;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class FilterRewriteIT extends ParameterizedDynamicSettingsOpenSearchIntegTestCase {

// simulate segment level match all
private static final QueryBuilder QUERY = QueryBuilders.termQuery("match", true);
private static final Map<String, Long> expected = new HashMap<>();

public FilterRewriteIT(Settings dynamicSettings) {
super(dynamicSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
);
}

@Override
protected void setupSuiteScopeCluster() throws Exception {
assertAcked(client().admin().indices().prepareCreate("idx").get());

final int segmentCount = randomIntBetween(2, 10);
final Set<Long> longTerms = new HashSet();

final Map<String, Integer> dateTerms = new HashMap<>();
for (int i = 0; i < segmentCount; i++) {
final List<IndexRequestBuilder> indexRequests = new ArrayList<>();

long longTerm;
do {
longTerm = randomInt(segmentCount * 2);
} while (!longTerms.add(longTerm));
ZonedDateTime time = ZonedDateTime.of(2024, 1, ((int) longTerm % 20) + 1, 0, 0, 0, 0, ZoneOffset.UTC);
String dateTerm = DateFormatter.forPattern("yyyy-MM-dd").format(time);

final int frequency = randomBoolean() ? 1 : randomIntBetween(2, 20);
for (int j = 0; j < frequency; j++) {
indexRequests.add(
client().prepareIndex("idx")
.setSource(jsonBuilder().startObject().field("date", dateTerm).field("match", true).endObject())
);
}
expected.put(dateTerm + "T00:00:00.000Z", (long) frequency);

indexRandom(true, false, indexRequests);
}

ensureSearchable();
}

public void testMinDocCountOnDateHistogram() throws Exception {
final SearchResponse allResponse = client().prepareSearch("idx")
.setSize(0)
.setQuery(QUERY)
.addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0))
.get();

final Histogram allHisto = allResponse.getAggregations().get("histo");
Map<String, Long> results = new HashMap<>();
allHisto.getBuckets().forEach(bucket -> results.put(bucket.getKeyAsString(), bucket.getDocCount()));

for (Map.Entry<String, Long> entry : expected.entrySet()) {
assertEquals(entry.getValue(), results.get(entry.getKey()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,15 @@ public void buildFastFilter() throws IOException {
}
}

public void buildFastFilter(LeafReaderContext leaf) throws IOException {
assert filters == null : "Filters should only be built once, but they are already built";
this.filters = this.aggregationType.buildFastFilter(leaf, context);
/**
* Built filters for a segment
*/
public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException {
Weight[] filters = this.aggregationType.buildFastFilter(leaf, context);
if (filters != null) {
logger.debug("Fast filter built for shard {} segment {}", context.indexShard().shardId(), leaf.ord);
}
return filters;
}
}

Expand Down Expand Up @@ -340,7 +343,6 @@ public Weight[] buildFastFilter(LeafReaderContext leaf, SearchContext context) t

private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException {
bounds = processHardBounds(bounds);
logger.debug("Bounds are {} for shard {} with hard bound", bounds, context.indexShard().shardId());
if (bounds == null) {
return null;
}
Expand Down Expand Up @@ -447,8 +449,7 @@ public static boolean tryFastFilterAggregation(
fastFilterContext.context.indexShard().shardId(),
ctx.ord
);
fastFilterContext.buildFastFilter(ctx);
filters = fastFilterContext.filters;
filters = fastFilterContext.buildFastFilter(ctx);
if (filters == null) {
return false;
}
Expand Down Expand Up @@ -480,20 +481,17 @@ public static boolean tryFastFilterAggregation(
incrementDocCount.accept(bucketKey, counts[i]);
s++;
if (s > size) {
logger.debug("Fast filter optimization applied to composite aggregation with size {}", size);
return true;
break;
}
}
}

logger.debug("Fast filter optimization applied");
logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord);
return true;
}

private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException {
Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
assert weight != null;
int count = weight.count(leafCtx);
return count > 0 && count == leafCtx.reader().numDocs();
return weight != null && weight.count(leafCtx) == leafCtx.reader().numDocs();
}
}

0 comments on commit 0023e61

Please sign in to comment.