Skip to content

Commit

Permalink
Make dependency checker stricter for binary plans
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-spies committed Dec 10, 2024
1 parent c5116f1 commit 0d0ecfb
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;

import java.util.HashSet;
import java.util.Set;
Expand All @@ -26,13 +28,26 @@ public class PlanConsistencyChecker<P extends QueryPlan<P>> {
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
*/
public void checkPlan(P p, Failures failures) {
AttributeSet refs = p.references();
AttributeSet input = p.inputSet();
AttributeSet missing = refs.subtract(input);
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
// just any child (and analogously for the right child).
if (missing.isEmpty() == false) {
failures.add(fail(p, "Plan [{}] optimized incorrectly due to missing references {}", p.nodeString(), missing));
if (p instanceof BinaryPlan binaryPlan) {
checkMissing(p, binaryPlan.leftReferences(), binaryPlan.left().outputSet(), "missing references from left hand side", failures);
checkMissing(
p,
binaryPlan.rightReferences(),
binaryPlan.right().outputSet(),
"missing references from right hand side",
failures
);
} else if (p instanceof BinaryExec binaryExec) {
checkMissing(p, binaryExec.leftReferences(), binaryExec.left().outputSet(), "missing references from left hand side", failures);
checkMissing(
p,
binaryExec.rightReferences(),
binaryExec.right().outputSet(),
"missing references from right hand side",
failures
);
} else {
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
}

Set<String> outputAttributeNames = new HashSet<>();
Expand All @@ -45,4 +60,11 @@ public void checkPlan(P p, Failures failures) {
}
}
}

private void checkMissing(P plan, AttributeSet references, AttributeSet input, String detailErrorMessage, Failures failures) {
AttributeSet missing = references.subtract(input);
if (missing.isEmpty() == false) {
failures.add(fail(plan, "Plan [{}] optimized incorrectly due to {} [{}]", plan.nodeString(), detailErrorMessage, missing));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.util.Arrays;
Expand All @@ -30,6 +31,10 @@ public LogicalPlan right() {
return right;
}

public abstract AttributeSet leftReferences();

public abstract AttributeSet rightReferences();

@Override
public final BinaryPlan replaceChildren(List<LogicalPlan> newChildren) {
return replaceChildren(newChildren.get(0), newChildren.get(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
Expand Down Expand Up @@ -97,6 +98,16 @@ public List<Attribute> output() {
return lazyOutput;
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(config().leftFields());
}

@Override
public AttributeSet rightReferences() {
return Expressions.references(config().rightFields());
}

public List<Attribute> rightOutputFields() {
AttributeSet leftInputs = left().outputSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ public List<Attribute> output() {

@Override
protected AttributeSet computeReferences() {
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings).subtract(new AttributeSet(ordinalAttributes()));
return mode.isInputPartial()
? new AttributeSet(intermediateAttributes)
: Aggregate.computeReferences(aggregates, groupings).subtract(new AttributeSet(ordinalAttributes()));
}

/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.plan.physical;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.io.IOException;
Expand Down Expand Up @@ -40,6 +41,10 @@ public PhysicalPlan right() {
return right;
}

public abstract AttributeSet leftReferences();

public abstract AttributeSet rightReferences();

@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ protected AttributeSet computeReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet rightReferences() {
return Expressions.references(rightFields);
}

@Override
public HashJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new HashJoinExec(source(), left, right, matchFields, leftFields, rightFields, output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,21 @@ protected AttributeSet computeReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet leftReferences() {
return Expressions.references(leftFields);
}

@Override
public AttributeSet rightReferences() {
// TODO: currently it's hard coded that we add all fields from the lookup index. But the output we "officially" get from the right
// hand side is inconsistent:
// - After logical optimization, there's a FragmentExec with an EsRelation on the right hand side with all the fields.
// - After local physical optimization, there's just an EsQueryExec here, with no fields other than _doc mentioned and we don't
// insert field extractions in the plan, either.
return AttributeSet.EMPTY;
}

@Override
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);
Expand Down

0 comments on commit 0d0ecfb

Please sign in to comment.