diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index b101e664c..47ed7da44 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -248,7 +248,6 @@ public enum BuiltinFunctionName { ISNULL(FunctionName.of("isnull")), ISPRESENT(FunctionName.of("ispresent")), - ROW_NUMBER(FunctionName.of("row_number")), RANK(FunctionName.of("rank")), DENSE_RANK(FunctionName.of("dense_rank")), @@ -333,7 +332,6 @@ public FunctionName getName() { .put("percentile", BuiltinFunctionName.PERCENTILE) .put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX) .put("approx_count_distinct", BuiltinFunctionName.APPROX_COUNT_DISTINCT) - .put("row_number", BuiltinFunctionName.ROW_NUMBER) .build(); public static Optional of(String str) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 912be3dee..028340ec6 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -282,31 +282,32 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { final Node mainSearchNode = node.getChild().get(0); final Node subSearchNode = node.getSubSearch(); - // Traverse to look for relation clause then append it into the sub-search. + // Traverse to look for relation clause, then append it into the sub-search. Relation relation = AppendColCatalystUtils.retrieveRelationClause(mainSearchNode); AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), relation); - // Add apply a dropColumns if override present, then add * with ROW_NUMBER - LogicalPlan leftTemp = mainSearchNode.accept(this, context); - var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, leftTemp, TABLE_LHS); + // Apply an additional projection layer on main-search to provide natural order. + LogicalPlan mainSearch = mainSearchNode.accept(this, context); + var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, mainSearch, TABLE_LHS); context.withSubqueryAlias(mainSearchWithRowNumber); context.apply(left -> { - // Add a new projection layer with * and ROW_NUMBER (Sub-search) + // Apply an additional projection layer on sub-search to provide natural order. LogicalPlan subSearch = subSearchNode.accept(this, context); var subSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, subSearch, TABLE_RHS); context.withSubqueryAlias(subSearchWithRowNumber); context.retainAllNamedParseExpressions(p -> p); context.retainAllPlans(p -> p); - // Composite the join clause + + // Join both Main and Sub search with _ROW_NUMBER_ column LogicalPlan joinedQuery = join( mainSearchWithRowNumber, subSearchWithRowNumber, Join.JoinType.LEFT, Optional.of(new EqualTo(t1Attr, t2Attr)), new Join.JoinHint()); - // Remove the APPEND_ID and duplicated field on T1 if override option is true. + // Remove the APPEND_ID and duplicated field on T1 if override option present. if (node.override) { List getoverridedlist = AppendColCatalystUtils.getoverridedlist(subSearchWithRowNumber, TABLE_LHS); fieldsToRemove.addAll(getoverridedlist); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java index 699c05409..8892052b8 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java @@ -5,9 +5,11 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.Literal; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.SortOrder; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.Project; import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias; import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$; import org.apache.spark.sql.execution.CommandExecutionMode; @@ -33,14 +35,13 @@ public interface AppendColCatalystUtils { /** - * Response to traverse given subSearch Node till the last child, then append the Relation clause, - * in order to specify the data source || index. + * Responsible to traverse given subSearch Node till the last child, then append the Relation clause, + * in order to specify the data source || index for the subSearch. * @param subSearch User provided sub-search from APPENDCOL command. * @param relation Relation clause which represent the dataSource that this sub-search execute upon. */ static void appendRelationClause(Node subSearch, Relation relation) { - Relation table = new Relation(relation.getTableNames()); - // Replace it with a function to look up the search command and extract the index name. + final Relation table = new Relation(relation.getTableNames()); while (subSearch != null) { try { subSearch = subSearch.getChild().get(0); @@ -76,7 +77,7 @@ static Relation retrieveRelationClause(Node node) { /** - * Util method to perform analyzed() call against the given LogicalPlan to exact all fields + * Util method to perform analyzed() call against the given LogicalPlan to extract all fields * that will be projected upon the execution in the form of Java List with user provided schema prefix. * @param lp LogicalPlan instance to extract the projection fields from. * @param tableName the table || schema name being appended as part of the returned fields. @@ -97,15 +98,15 @@ static List getoverridedlist(LogicalPlan lp, String tableName) { } /** - * Helper method to first add an additional project clause to provide row_number, then wrap it SubqueryAlias and return. + * Helper method to first add an additional projection clause to provide row_number, then wrap it SubqueryAlias and return. * @param context Context object of the current Parser. * @param lp The Logical Plan instance which contains the query. * @param alias The name of the Alias clause. - * @return A subqeuryAlias instance which has row_number for natural ordering purpose. + * @return A subqueryAlias instance which has row_number for natural ordering purpose. */ static SubqueryAlias getRowNumStarProjection(CatalystPlanContext context, LogicalPlan lp, String alias) { final SortOrder sortOrder = SortUtils.sortOrder( - new org.apache.spark.sql.catalyst.expressions.Literal( + new Literal( UTF8String.fromString("1"), DataTypes.StringType), false); final NamedExpression appendCol = WindowSpecTransformer.buildRowNumber(seq(), seq(sortOrder)); @@ -113,7 +114,7 @@ static SubqueryAlias getRowNumStarProjection(CatalystPlanContext context, Logica ? List.of(appendCol, new UnresolvedStar(Option.empty())) : List.of(appendCol); - final LogicalPlan lpWithProjection = new org.apache.spark.sql.catalyst.plans.logical.Project(seq( + final LogicalPlan lpWithProjection = new Project(seq( projectList), lp); return SubqueryAlias$.MODULE$.apply(alias, lpWithProjection); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/TrendlineCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/TrendlineCatalystUtils.java index 1315165e5..647f4542e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/TrendlineCatalystUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/TrendlineCatalystUtils.java @@ -153,7 +153,7 @@ private static NamedExpression getWMAComputationExpression(CatalystExpressionVis * @param expression The expression which will be evaluated. * @return An Alias instance with logical plan representation of `expression AS name`. */ - public static NamedExpression getAlias(String name, Expression expression) { + private static NamedExpression getAlias(String name, Expression expression) { return org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(expression, name, NamedExpression.newExprId(),