diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index f54c8dd6d..44f8a6401 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -267,20 +267,17 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { final UnresolvedAttribute t1Attr = new UnresolvedAttribute(seq(TABLE_LHS, APPENDCOL_ID)); final UnresolvedAttribute t2Attr = new UnresolvedAttribute(seq(TABLE_RHS, APPENDCOL_ID)); final List fieldsToRemove = new ArrayList<>(List.of(t1Attr, t2Attr)); - final Node mainSearchNode = node.getChild().get(0); -// final Node mainSearchNode = visitFirstChild(node, context); final Node subSearchNode = node.getSubSearch(); - // Traverse to look for relation clause, then append it into the sub-search. - Relation relation = AppendColCatalystUtils.retrieveRelationClause(mainSearchNode); - AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), relation); - // Apply an additional projection layer on main-search to provide natural order. -// LogicalPlan mainSearch = mainSearchNode.accept(this, context); - LogicalPlan mainSearch = mainSearchNode.accept(this, context); + LogicalPlan mainSearch = visitFirstChild(node, context); var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, mainSearch, TABLE_LHS); context.withSubqueryAlias(mainSearchWithRowNumber); + // Duplicate the relation clause from main-search to sub-search. + AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), context.getRelations()); + + context.apply(left -> { // Apply an additional projection layer on sub-search to provide natural order. LogicalPlan subSearch = subSearchNode.accept(this, context); @@ -297,6 +294,7 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { Optional.of(new EqualTo(t1Attr, t2Attr)), new Join.JoinHint()); + // Remove the APPEND_ID and duplicated field on T1 if override option present. if (node.override) { List getoverridedlist = AppendColCatalystUtils.getOverridedList(subSearchWithRowNumber, TABLE_LHS); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java index f39f03f4d..b2b101257 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java @@ -6,6 +6,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.Expression; @@ -21,6 +22,8 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.unsafe.types.UTF8String; import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.QualifiedName; +import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.tree.Relation; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ppl.CatalystPlanContext; @@ -44,8 +47,18 @@ public interface AppendColCatalystUtils { * @param subSearch User provided sub-search from APPENDCOL command. * @param relation Relation clause which represent the dataSource that this sub-search execute upon. */ - static void appendRelationClause(Node subSearch, Relation relation) { - final Relation table = new Relation(relation.getTableNames()); + static void appendRelationClause(Node subSearch, List relation) { + + final List unresolvedExpressionList = relation.stream() + .map(r -> { + UnresolvedRelation unresolvedRelation = (UnresolvedRelation) r; + List multipartId = seqAsJavaList(unresolvedRelation.multipartIdentifier()); + return (UnresolvedExpression) new QualifiedName(multipartId); + }) + // To avoid stack overflow in the case of chained AppendCol. + .distinct() + .collect(Collectors.toList()); + final Relation table = new Relation(unresolvedExpressionList); while (subSearch != null) { try { subSearch = subSearch.getChild().get(0); @@ -56,29 +69,6 @@ static void appendRelationClause(Node subSearch, Relation relation) { } } - /** - * Util method to traverse a given Node object and return the first occurrence of a Relation clause. - * @param node The Node object that this util method search upon. - * @return The first occurrence of Relation object from the given Node. - */ - static Relation retrieveRelationClause(Node node) { - while (node != null) { - if (node instanceof Relation) { - return (Relation) node; - } else { - try { - node = node.getChild().get(0); - } catch (NullPointerException ex) { - // Base on the current implementation of Flint, - // NPE will be thrown by certain type of Node implementation, - // when node.getChild() being called. - break; - } - } - } - return null; - } - /** * Util method to perform analyzed() call against the given LogicalPlan to extract all fields