diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Lookup.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Lookup.java index db398d0ce..06b3370a9 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Lookup.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Lookup.java @@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableList; import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.Map; @@ -16,21 +17,21 @@ public class Lookup extends UnresolvedPlan { private UnresolvedPlan child; - private final String indexName; + private final String tableName; private final List matchFieldList; private final List options; - private final List copyFieldList; + private final List copyFieldList; - public Lookup(UnresolvedPlan child, String indexName, List matchFieldList, List options, List copyFieldList) { + public Lookup(UnresolvedPlan child, String tableName, List matchFieldList, List options, List copyFieldList) { this.child = child; - this.indexName = indexName; + this.tableName = tableName; this.matchFieldList = matchFieldList; this.options = options; this.copyFieldList = copyFieldList; } - public Lookup(String indexName, List matchFieldList, List options, List copyFieldList) { - this.indexName = indexName; + public Lookup(String tableName, List matchFieldList, List options, List copyFieldList) { + this.tableName = tableName; this.matchFieldList = matchFieldList; this.options = options; this.copyFieldList = copyFieldList; @@ -42,8 +43,8 @@ public Lookup attach(UnresolvedPlan child) { return this; } - public String getIndexName() { - return indexName; + public String getTableName() { + return tableName; } public List getMatchFieldList() { @@ -54,7 +55,7 @@ public List getOptions() { return options; } - public List getCopyFieldList() { + public List getCopyFieldList() { return copyFieldList; } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6d4f014d1..40609dd86 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -7,6 +7,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.EqualTo; import org.apache.spark.sql.catalyst.expressions.Expression; @@ -68,6 +69,7 @@ import scala.collection.Seq; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -280,17 +282,26 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) { //TODO: not sure how to implement appendonly Boolean appendonly = (Boolean) node.getOptions().get(0).getValue().getValue(); - LogicalPlan right = new UnresolvedRelation(seq(of(node.getIndexName())), CaseInsensitiveStringMap.empty(), false); + LogicalPlan lookupRelation = new UnresolvedRelation(seq(of(node.getTableName())), CaseInsensitiveStringMap.empty(), false); + org.apache.spark.sql.catalyst.plans.logical.Project lookupProject; + + List lookupRelationFields = buildLookupTableFieldList(node, context); + if (! lookupRelationFields.isEmpty()) { + lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(lookupRelationFields), lookupRelation); + } else { + lookupProject = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(of(new UnresolvedStar(Option.empty()))), lookupRelation); + } + //TODO: use node.getCopyFieldList() to prefilter the right logical plan //and return only the fields listed there. rename fields when requested - Expression joinCondition = visitFieldMap(node.getMatchFieldList(), source.getTableQualifiedName().toString(), node.getIndexName(), context); + Expression joinCondition = buildLookupTableJoinCondition(node.getMatchFieldList(), source.getTableQualifiedName().toString(), node.getTableName(), context); return context.apply(p -> new Join( p, //original query (left) - right, //lookup query (right) + lookupProject, //lookup query (right) JoinType.apply("left"), //https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html @@ -300,6 +311,18 @@ public LogicalPlan visitLookup(Lookup node, CatalystPlanContext context) { )); } + private List buildLookupTableFieldList(Lookup node, CatalystPlanContext context) { + if (node.getCopyFieldList().isEmpty()) { + return Collections.emptyList(); + } else { + //todo should we also append fields used to match records - node.getMatchFieldList()? + List copyFields = node.getCopyFieldList().stream() + .map(copyField -> (NamedExpression) expressionAnalyzer.visitAlias(copyField, context)) + .collect(Collectors.toList()); + return copyFields; + } + } + private org.opensearch.sql.ast.expression.Field prefixField(List prefixParts, UnresolvedExpression field) { org.opensearch.sql.ast.expression.Field in = (org.opensearch.sql.ast.expression.Field) field; org.opensearch.sql.ast.expression.QualifiedName inq = (org.opensearch.sql.ast.expression.QualifiedName) in.getField(); @@ -307,15 +330,19 @@ private org.opensearch.sql.ast.expression.Field prefixField(List prefixP return new org.opensearch.sql.ast.expression.Field(new org.opensearch.sql.ast.expression.QualifiedName(finalParts), in.getFieldArgs()); } - private Expression visitFieldMap(List fieldMap, String sourceTableName, String lookupTableName, CatalystPlanContext context) { + private Expression buildLookupTableJoinCondition(List fieldMap, String sourceTableName, String lookupTableName, CatalystPlanContext context) { int size = fieldMap.size(); List allEqlExpressions = new ArrayList<>(size); for (Map map : fieldMap) { - Expression origin = visitExpression(prefixField(of(sourceTableName.split("\\.")),map.getOrigin()), context); - Expression target = visitExpression(prefixField(of(lookupTableName.split("\\.")),map.getTarget()), context); + //todo do we need to run prefixField? match fields are anyway handled as qualifiedName? +// Expression origin = visitExpression(prefixField(of(sourceTableName.split("\\.")),map.getOrigin()), context); +// Expression target = visitExpression(prefixField(of(lookupTableName.split("\\.")),map.getTarget()), context); + Expression origin = visitExpression(map.getOrigin(), context); + Expression target = visitExpression(map.getTarget(), context); + //important context.retainAllNamedParseExpressions(e -> e); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index e4833b0d1..61be26287 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -44,7 +44,6 @@ import org.opensearch.sql.ppl.utils.ArgumentFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -216,9 +215,9 @@ public UnresolvedPlan visitLookupCommand(OpenSearchPPLParser.LookupCommandContex ctx.copyFieldWithOptAs().stream() .map( ct -> - new Map( - evaluateFieldExpressionContext(ct.orignalCopyField), - evaluateFieldExpressionContext(ct.asCopyField, ct.orignalCopyField))) + new Alias( + ct.asCopyField == null? ct.orignalCopyField.getText() : ct.asCopyField.getText(), + evaluateFieldExpressionContext(ct.orignalCopyField))) .collect(Collectors.toList())); }