From 25857f2d0059dab2ed841f2761a37aa98e24849e Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Fri, 13 Dec 2024 13:31:48 -0800 Subject: [PATCH] Override option Signed-off-by: Andy Kwok --- .../src/main/antlr4/OpenSearchPPLLexer.g4 | 3 + .../src/main/antlr4/OpenSearchPPLParser.g4 | 2 +- .../sql/ppl/CatalystQueryPlanVisitor.java | 28 +- ...nAppendColCommandTranslatorTestSuite.scala | 580 +++++++++--------- 4 files changed, 321 insertions(+), 292 deletions(-) diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index 01ea8768a..277a0c2ac 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -99,6 +99,9 @@ NULLS: 'NULLS'; SMA: 'SMA'; WMA: 'WMA'; +// APPENDCOL options +OVERRIDE: 'OVERRIDE'; + // ARGUMENT KEYWORDS KEEPEMPTY: 'KEEPEMPTY'; CONSECUTIVE: 'CONSECUTIVE'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 47d3c3f94..be769a9fb 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -278,7 +278,7 @@ trendlineType ; appendcolCommand - : APPENDCOL LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS + : APPENDCOL (OVERRIDE EQUAL booleanLiteral)? LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS ; kmeansCommand diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 42b7fe050..676a54264 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ppl; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; @@ -13,6 +14,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedStar; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; +import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.Descending$; import org.apache.spark.sql.catalyst.expressions.EqualTo; import org.apache.spark.sql.catalyst.expressions.Equality$; @@ -33,7 +35,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$; import org.apache.spark.sql.catalyst.plans.logical.Project$; import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$; +import org.apache.spark.sql.execution.CommandExecutionMode; import org.apache.spark.sql.execution.ExplainMode; +import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.execution.command.DescribeTableCommand; import org.apache.spark.sql.execution.command.ExplainCommand; import org.apache.spark.sql.types.DataTypes; @@ -100,6 +104,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.List.of; @@ -147,7 +152,11 @@ public LogicalPlan visitFirstChild(Node node, CatalystPlanContext context) { @Override public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { node.getStatement().accept(this, context); - return context.apply(p -> new ExplainCommand(p, ExplainMode.fromString(node.getExplainMode().name()))); + context.apply(p -> new ExplainCommand(p, ExplainMode.fromString(node.getExplainMode().name()))); + System.out.println(context.getPlan()); + Seq output = context.getPlan().output(); + System.out.println(output); + return context.getPlan(); } @Override @@ -296,6 +305,14 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { context.retainAllNamedParseExpressions(p -> p); context.retainAllPlans(p -> p); + SparkSession sparkSession = SparkSession.getActiveSession().get(); + + QueryExecution queryExecution = sparkSession.sessionState().executePlan(mainSearchWithRowNumber, CommandExecutionMode.ALL()); + QueryExecution queryExecutionSub = sparkSession.sessionState().executePlan(subSearchWithRowNumber, CommandExecutionMode.ALL()); + + Seq outputMain = queryExecution.analyzed().output(); + Seq outputSub = queryExecutionSub.analyzed().output(); + // Composite the join clause LogicalPlan joinedQuery = join( mainSearchWithRowNumber, subSearchWithRowNumber, @@ -306,6 +323,9 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { // Remove the APPEND_ID return new DataFrameDropColumns(fieldsToRemove, joinedQuery); }); + + System.out.println("Attributes: "); + System.out.println(context.getPlan().output()); return context.getPlan(); } @@ -335,6 +355,12 @@ private static Relation retrieveRelationClause(Node node) { // NPE will be thrown by some node.getChild() call. break; } + /* + if (node == null || node.getChild() == null || node.getChild().isEmpty()) { + break; + } + node = node.getChild().get(0); + */ } } return null; diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala index 37b992c00..daae82225 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala @@ -47,295 +47,295 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite private val T12_COLUMNS_SEQ = Seq(UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_")) - - // @formatter:off - /** - * Expected: - 'Project [*] - +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] - +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) - :- 'SubqueryAlias T1 - : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#1, *] - : +- 'UnresolvedRelation [employees], [], false - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#5, *] - +- 'Aggregate ['age AS age#3], ['COUNT(*) AS count()#2, 'age AS age#3] - +- 'UnresolvedRelation [employees], [], false - */ - // @formatter:on - test("test AppendCol with NO transformation on main") { - val context = new CatalystPlanContext - val logicalPlan = planTransformer.visit( - plan(pplParser, "source=employees | APPENDCOL [stats count() by age];"), - context) - - /* - :- 'SubqueryAlias T1 - : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] - : +- 'UnresolvedRelation [relation], [], false - */ - val t1 = SubqueryAlias( - "T1", - Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) - - /* - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, - specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] - +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] - +- 'UnresolvedRelation [relation], [], false - */ - val t2 = SubqueryAlias( - "T2", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) - - val result = Project( - Seq(UnresolvedStar(None)), - DataFrameDropColumns( - T12_COLUMNS_SEQ, - Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) - - // scalastyle:off - println(logicalPlan) - println(result) - // scalastyle:on - - comparePlans(logicalPlan, result, checkAnalysis = false) - } - - // @formatter:off - /** - * 'Project [*] - * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] - * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) - * :- 'SubqueryAlias T1 - * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] - * : +- 'Project ['age, 'dept, 'salary] - * : +- 'UnresolvedRelation [relation], [], false - * +- 'SubqueryAlias T2 - * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#15, *] - * +- 'Aggregate ['age AS age#13], ['COUNT(*) AS count()#12, 'age AS age#13] - * +- 'UnresolvedRelation [relation], [], false - */ - // @formatter:on - test("test AppendCol with transformation on main-search") { - val context = new CatalystPlanContext - val logicalPlan = planTransformer.visit( - plan( - pplParser, - "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"), - context) - - /* - :- 'SubqueryAlias T1 - : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, - specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] - : +- 'Project ['age, 'dept, 'salary] - : +- 'UnresolvedRelation [relation], [], false - */ - val t1 = SubqueryAlias( - "T1", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Project( - Seq( - UnresolvedAttribute("age"), - UnresolvedAttribute("dept"), - UnresolvedAttribute("salary")), - RELATION_EMPLOYEES))) - - /* - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, - specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] - +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] - +- 'UnresolvedRelation [relation], [], false - */ - val t2 = SubqueryAlias( - "T2", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) - - val result = Project( - Seq(UnresolvedStar(None)), - DataFrameDropColumns( - T12_COLUMNS_SEQ, - Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) - - comparePlans(logicalPlan, result, checkAnalysis = false) - } - - // @formatter:off - /** - * 'Project [*] - * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] - * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) - * :- 'SubqueryAlias T1 - * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#427, *] - * : +- 'Project ['age, 'dept, 'salary] - * : +- 'UnresolvedRelation [employees], [], false - * +- 'SubqueryAlias T2 - * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] - * +- 'DataFrameDropColumns ['m] - * +- 'Project [*, 1 AS m#430] - * +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] - * +- 'UnresolvedRelation [employees], [], false - */ - // @formatter:on - test("test AppendCol with chained sub-search") { - val context = new CatalystPlanContext - val logicalPlan = planTransformer.visit( - plan( - pplParser, - "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"), - context) - - /* - :- 'SubqueryAlias T1 - : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, - specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] - : +- 'Project ['age, 'dept, 'salary] - : +- 'UnresolvedRelation [relation], [], false - */ - val t1 = SubqueryAlias( - "T1", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Project( - Seq( - UnresolvedAttribute("age"), - UnresolvedAttribute("dept"), - UnresolvedAttribute("salary")), - RELATION_EMPLOYEES))) - - /* - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] - +- 'DataFrameDropColumns ['m] - +- 'Project [*, 1 AS m#430] - +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] - +- 'UnresolvedRelation [employees], [], false - */ - val t2 = SubqueryAlias( - "T2", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - DataFrameDropColumns( - Seq(UnresolvedAttribute("m")), - Project( - Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), - Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) - - val result = Project( - Seq(UnresolvedStar(None)), - DataFrameDropColumns( - T12_COLUMNS_SEQ, - Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) - - comparePlans(logicalPlan, result, checkAnalysis = false) - } - - // @formatter:off - /** - * == Parsed Logical Plan == - * 'Project [*] - * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] - * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) - * :- 'SubqueryAlias T1 - * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#551, *] - * : +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] - * : +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) - * : :- 'SubqueryAlias T1 - * : : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] - * : : +- 'Project ['name, 'age] - * : : +- 'UnresolvedRelation [employees], [], false - * : +- 'SubqueryAlias T2 - * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#549, *] - * : +- 'DataFrameDropColumns ['m] - * : +- 'Project [*, 1 AS m#547] - * : +- 'Aggregate ['age AS age#546], ['COUNT(*) AS count()#545, 'age AS age#546] - * : +- 'UnresolvedRelation [employees], [], false - * +- 'SubqueryAlias T2 - * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] - * +- 'Project ['dept] - * +- 'UnresolvedRelation [employees], [], false - */ - // @formatter:on - test("test multiple AppendCol clauses") { - val context = new CatalystPlanContext - val logicalPlan = planTransformer.visit( - plan( - pplParser, - "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"), - context) - - /* - :- 'SubqueryAlias T1 - : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] - : +- 'Project ['name, 'age] - : +- 'UnresolvedRelation [employees], [], false - */ - val mainSearch = SubqueryAlias( - "T1", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Project( - Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - RELATION_EMPLOYEES))) - - /* - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] - +- 'DataFrameDropColumns ['m] - +- 'Project [*, 1 AS m#430] - +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] - +- 'UnresolvedRelation [employees], [], false - */ - val firstAppenCol = SubqueryAlias( - "T2", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - DataFrameDropColumns( - Seq(UnresolvedAttribute("m")), - Project( - Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), - Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) - - val joinWithFirstAppendCol = SubqueryAlias( - "T1", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - DataFrameDropColumns( - T12_COLUMNS_SEQ, - Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) - - /* - +- 'SubqueryAlias T2 - +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] - +- 'Project ['dept] - +- 'UnresolvedRelation [employees], [], false - */ - val secondAppendCol = SubqueryAlias( - "T2", - Project( - Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), - Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES))) - - val joinWithSecondAppendCol = Project( - Seq(UnresolvedStar(None)), - DataFrameDropColumns( - T12_COLUMNS_SEQ, - Join( - joinWithFirstAppendCol, - secondAppendCol, - LeftOuter, - Some(T12_JOIN_CONDITION), - JoinHint.NONE))) - - comparePlans(logicalPlan, joinWithSecondAppendCol, checkAnalysis = false) - } +// +// // @formatter:off +// /** +// * Expected: +// 'Project [*] +// +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] +// +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) +// :- 'SubqueryAlias T1 +// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#1, *] +// : +- 'UnresolvedRelation [employees], [], false +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#5, *] +// +- 'Aggregate ['age AS age#3], ['COUNT(*) AS count()#2, 'age AS age#3] +// +- 'UnresolvedRelation [employees], [], false +// */ +// // @formatter:on +// test("test AppendCol with NO transformation on main") { +// val context = new CatalystPlanContext +// val logicalPlan = planTransformer.visit( +// plan(pplParser, "source=employees | APPENDCOL [stats count() by age];"), +// context) +// +// /* +// :- 'SubqueryAlias T1 +// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] +// : +- 'UnresolvedRelation [relation], [], false +// */ +// val t1 = SubqueryAlias( +// "T1", +// Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) +// +// /* +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, +// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] +// +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] +// +- 'UnresolvedRelation [relation], [], false +// */ +// val t2 = SubqueryAlias( +// "T2", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) +// +// val result = Project( +// Seq(UnresolvedStar(None)), +// DataFrameDropColumns( +// T12_COLUMNS_SEQ, +// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) +// +// // scalastyle:off +// println(logicalPlan) +// println(result) +// // scalastyle:on +// +// comparePlans(logicalPlan, result, checkAnalysis = false) +// } +// +// // @formatter:off +// /** +// * 'Project [*] +// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] +// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) +// * :- 'SubqueryAlias T1 +// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] +// * : +- 'Project ['age, 'dept, 'salary] +// * : +- 'UnresolvedRelation [relation], [], false +// * +- 'SubqueryAlias T2 +// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#15, *] +// * +- 'Aggregate ['age AS age#13], ['COUNT(*) AS count()#12, 'age AS age#13] +// * +- 'UnresolvedRelation [relation], [], false +// */ +// // @formatter:on +// test("test AppendCol with transformation on main-search") { +// val context = new CatalystPlanContext +// val logicalPlan = planTransformer.visit( +// plan( +// pplParser, +// "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"), +// context) +// +// /* +// :- 'SubqueryAlias T1 +// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, +// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] +// : +- 'Project ['age, 'dept, 'salary] +// : +- 'UnresolvedRelation [relation], [], false +// */ +// val t1 = SubqueryAlias( +// "T1", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Project( +// Seq( +// UnresolvedAttribute("age"), +// UnresolvedAttribute("dept"), +// UnresolvedAttribute("salary")), +// RELATION_EMPLOYEES))) +// +// /* +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, +// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] +// +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] +// +- 'UnresolvedRelation [relation], [], false +// */ +// val t2 = SubqueryAlias( +// "T2", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) +// +// val result = Project( +// Seq(UnresolvedStar(None)), +// DataFrameDropColumns( +// T12_COLUMNS_SEQ, +// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) +// +// comparePlans(logicalPlan, result, checkAnalysis = false) +// } +// +// // @formatter:off +// /** +// * 'Project [*] +// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] +// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) +// * :- 'SubqueryAlias T1 +// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#427, *] +// * : +- 'Project ['age, 'dept, 'salary] +// * : +- 'UnresolvedRelation [employees], [], false +// * +- 'SubqueryAlias T2 +// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] +// * +- 'DataFrameDropColumns ['m] +// * +- 'Project [*, 1 AS m#430] +// * +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] +// * +- 'UnresolvedRelation [employees], [], false +// */ +// // @formatter:on +// test("test AppendCol with chained sub-search") { +// val context = new CatalystPlanContext +// val logicalPlan = planTransformer.visit( +// plan( +// pplParser, +// "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"), +// context) +// +// /* +// :- 'SubqueryAlias T1 +// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, +// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] +// : +- 'Project ['age, 'dept, 'salary] +// : +- 'UnresolvedRelation [relation], [], false +// */ +// val t1 = SubqueryAlias( +// "T1", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Project( +// Seq( +// UnresolvedAttribute("age"), +// UnresolvedAttribute("dept"), +// UnresolvedAttribute("salary")), +// RELATION_EMPLOYEES))) +// +// /* +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] +// +- 'DataFrameDropColumns ['m] +// +- 'Project [*, 1 AS m#430] +// +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] +// +- 'UnresolvedRelation [employees], [], false +// */ +// val t2 = SubqueryAlias( +// "T2", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// DataFrameDropColumns( +// Seq(UnresolvedAttribute("m")), +// Project( +// Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), +// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) +// +// val result = Project( +// Seq(UnresolvedStar(None)), +// DataFrameDropColumns( +// T12_COLUMNS_SEQ, +// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) +// +// comparePlans(logicalPlan, result, checkAnalysis = false) +// } +// +// // @formatter:off +// /** +// * == Parsed Logical Plan == +// * 'Project [*] +// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] +// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) +// * :- 'SubqueryAlias T1 +// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#551, *] +// * : +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] +// * : +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) +// * : :- 'SubqueryAlias T1 +// * : : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] +// * : : +- 'Project ['name, 'age] +// * : : +- 'UnresolvedRelation [employees], [], false +// * : +- 'SubqueryAlias T2 +// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#549, *] +// * : +- 'DataFrameDropColumns ['m] +// * : +- 'Project [*, 1 AS m#547] +// * : +- 'Aggregate ['age AS age#546], ['COUNT(*) AS count()#545, 'age AS age#546] +// * : +- 'UnresolvedRelation [employees], [], false +// * +- 'SubqueryAlias T2 +// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] +// * +- 'Project ['dept] +// * +- 'UnresolvedRelation [employees], [], false +// */ +// // @formatter:on +// test("test multiple AppendCol clauses") { +// val context = new CatalystPlanContext +// val logicalPlan = planTransformer.visit( +// plan( +// pplParser, +// "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"), +// context) +// +// /* +// :- 'SubqueryAlias T1 +// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] +// : +- 'Project ['name, 'age] +// : +- 'UnresolvedRelation [employees], [], false +// */ +// val mainSearch = SubqueryAlias( +// "T1", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Project( +// Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), +// RELATION_EMPLOYEES))) +// +// /* +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] +// +- 'DataFrameDropColumns ['m] +// +- 'Project [*, 1 AS m#430] +// +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] +// +- 'UnresolvedRelation [employees], [], false +// */ +// val firstAppenCol = SubqueryAlias( +// "T2", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// DataFrameDropColumns( +// Seq(UnresolvedAttribute("m")), +// Project( +// Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), +// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) +// +// val joinWithFirstAppendCol = SubqueryAlias( +// "T1", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// DataFrameDropColumns( +// T12_COLUMNS_SEQ, +// Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) +// +// /* +// +- 'SubqueryAlias T2 +// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] +// +- 'Project ['dept] +// +- 'UnresolvedRelation [employees], [], false +// */ +// val secondAppendCol = SubqueryAlias( +// "T2", +// Project( +// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), +// Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES))) +// +// val joinWithSecondAppendCol = Project( +// Seq(UnresolvedStar(None)), +// DataFrameDropColumns( +// T12_COLUMNS_SEQ, +// Join( +// joinWithFirstAppendCol, +// secondAppendCol, +// LeftOuter, +// Some(T12_JOIN_CONDITION), +// JoinHint.NONE))) +// +// comparePlans(logicalPlan, joinWithSecondAppendCol, checkAnalysis = false) +// } }