diff --git a/docs/ppl-lang/README.md b/docs/ppl-lang/README.md
index 19e1a6ee0..cdf9d8507 100644
--- a/docs/ppl-lang/README.md
+++ b/docs/ppl-lang/README.md
@@ -74,6 +74,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
     - [`expand commands`](ppl-expand-command.md)
+    - [`appendcol command`](ppl-appendcol-command.md)
+
 * **Functions**
     - [`Expressions`](functions/ppl-expressions.md)
diff --git a/docs/ppl-lang/ppl-appendcol-command.md b/docs/ppl-lang/ppl-appendcol-command.md
new file mode 100644
index 000000000..ca1d34551
--- /dev/null
+++ b/docs/ppl-lang/ppl-appendcol-command.md
@@ -0,0 +1,83 @@
+## PPL `appendcol` command
+
+### Description
+The `appendcol` command appends the result of a sub-search and attaches it alongside the input search results (the main search).
+
+### Syntax - APPENDCOL
+`APPENDCOL <override=?> [sub-search]...`
+
+* override=<boolean>: optional. Specifies whether a column from the main search should be overwritten in the case of a column name conflict. Default is false.
+* sub-search: Executes PPL commands as a secondary search. The sub-search uses the same data source specified in the `source` clause of the main search as its input.
+
+
+#### Example 1: Append the result of `stats avg(age) as AVG_AGE` to the existing search result
+
+The example appends the result of the sub-search `stats avg(age) as AVG_AGE` alongside the main search.
+
+PPL query:
+
+    os> source=employees | FIELDS name, dept, age | APPENDCOL [ stats avg(age) as AVG_AGE ];
+    fetched rows / total rows = 9/9
+    +------+-------------+-----+------------------+
+    | name | dept        | age | AVG_AGE          |
+    +------+-------------+-----+------------------+
+    | Lisa | Sales       | 35  | 31.22222222222222|
+    | Fred | Engineering | 28  | NULL             |
+    | Paul | Engineering | 23  | NULL             |
+    | Evan | Sales       | 38  | NULL             |
+    | Chloe| Engineering | 25  | NULL             |
+    | Tom  | Engineering | 33  | NULL             |
+    | Alex | Sales       | 33  | NULL             |
+    | Jane | Marketing   | 28  | NULL             |
+    | Jeff | Marketing   | 38  | NULL             |
+    +------+-------------+-----+------------------+
+
+
+#### Example 2: Append multiple sub-search results
+
+The example demonstrates that multiple APPENDCOL commands can be chained to provide one comprehensive view for the user.
+
+PPL query:
+
+    os> source=employees | FIELDS name, dept, age | APPENDCOL [ stats avg(age) as AVG_AGE ] | APPENDCOL [ stats max(age) as MAX_AGE ];
+    fetched rows / total rows = 9/9
+    +------+-------------+-----+------------------+---------+
+    | name | dept        | age | AVG_AGE          | MAX_AGE |
+    +------+-------------+-----+------------------+---------+
+    | Lisa | Sales       | 35  | 31.22222222222222| 38      |
+    | Fred | Engineering | 28  | NULL             | NULL    |
+    | Paul | Engineering | 23  | NULL             | NULL    |
+    | Evan | Sales       | 38  | NULL             | NULL    |
+    | Chloe| Engineering | 25  | NULL             | NULL    |
+    | Tom  | Engineering | 33  | NULL             | NULL    |
+    | Alex | Sales       | 33  | NULL             | NULL    |
+    | Jane | Marketing   | 28  | NULL             | NULL    |
+    | Jeff | Marketing   | 38  | NULL             | NULL    |
+    +------+-------------+-----+------------------+---------+
+
+#### Example 3: Override the main search in the case of a column name conflict
+
+The example demonstrates the usage of the `OVERRIDE` option to overwrite the `age` column from the main search
+when the option is set to true and a column with the same name `age` is present in the sub-search.
+
+PPL query:
+
+    os> source=employees | FIELDS name, dept, age | APPENDCOL OVERRIDE=true [ stats avg(age) as age ];
+    fetched rows / total rows = 9/9
+    +------+-------------+------------------+
+    | name | dept        | age              |
+    +------+-------------+------------------+
+    | Lisa | Sales       | 31.22222222222222|
+    | Fred | Engineering | NULL             |
+    | Paul | Engineering | NULL             |
+    | Evan | Sales       | NULL             |
+    | Chloe| Engineering | NULL             |
+    | Tom  | Engineering | NULL             |
+    | Alex | Sales       | NULL             |
+    | Jane | Marketing   | NULL             |
+    | Jeff | Marketing   | NULL             |
+    +------+-------------+------------------+
+
+### Limitation:
+When `OVERRIDE` is set to true, only `FIELDS` and `STATS` commands are allowed as the final clause in a sub-search.
+Otherwise, an IllegalStateException with the message `Not Supported operation: APPENDCOL should specify the output fields` will be thrown.
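For illustration, a sub-search whose final clause is `where` (rather than `FIELDS` or `STATS`) is rejected when `OVERRIDE=true`. The query below is adapted from the `test invalid override sub-search` case in the unit-test suite added by this change, and fails with the exception described above:

    source=employees | FIELDS name, age | APPENDCOL OVERRIDE=true [ where age > 10 ]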
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAppendColITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAppendColITSuite.scala
new file mode 100644
index 000000000..e1825c439
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAppendColITSuite.scala
@@ -0,0 +1,416 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.ppl
+
+import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
+import org.opensearch.sql.ppl.utils.SortUtils
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
+import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Ascending, CaseWhen, CurrentRow, Descending, Divide, EqualTo, Expression, LessThan, Literal, Multiply, RowFrame, RowNumber, SortOrder, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.streaming.StreamTest
+
+class FlintSparkPPLAppendColITSuite
+    extends QueryTest
+    with LogicalPlanTestUtils
+    with FlintPPLSuite
+    with StreamTest {
+
+  /** Test table and index name */
+  private val testTable = "spark_catalog.default.flint_ppl_test"
+
+  private val ROW_NUMBER_AGGREGATION = Alias(
+    WindowExpression(
+      RowNumber(),
+      WindowSpecDefinition(
+        Nil,
+        SortUtils.sortOrder(Literal("1"), false) :: Nil,
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))),
+    "_row_number_")()
+
+  private val COUNT_STAR = Alias(
+    UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
+    "count()")()
+
+  private val AGE_ALIAS = Alias(UnresolvedAttribute("age"), "age")()
+
+  private val RELATION_TEST_TABLE = UnresolvedRelation(
+    Seq("spark_catalog", "default", "flint_ppl_test"))
+
+  private val T12_JOIN_CONDITION =
+    EqualTo(
+      UnresolvedAttribute("APPENDCOL_T1._row_number_"),
+      UnresolvedAttribute("APPENDCOL_T2._row_number_"))
+
+  private val T12_COLUMNS_SEQ =
+    Seq(
+      UnresolvedAttribute("APPENDCOL_T1._row_number_"),
+      UnresolvedAttribute("APPENDCOL_T2._row_number_"))
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Create test table
+    createPartitionedStateCountryTable(testTable)
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    // Stop all streaming jobs if any
+    spark.streams.active.foreach { job
=> + job.stop() + job.awaitTermination() + } + } + + /** + * The baseline test-case to make sure APPENDCOL( ) function works, when no transformation + * present on the main search, after the search command. + */ + test("test AppendCol with NO transformation on main") { + val frame = sql(s""" + | source = $testTable | APPENDCOL [stats count() by age] + | """.stripMargin) + + assert( + frame.columns.sameElements( + Array("name", "age", "state", "country", "year", "month", "count()", "age"))) + // Retrieve the results + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row("Jake", 70, "California", "USA", 2023, 4, 1, 70), + Row("Hello", 30, "New York", "USA", 2023, 4, 1, 30), + Row("John", 25, "Ontario", "Canada", 2023, 4, 1, 25), + Row("Jane", 20, "Quebec", "Canada", 2023, 4, 1, 20)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_TEST_TABLE)) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_TEST_TABLE))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + /** + * To simulate the use-case when user attempt to attach an APPENDCOL command on a well + * established main search. 
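+   * Query under test: `source = $testTable | FIELDS name, age, state | APPENDCOL [stats count() by age]`.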
+ */ + test("test AppendCol with transformation on main-search") { + val frame = sql(s""" + | source = $testTable | FIELDS name, age, state | APPENDCOL [stats count() by age] + | """.stripMargin) + + assert(frame.columns.sameElements(Array("name", "age", "state", "count()", "age"))) + // Retrieve the results + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row("Jake", 70, "California", 1, 70), + Row("Hello", 30, "New York", 1, 30), + Row("John", 25, "Ontario", 1, 25), + Row("Jane", 20, "Quebec", 1, 20)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['name, 'age, 'state] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("name"), + UnresolvedAttribute("age"), + UnresolvedAttribute("state")), + RELATION_TEST_TABLE))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_TEST_TABLE))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + /** + * To simulate the situation when multiple PPL commands being applied on the sub-search. 
+ */ + test("test AppendCol with chained sub-search") { + val frame = sql(s""" + | source = $testTable | FIELDS name, age, state | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] + | """.stripMargin) + + assert(frame.columns.sameElements(Array("name", "age", "state", "count()", "age"))) + // Retrieve the results + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row("Jake", 70, "California", 1, 70), + Row("Hello", 30, "New York", 1, 30), + Row("John", 25, "Ontario", 1, 25), + Row("Jane", 20, "Quebec", 1, 20)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['age, 'dept, 'salary] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("name"), + UnresolvedAttribute("age"), + UnresolvedAttribute("state")), + RELATION_TEST_TABLE))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [flint_ppl_test], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_TEST_TABLE))))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + /** + * The use-case when user attempt to chain multiple APPENCOL command in a PPL, this is a common + * use case, when user prefer to show the statistic report alongside with the dataset. 
+ */ + test("test multiple AppendCol clauses") { + val frame = sql(s""" + | source = $testTable | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS state] + | """.stripMargin) + + assert(frame.columns.sameElements(Array("name", "age", "count()", "age", "state"))) + // Retrieve the results + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row("Jake", 70, 1, 70, "California"), + Row("Hello", 30, 1, 30, "New York"), + Row("John", 25, 1, 25, "Ontario"), + Row("Jane", 20, 1, 20, "Quebec")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] + : +- 'Project ['name, 'age] + : +- 'UnresolvedRelation [flint_ppl_test], [], false + */ + val mainSearch = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + RELATION_TEST_TABLE))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [flint_ppl_test], [], false + */ + val firstAppenCol = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_TEST_TABLE))))) + + val joinWithFirstAppendCol = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] + +- 'Project ['dept] + +- 'UnresolvedRelation [flint_ppl_test], [], false + */ + val secondAppendCol = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project(Seq(UnresolvedAttribute("state")), RELATION_TEST_TABLE))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join( + joinWithFirstAppendCol, + secondAppendCol, + LeftOuter, + Some(T12_JOIN_CONDITION), + JoinHint.NONE))) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + /** + * To simulate the use-case when column `age` present on both main and sub search, with option + * OVERRIDE=true. 
+ */ + test("test AppendCol with OVERRIDE option") { + val frame = sql(s""" + | source = $testTable | FIELDS name, age, state | APPENDCOL OVERRIDE=true [stats count() as age] + | """.stripMargin) + + assert(frame.columns.sameElements(Array("name", "state", "age"))) + // Retrieve the results + val results: Array[Row] = frame.collect() + + /* + The sub-search result `APPENDCOL OVERRIDE=true [stats count() as age]` will be attached alongside with first row of main-search, + however given the non-deterministic natural of nature order, we cannot guarantee which specific data row will be returned from the primary search query. + Hence, only assert sub-search position but skipping the table content comparison. + */ + assert(results(0).get(2) == 4) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['name, 'age, 'state] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("name"), + UnresolvedAttribute("age"), + UnresolvedAttribute("state")), + RELATION_TEST_TABLE))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#216, *] + +- 'Aggregate ['COUNT(*) AS age#240] + +- 'UnresolvedRelation [flint_ppl_test], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate( + Nil, + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false), + "age")()), + RELATION_TEST_TABLE))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ :+ UnresolvedAttribute("APPENDCOL_T1.age"), + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + +} diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index d15f5c8e3..277a0c2ac 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -42,6 +42,7 @@ FILLNULL: 'FILLNULL'; EXPAND: 'EXPAND'; FLATTEN: 'FLATTEN'; TRENDLINE: 'TRENDLINE'; +APPENDCOL: 'APPENDCOL'; //Native JOIN KEYWORDS JOIN: 'JOIN'; @@ -98,6 +99,9 @@ NULLS: 'NULLS'; SMA: 'SMA'; WMA: 'WMA'; +// APPENDCOL options +OVERRIDE: 'OVERRIDE'; + // ARGUMENT KEYWORDS KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index eb6cd1a35..04cc19828 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -56,6 +56,7 @@ commands | flattenCommand | expandCommand | trendlineCommand + | appendcolCommand ; commandName @@ -90,6 +91,7 @@ commandName | FIELDSUMMARY | FLATTEN | TRENDLINE + | APPENDCOL ; searchCommand @@ -275,6 +277,10 @@ trendlineType | WMA ; +appendcolCommand + : APPENDCOL (OVERRIDE EQUAL override = booleanLiteral)? 
LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS + ; + kmeansCommand : KMEANS (kmeansParameter)* ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index dadf6b968..529c569e0 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -121,6 +121,10 @@ public T visitTrendline(Trendline node, C context) { return visitChildren(node, context); } + public T visitAppendCol(AppendCol node, C context) { + return visitChildren(node, context); + } + public T visitCorrelation(Correlation node, C context) { return visitChildren(node, context); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/AppendCol.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/AppendCol.java new file mode 100644 index 000000000..421821739 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/AppendCol.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.Node; + +import java.util.List; + +/** + * A composite object which store the subQuery along with some more ad-hoc option like override + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@AllArgsConstructor +public class AppendCol extends UnresolvedPlan { + + private final boolean override; + + private final UnresolvedPlan subSearch; + + private UnresolvedPlan child; + + public AppendCol(UnresolvedPlan subSearch, boolean override) { + this.override = override; + this.subSearch = subSearch; + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitAppendCol(this, context); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index d7f59bae3..fba8cf4f2 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -6,11 +6,14 @@ package org.opensearch.sql.ppl; import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; import org.apache.spark.sql.catalyst.expressions.Descending$; +import org.apache.spark.sql.catalyst.expressions.EqualTo; import org.apache.spark.sql.catalyst.expressions.Explode; import 
org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.GeneratorOuter; @@ -18,6 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.SortDirection; import org.apache.spark.sql.catalyst.expressions.SortOrder; import org.apache.spark.sql.catalyst.plans.logical.Aggregate; +import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns; import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns$; import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; import org.apache.spark.sql.catalyst.plans.logical.Generate; @@ -47,6 +51,7 @@ import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.AppendCol; import org.opensearch.sql.ast.tree.Correlation; import org.opensearch.sql.ast.tree.CountedAggregation; import org.opensearch.sql.ast.tree.Dedupe; @@ -71,6 +76,7 @@ import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.antlr.SyntaxCheckException; +import org.opensearch.sql.ppl.utils.AppendColCatalystUtils; import org.opensearch.sql.ppl.utils.FieldSummaryTransformer; import org.opensearch.sql.ppl.utils.ParseTransformer; import org.opensearch.sql.ppl.utils.SortUtils; @@ -253,6 +259,61 @@ public LogicalPlan visitTrendline(Trendline node, CatalystPlanContext context) { return context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(seq(trendlineProjectExpressions), p)); } + @Override + public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { + + final String APPENDCOL_ID = WindowSpecTransformer.ROW_NUMBER_COLUMN_NAME; + final String TABLE_LHS = "APPENDCOL_T1"; + final String TABLE_RHS = "APPENDCOL_T2"; + final UnresolvedAttribute t1Attr = new UnresolvedAttribute(seq(TABLE_LHS, APPENDCOL_ID)); + final UnresolvedAttribute t2Attr = new UnresolvedAttribute(seq(TABLE_RHS, APPENDCOL_ID)); + final List fieldsToRemove = new ArrayList<>(List.of(t1Attr, t2Attr)); + final Node subSearchNode = node.getSubSearch(); + + // Apply an additional projection layer on main-search to provide natural order. + LogicalPlan mainSearch = visitFirstChild(node, context); + var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, mainSearch, TABLE_LHS); + context.withSubqueryAlias(mainSearchWithRowNumber); + + // Duplicate the relation clause from main-search to sub-search. + AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), context.getRelations()); + + context.apply(left -> { + + // Apply an additional projection layer on sub-search to provide natural order. + LogicalPlan subSearch = subSearchNode.accept(this, context); + var subSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, subSearch, TABLE_RHS); + context.withSubqueryAlias(subSearchWithRowNumber); + + context.retainAllNamedParseExpressions(p -> p); + context.retainAllPlans(p -> p); + + // Join both Main and Sub search with _ROW_NUMBER_ column + LogicalPlan joinedQuery = join( + mainSearchWithRowNumber, subSearchWithRowNumber, + Join.JoinType.LEFT, + Optional.of(new EqualTo(t1Attr, t2Attr)), + new Join.JoinHint()); + + // Remove the APPEND_ID and duplicated field on T1 if override option present. 
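+            // For example, with `APPENDCOL OVERRIDE=true [stats count() as age]` the sub-search emits an `age`
+            // column, so the main-search copy (APPENDCOL_T1.age) is added to fieldsToRemove and dropped,
+            // letting the sub-search value take its place.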
+ if (node.isOverride()) { + List attrToOverride = AppendColCatalystUtils.getOverridedList(subSearch, TABLE_LHS); + if (attrToOverride != null && + !attrToOverride.isEmpty() && + attrToOverride.stream().noneMatch(UnresolvedStar.class::isInstance)) { + fieldsToRemove.addAll(attrToOverride); + } else { + throw new IllegalStateException("Not Supported operation: " + + "APPENDCOL should specify the output fields"); + } + } + return new DataFrameDropColumns(seq(fieldsToRemove), joinedQuery); + }); + + return context.getPlan(); + } + + @Override public LogicalPlan visitCorrelation(Correlation node, CatalystPlanContext context) { visitFirstChild(node, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index b1254bf8f..4312b9a7a 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -417,6 +417,17 @@ public UnresolvedPlan visitTrendlineCommand(OpenSearchPPLParser.TrendlineCommand .orElse(new Trendline(Optional.empty(), trendlineComputations)); } + @Override + public UnresolvedPlan visitAppendcolCommand(OpenSearchPPLParser.AppendcolCommandContext ctx) { + final Optional pplCmd = ctx.commands().stream() + .map(this::visit) + .reduce((r, e) -> e.attach(r)); + final boolean override = (ctx.override != null && + Boolean.parseBoolean(ctx.override.getText())); + // ANTLR parser check guarantee pplCmd won't be null. + return new AppendCol(pplCmd.get(), override); + } + private Trendline.TrendlineComputation toTrendlineComputation(OpenSearchPPLParser.TrendlineClauseContext ctx) { int numberOfDataPoints = Integer.parseInt(ctx.numberOfDataPoints.getText()); if (numberOfDataPoints < 1) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java new file mode 100644 index 000000000..f47fb6470 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/AppendColCatalystUtils.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.sql.ppl.utils; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.Literal; +import org.apache.spark.sql.catalyst.expressions.NamedExpression; +import org.apache.spark.sql.catalyst.expressions.SortOrder; +import org.apache.spark.sql.catalyst.plans.logical.Aggregate; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.Project; +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias; +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$; +import org.apache.spark.sql.execution.CommandExecutionMode; +import org.apache.spark.sql.execution.QueryExecution; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.unsafe.types.UTF8String; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.QualifiedName; 
+import org.opensearch.sql.ast.expression.UnresolvedExpression;
+import org.opensearch.sql.ast.tree.Relation;
+import org.opensearch.sql.ast.tree.UnresolvedPlan;
+import org.opensearch.sql.ppl.CatalystPlanContext;
+import scala.Option;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
+import static scala.collection.JavaConverters.seqAsJavaList;
+
+/**
+ * Util class to facilitate the logical plan composition for the APPENDCOL command.
+ */
+public interface AppendColCatalystUtils {
+
+    /**
+     * Traverses the given subSearch node down to its last child, then appends the Relation clause
+     * in order to specify the data source (index) for the sub-search.
+     * @param subSearch User-provided sub-search from the APPENDCOL command.
+     * @param relation Relation clause which represents the data source that this sub-search executes upon.
+     */
+    static void appendRelationClause(Node subSearch, List relation) {
+
+        final List unresolvedExpressionList = relation.stream()
+                .map(r -> {
+                    UnresolvedRelation unresolvedRelation = (UnresolvedRelation) r;
+                    List multipartId = seqAsJavaList(unresolvedRelation.multipartIdentifier());
+                    return (UnresolvedExpression) new QualifiedName(multipartId);
+                })
+                // To avoid stack overflow in the case of chained AppendCol.
+                .distinct()
+                .collect(Collectors.toList());
+        final Relation table = new Relation(unresolvedExpressionList);
+        while (subSearch != null) {
+            try {
+                subSearch = subSearch.getChild().get(0);
+            } catch (NullPointerException ex) {
+                ((UnresolvedPlan) subSearch).attach(table);
+                break;
+            }
+        }
+    }
+
+
+    /**
+     * Extracts the output fields from the given LogicalPlan instance in a non-recursive manner,
+     * and returns null in the case of an unsupported LogicalPlan.
+     * @param lp LogicalPlan instance to extract the projection fields from.
+     * @param tableName The table (schema) name used to qualify the returned fields.
+     * @return A list of Expression instances qualified with the given table (schema) name.
+     */
+    static List getOverridedList(LogicalPlan lp, String tableName) {
+        // Extract the output from supported LogicalPlan types.
+        if (lp instanceof Project || lp instanceof Aggregate) {
+            return seqAsJavaList(lp.output()).stream()
+                    .map(attr -> new UnresolvedAttribute(seq(tableName, attr.name())))
+                    .collect(Collectors.toList());
+        }
+        return null;
+    }
+
+    /**
+     * Helper method that first adds an additional projection clause to provide row_number,
+     * then wraps the plan in a SubqueryAlias and returns it.
+     * @param context Context object of the current parser.
+     * @param lp The logical plan instance which contains the query.
+     * @param alias The name of the Alias clause.
+     * @return A SubqueryAlias instance which carries row_number for natural-ordering purposes.
+     */
+    static SubqueryAlias getRowNumStarProjection(CatalystPlanContext context, LogicalPlan lp, String alias) {
+        final SortOrder sortOrder = SortUtils.sortOrder(
+                new Literal(
+                        UTF8String.fromString("1"), DataTypes.StringType), false);
+
+        final NamedExpression appendCol = WindowSpecTransformer.buildRowNumber(seq(), seq(sortOrder));
+        final List projectList = (context.getNamedParseExpressions().isEmpty())
+                ?
List.of(appendCol, new UnresolvedStar(Option.empty())) + : List.of(appendCol); + + final LogicalPlan lpWithProjection = new Project(seq( + projectList), lp); + return SubqueryAlias$.MODULE$.apply(alias, lpWithProjection); + } +} diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala new file mode 100644 index 000000000..1df436adc --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala @@ -0,0 +1,410 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.opensearch.sql.ppl.utils.SortUtils +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, CurrentRow, EqualTo, Literal, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition} +import org.apache.spark.sql.catalyst.plans.{LeftOuter, PlanTest} +import org.apache.spark.sql.catalyst.plans.logical._ + +class PPLLogicalPlanAppendColCommandTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + private val ROW_NUMBER_AGGREGATION = Alias( + WindowExpression( + RowNumber(), + WindowSpecDefinition( + Nil, + SortUtils.sortOrder(Literal("1"), false) :: Nil, + SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), + "_row_number_")() + + private val COUNT_STAR = Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false), + "count()")() + + private val AGE_ALIAS = Alias(UnresolvedAttribute("age"), "age")() + + private val RELATION_EMPLOYEES = UnresolvedRelation(Seq("employees")) + + private val T12_JOIN_CONDITION = + EqualTo( + UnresolvedAttribute("APPENDCOL_T1._row_number_"), + UnresolvedAttribute("APPENDCOL_T2._row_number_")) + + private val T12_COLUMNS_SEQ = + Seq( + UnresolvedAttribute("APPENDCOL_T1._row_number_"), + UnresolvedAttribute("APPENDCOL_T2._row_number_")) + + // @formatter:off + /** + * Expected: + 'Project [*] + +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#1, *] + : +- 'UnresolvedRelation [employees], [], false + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#5, *] + +- 'Aggregate ['age AS age#3], ['COUNT(*) AS count()#2, 'age AS age#3] + +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test AppendCol with NO transformation on main") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan(pplParser, "source=employees | 
APPENDCOL [stats count() by age];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + // @formatter:off + /** + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + * : +- 'Project ['age, 'dept, 'salary] + * : +- 'UnresolvedRelation [relation], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#15, *] + * +- 'Aggregate ['age AS age#13], ['COUNT(*) AS count()#12, 'age AS age#13] + * +- 'UnresolvedRelation [relation], [], false + */ + // @formatter:on + test("test AppendCol with transformation on main-search") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['age, 'dept, 'salary] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("age"), + UnresolvedAttribute("dept"), + UnresolvedAttribute("salary")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + // @formatter:off + /** + * 'Project 
[*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#427, *] + * : +- 'Project ['age, 'dept, 'salary] + * : +- 'UnresolvedRelation [employees], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + * +- 'DataFrameDropColumns ['m] + * +- 'Project [*, 1 AS m#430] + * +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + * +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test AppendCol with chained sub-search") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['age, 'dept, 'salary] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("age"), + UnresolvedAttribute("dept"), + UnresolvedAttribute("salary")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [employees], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + // @formatter:off + /** + * == Parsed Logical Plan == + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#551, *] + * : +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * : +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * : :- 'SubqueryAlias T1 + * : : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] + * : : +- 'Project ['name, 'age] + * : : +- 'UnresolvedRelation [employees], [], false + * : +- 'SubqueryAlias T2 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS _row_number_#549, *] + * : +- 'DataFrameDropColumns ['m] + * : +- 'Project [*, 1 AS m#547] + * : +- 'Aggregate ['age AS age#546], ['COUNT(*) AS count()#545, 'age AS age#546] + * : +- 'UnresolvedRelation [employees], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] + * +- 'Project ['dept] + * +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test multiple AppendCol clauses") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] + : +- 'Project ['name, 'age] + : +- 'UnresolvedRelation [employees], [], false + */ + val mainSearch = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [employees], [], false + */ + val firstAppenCol = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) + + val joinWithFirstAppendCol = SubqueryAlias( + "APPENDCOL_T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] + +- 'Project ['dept] + +- 'UnresolvedRelation [employees], [], false + */ + val secondAppendCol = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join( + joinWithFirstAppendCol, + secondAppendCol, + LeftOuter, + Some(T12_JOIN_CONDITION), + JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test invalid override sub-search") { + val context = new CatalystPlanContext + val exception = intercept[IllegalStateException]( + planTransformer + .visit( + plan( + pplParser, + "source=relation | FIELDS name, age | APPENDCOL override=true [ where age > 10]"), + context)) + assert(exception.getMessage startsWith "Not Supported operation") + } + + + // @formatter:off + /** + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_, 'T1.age] + * +- 'Join LeftOuter, ('T1._row_number_ = 
'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#383, *] + * : +- 'UnresolvedRelation [employees], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#386, *] + * +- 'Aggregate ['COUNT(*) AS age#384] + * +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test override with Supported sub-search") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan(pplParser, "source=employees | APPENDCOL OVERRIDE=true [stats count() as age];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "APPENDCOL_T1", + Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['COUNT(*) AS age#8] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "APPENDCOL_T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate( + Nil, + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false), + "age")()), + RELATION_EMPLOYEES))) + + val expectedPlan = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ :+ UnresolvedAttribute("APPENDCOL_T1.age"), + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + +}