Skip to content

Commit

Permalink
ES|QL: fix LIMIT pushdown past MV_EXPAND
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila committed Oct 25, 2024
1 parent a281d62 commit 65096cb
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,64 @@ from employees | where emp_no == 10001 | keep * | mv_expand first_name;
avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean
268728049 | 1953-09-02T00:00:00.000Z | 10001 | Georgi | M | 2.03 | 2.0299999713897705 | 2.029296875 | 2.0300000000000002 | 1986-06-26T00:00:00.000Z | [false, true] | [Accountant, Senior Python Developer] | 2 | 2 | 2 | 2 | Facello | 57305 | 1.19 | 1 | 1.19 | 1 | true
;


// see https://github.com/elastic/elasticsearch/issues/102061
sortMvExpand
required_capability: fix_mv_expand_limit_pushdown
row a = 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMvExpand
required_capability: fix_mv_expand_limit_pushdown
row a = 1 | limit 1 | sort a | mv_expand a;

a:integer
1
;


// see https://github.com/elastic/elasticsearch/issues/102061
limitSortMultipleMvExpand
required_capability: fix_mv_expand_limit_pushdown
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
3 | 2 | 3
;


multipleLimitSortMultipleMvExpand
required_capability: fix_mv_expand_limit_pushdown
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 2 | mv_expand b | mv_expand c | limit 3;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


multipleLimitSortMultipleMvExpand2
required_capability: fix_mv_expand_limit_pushdown
row a = [1, 2, 3, 4, 5], b = 2, c = 3 | sort a | mv_expand a | limit 3 | mv_expand b | mv_expand c | limit 2;

a:integer | b:integer | c:integer
1 | 2 | 3
2 | 2 | 3
;


// see https://github.com/elastic/elasticsearch/issues/102084
whereMvExpand
required_capability: fix_mv_expand_limit_pushdown
row a = 1, b = -15 | where b > 3 | mv_expand b;

a:integer | b:integer
;
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,12 @@ public enum Cap {
/**
* Support simplified syntax for named parameters for field and function names.
*/
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot());
NAMED_PARAMETER_FOR_FIELD_AND_FUNCTION_NAMES_SIMPLIFIED_SYNTAX(Build.current().isSnapshot()),

/**
* Fix pushdown of LIMIT past MV_EXPAND
*/
FIX_MV_EXPAND_LIMIT_PUSHDOWN;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,8 @@ private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput)
resolved,
resolved.resolved()
? new ReferenceAttribute(resolved.source(), resolved.name(), resolved.dataType(), resolved.nullable(), null, false)
: resolved
: resolved,
p.limit()
);
}
return p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public LogicalPlan rule(Limit limit) {
} else if (limit.child() instanceof UnaryPlan unary) {
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof Enrich) {
return unary.replaceChild(limit.replaceChild(unary.child()));
} else if (unary instanceof MvExpand mvx) {
var limitSource = limit.limit();
var limitVal = (int) limitSource.fold();
Integer mvxLimit = mvx.limit();
if (mvxLimit == null || mvxLimit < 0 || mvxLimit > limitVal) {
mvx = new MvExpand(mvx.source(), mvx.child(), mvx.target(), mvx.expanded(), limitVal);
}
return mvx.replaceChild(limit.replaceChild(mvx.child()));
}
// check if there's a 'visible' descendant limit lower than the current one
// and if so, align the current limit since it adds no value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public PlanFactory visitDissectCommand(EsqlBaseParser.DissectCommandContext ctx)
public PlanFactory visitMvExpandCommand(EsqlBaseParser.MvExpandCommandContext ctx) {
UnresolvedAttribute field = visitQualifiedName(ctx.qualifiedName());
Source src = source(ctx);
return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()));
return child -> new MvExpand(src, child, field, new UnresolvedAttribute(src, field.name()), null);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@ public class MvExpand extends UnaryPlan {

private final NamedExpression target;
private final Attribute expanded;
private final Integer limit;

private List<Attribute> output;

public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded) {
public MvExpand(Source source, LogicalPlan child, NamedExpression target, Attribute expanded, Integer limit) {
super(source, child);
this.target = target;
this.expanded = expanded;
this.limit = limit;
}

private MvExpand(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(LogicalPlan.class),
in.readNamedWriteable(NamedExpression.class),
in.readNamedWriteable(Attribute.class)
in.readNamedWriteable(Attribute.class),
null // we only need this on the coordinator
);
}

Expand All @@ -51,6 +54,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(child());
out.writeNamedWriteable(target());
out.writeNamedWriteable(expanded());
assert limit == null;
}

@Override
Expand Down Expand Up @@ -78,6 +82,10 @@ public Attribute expanded() {
return expanded;
}

/**
 * Returns the limit attached to this MV_EXPAND node.
 *
 * @return the limit value, or {@code null} when none has been set (the parser and
 *         the stream-deserialization constructor both create the node with {@code null})
 */
public Integer limit() {
return limit;
}

@Override
protected AttributeSet computeReferences() {
return target.references();
Expand All @@ -94,7 +102,7 @@ public boolean expressionsResolved() {

@Override
public UnaryPlan replaceChild(LogicalPlan newChild) {
return new MvExpand(source(), newChild, target, expanded);
return new MvExpand(source(), newChild, target, expanded, limit);
}

@Override
Expand All @@ -107,19 +115,21 @@ public List<Attribute> output() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, MvExpand::new, child(), target, expanded);
return NodeInfo.create(this, MvExpand::new, child(), target, expanded, limit);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), target, expanded);
return Objects.hash(super.hashCode(), target, expanded, limit);
}

@Override
public boolean equals(Object obj) {
if (false == super.equals(obj)) {
return false;
}
return Objects.equals(target, ((MvExpand) obj).target) && Objects.equals(expanded, ((MvExpand) obj).expanded);
return Objects.equals(target, ((MvExpand) obj).target)
&& Objects.equals(expanded, ((MvExpand) obj).expanded)
&& Objects.equals(limit, ((MvExpand) obj).limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
Expand Down Expand Up @@ -186,7 +189,11 @@ private PhysicalPlan map(UnaryPlan p, PhysicalPlan child) {
}

if (p instanceof MvExpand mvExpand) {
return new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
MvExpandExec result = new MvExpandExec(mvExpand.source(), map(mvExpand.child()), mvExpand.target(), mvExpand.expanded());
if (mvExpand.limit() != null && mvExpand.limit() >= 0) {
return new LimitExec(result.source(), result, new Literal(Source.EMPTY, mvExpand.limit(), DataType.INTEGER));
}
return result;
}

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -193,11 +194,10 @@ public void testMissingFieldInSort() {

/**
* Expects
* EsqlProject[[first_name{f}#6]]
* \_Limit[1000[INTEGER]]
* \_MvExpand[last_name{f}#9,last_name{r}#15]
* \_Limit[1000[INTEGER]]
* \_EsRelation[test][_meta_field{f}#11, emp_no{f}#5, first_name{f}#6, ge..]
* EsqlProject[[first_name{f}#9, last_name{r}#18]]
* \_MvExpand[last_name{f}#12,last_name{r}#18,1000]
* \_Limit[1000[INTEGER]]
* \_EsRelation[test][_meta_field{f}#14, emp_no{f}#8, first_name{f}#9, ge..]
*/
public void testMissingFieldInMvExpand() {
var plan = plan("""
Expand All @@ -213,11 +213,8 @@ public void testMissingFieldInMvExpand() {
var projections = project.projections();
assertThat(Expressions.names(projections), contains("first_name", "last_name"));

var limit = as(project.child(), Limit.class);
// MvExpand cannot be optimized (yet) because the target NamedExpression cannot be replaced with a NULL literal
// https://github.com/elastic/elasticsearch/issues/109974
// See LocalLogicalPlanOptimizer.ReplaceMissingFieldWithNull
var mvExpand = as(limit.child(), MvExpand.class);
var mvExpand = as(project.child(), MvExpand.class);
assertThat(mvExpand.limit(), equalTo(1000));
var limit2 = as(mvExpand.child(), Limit.class);
as(limit2.child(), EsRelation.class);
}
Expand Down
Loading

0 comments on commit 65096cb

Please sign in to comment.