Skip to content

Commit

Permalink
ESQL: fix COUNT filter pushdown (#117503) (#117648)
Browse files Browse the repository at this point in the history
* ESQL: fix COUNT filter pushdown (#117503)

If a `COUNT` agg has a filter applied, this filter must also be pushed down to source. This currently does not happen, but the issue is masked by two factors:
* a logical optimisation, `ExtractAggregateCommonFilter`, that extracts the filter out of the STATS entirely (and then pushes it to source from a `WHERE`);
* the physical plan optimisation implementing the push down, `PushStatsToSource`, currently only applies if there's just one agg function to push down.

However, this fix needs to be applied since:
* the defect is still present in versions prior to the introduction of `ExtractAggregateCommonFilter`;
* the defect might resurface when the restriction in `PushStatsToSource` is lifted.

Fixes #115522.

(cherry picked from commit 560e0c5)

* revert merge artefact

* 8.x adaptation
  • Loading branch information
bpintea authored Nov 28, 2024
1 parent d90b4c7 commit 9dafb30
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/117503.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 117503
summary: Fix COUNT filter pushdown
area: ES|QL
type: bug
issues:
- 115522
Original file line number Diff line number Diff line change
Expand Up @@ -2678,6 +2678,57 @@ c2:l |c2_f:l |m2:i |m2_f:i |c:l
1 |1 |5 |5 |21
;

simpleCountOnFieldWithFilteringAndNoGrouping
required_capability: per_agg_filtering
from employees
| stats c1 = count(emp_no) where emp_no < 10042
;

c1:long
41
;

simpleCountOnFieldWithFilteringOnDifferentFieldAndNoGrouping
required_capability: per_agg_filtering
from employees
| stats c1 = count(hire_date) where emp_no < 10042
;

c1:long
41
;

simpleCountOnStarWithFilteringAndNoGrouping
required_capability: per_agg_filtering
from employees
| stats c1 = count(*) where emp_no < 10042
;

c1:long
41
;

simpleCountWithFilteringAndNoGroupingOnFieldWithNulls
required_capability: per_agg_filtering
from employees
| stats c1 = count(birth_date) where emp_no <= 10050
;

c1:long
40
;


simpleCountWithFilteringAndNoGroupingOnFieldWithMultivalues
required_capability: per_agg_filtering
from employees
| stats c1 = count(job_positions) where emp_no <= 10003
;

c1:long
3
;

commonFilterExtractionWithAliasing
required_capability: per_agg_filtering
from employees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Queries;
import org.elasticsearch.xpack.esql.core.util.StringUtils;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
Expand All @@ -25,12 +26,15 @@
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;

import java.util.ArrayList;
import java.util.List;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource.canPushToSource;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType.COUNT;

/**
Expand Down Expand Up @@ -98,6 +102,13 @@ private Tuple<List<Attribute>, List<EsStatsQueryExec.Stat>> pushableStats(
}
}
if (fieldName != null) {
if (count.hasFilter()) {
if (canPushToSource(count.filter()) == false) {
return null; // can't push down
}
var countFilter = PlannerUtils.TRANSLATOR_HANDLER.asQuery(count.filter());
query = Queries.combine(Queries.Clause.MUST, asList(countFilter.asBuilder(), query));
}
return new EsStatsQueryExec.Stat(fieldName, COUNT, query);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
Expand All @@ -58,6 +60,7 @@
import org.elasticsearch.xpack.esql.planner.FilterTests;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
import org.elasticsearch.xpack.esql.rule.Rule;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
Expand All @@ -66,9 +69,11 @@
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import static java.util.Arrays.asList;
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
Expand Down Expand Up @@ -375,6 +380,67 @@ public void testMultiCountAllWithFilter() {
assertThat(plan.anyMatch(EsQueryExec.class::isInstance), is(true));
}

/**
 * Checks that the filter attached to a single {@code COUNT} aggregation ends up in the query of the
 * {@link EsStatsQueryExec} node, combined under {@code bool/must} with the {@code exists} query on the
 * counted field.
 * <p>
 * The logical optimizer used here has the {@link ExtractAggregateCommonFilter} rule removed, so the
 * filter is not extracted out of the {@code STATS} before the physical plan is built.
 */
@SuppressWarnings("unchecked")
public void testSingleCountWithStatsFilter() {
// an optimizer that filters out the ExtractAggregateCommonFilter rule
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config)) {
@Override
protected List<Batch<LogicalPlan>> batches() {
var oldBatches = super.batches();
List<Batch<LogicalPlan>> newBatches = new ArrayList<>(oldBatches.size());
// rebuild every batch from its original rules minus ExtractAggregateCommonFilter
for (var batch : oldBatches) {
// copy into an ArrayList because List.of(...) is immutable and removeIf would throw
List<Rule<?, LogicalPlan>> rules = new ArrayList<>(List.of(batch.rules()));
rules.removeIf(r -> r instanceof ExtractAggregateCommonFilter);
// Rule[]::new yields a raw array; this conversion is presumably what the method-level
// @SuppressWarnings("unchecked") is for
newBatches.add(batch.with(rules.toArray(Rule[]::new)));
}
return newBatches;
}
};
var analyzer = makeAnalyzer("mapping-default.json", new EnrichResolution());
var plannerOptimizer = new TestPlannerOptimizer(config, analyzer, logicalOptimizer);
// NOTE(review): the expected "source" value below ends in "@2:36", which looks like the line:column of the
// filter inside this query text - keep the text block layout unchanged
var plan = plannerOptimizer.plan("""
from test
| stats c = count(hire_date) where emp_no < 10042
""", IS_SV_STATS);

// expected plan shape: LimitExec -> AggregateExec (FINAL) -> ExchangeExec -> EsStatsQueryExec
var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat(agg.getMode(), is(FINAL));
var exchange = as(agg.child(), ExchangeExec.class);
var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class);

// strip all whitespace so the comparison does not depend on how the JSON is pretty-printed
Function<String, String> compact = s -> s.replaceAll("\\s+", "");
// the pushed query must hold both the exists(hire_date) clause and the emp_no range filter
assertThat(compact.apply(esStatsQuery.query().toString()), is(compact.apply("""
{
"bool": {
"must": [
{
"exists": {
"field": "hire_date",
"boost": 1.0
}
},
{
"esql_single_value": {
"field": "emp_no",
"next": {
"range": {
"emp_no": {
"lt": 10042,
"boost": 1.0
}
}
},
"source": "emp_no < 10042@2:36"
}
}
],
"boost": 1.0
}
}
""")));
}

/**
* Expecting
* LimitExec[1000[INTEGER]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
Expand All @@ -23,19 +22,22 @@ public class TestPlannerOptimizer {
private final Analyzer analyzer;
private final LogicalPlanOptimizer logicalOptimizer;
private final PhysicalPlanOptimizer physicalPlanOptimizer;
private final EsqlFunctionRegistry functionRegistry;
private final Mapper mapper;
private final Configuration config;

/**
 * Creates a test harness that uses a {@link LogicalPlanOptimizer} built from a new
 * {@link LogicalOptimizerContext} for {@code config}; delegates to the three-argument constructor.
 *
 * @param config the configuration handed to the logical optimizer context and to the delegate constructor
 * @param analyzer the analyzer handed to the delegate constructor
 */
public TestPlannerOptimizer(Configuration config, Analyzer analyzer) {
this(config, analyzer, new LogicalPlanOptimizer(new LogicalOptimizerContext(config)));
}

/**
 * Creates a test harness that uses the supplied logical optimizer as-is, which lets a test plug in
 * an optimizer with a customised set of rule batches.
 *
 * @param config the configuration stored on this instance and used to build the physical optimizer context
 * @param analyzer the analyzer stored on this instance
 * @param logicalOptimizer the logical optimizer stored on this instance
 */
public TestPlannerOptimizer(Configuration config, Analyzer analyzer, LogicalPlanOptimizer logicalOptimizer) {
    this.analyzer = analyzer;
    this.config = config;
    // The caller's optimizer is kept untouched: the constructor parameter must not be reassigned to a
    // default LogicalPlanOptimizer afterwards, as that would be a dead store hiding which optimizer is used.
    this.logicalOptimizer = logicalOptimizer;

    parser = new EsqlParser();
    physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
    // NOTE(review): functionRegistry is not read anywhere in the visible part of this class - confirm
    // whether the field (and this assignment) is still needed.
    functionRegistry = new EsqlFunctionRegistry();
    mapper = new Mapper();
}

public PhysicalPlan plan(String query) {
Expand Down

0 comments on commit 9dafb30

Please sign in to comment.