diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 676a54264..c579234fc 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -305,13 +305,15 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { context.retainAllNamedParseExpressions(p -> p); context.retainAllPlans(p -> p); - SparkSession sparkSession = SparkSession.getActiveSession().get(); + if (node.override) { + SparkSession sparkSession = SparkSession.getActiveSession().get(); - QueryExecution queryExecution = sparkSession.sessionState().executePlan(mainSearchWithRowNumber, CommandExecutionMode.ALL()); - QueryExecution queryExecutionSub = sparkSession.sessionState().executePlan(subSearchWithRowNumber, CommandExecutionMode.ALL()); + QueryExecution queryExecution = sparkSession.sessionState().executePlan(mainSearchWithRowNumber, CommandExecutionMode.ALL()); + QueryExecution queryExecutionSub = sparkSession.sessionState().executePlan(subSearchWithRowNumber, CommandExecutionMode.ALL()); - Seq outputMain = queryExecution.analyzed().output(); - Seq outputSub = queryExecutionSub.analyzed().output(); + Seq outputMain = queryExecution.analyzed().output(); + Seq outputSub = queryExecutionSub.analyzed().output(); + } // Composite the join clause LogicalPlan joinedQuery = join( diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala index daae82225..37b992c00 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAppendColCommandTranslatorTestSuite.scala @@ -47,295 +47,295 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite private val T12_COLUMNS_SEQ = Seq(UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_")) -// -// // @formatter:off -// /** -// * Expected: -// 'Project [*] -// +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] -// +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) -// :- 'SubqueryAlias T1 -// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#1, *] -// : +- 'UnresolvedRelation [employees], [], false -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#5, *] -// +- 'Aggregate ['age AS age#3], ['COUNT(*) AS count()#2, 'age AS age#3] -// +- 'UnresolvedRelation [employees], [], false -// */ -// // @formatter:on -// test("test AppendCol with NO transformation on main") { -// val context = new CatalystPlanContext -// val logicalPlan = planTransformer.visit( -// plan(pplParser, "source=employees | APPENDCOL [stats count() by age];"), -// context) -// -// /* -// :- 'SubqueryAlias T1 -// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] -// : +- 'UnresolvedRelation [relation], [], false -// */ -// val t1 = SubqueryAlias( -// "T1", -// Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) -// -// /* -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, -// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] -// +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] -// +- 'UnresolvedRelation [relation], [], false -// */ -// val t2 = SubqueryAlias( -// "T2", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) -// -// val result = Project( -// Seq(UnresolvedStar(None)), -// DataFrameDropColumns( -// T12_COLUMNS_SEQ, -// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) -// -// // scalastyle:off -// println(logicalPlan) -// println(result) -// // scalastyle:on -// -// comparePlans(logicalPlan, result, checkAnalysis = false) -// } -// -// // @formatter:off -// /** -// * 'Project [*] -// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] -// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) -// * :- 'SubqueryAlias T1 -// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] -// * : +- 'Project ['age, 'dept, 'salary] -// * : +- 'UnresolvedRelation [relation], [], false -// * +- 'SubqueryAlias T2 -// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#15, *] -// * +- 'Aggregate ['age AS age#13], ['COUNT(*) AS count()#12, 'age AS age#13] -// * +- 'UnresolvedRelation [relation], [], false -// */ -// // @formatter:on -// test("test AppendCol with transformation on main-search") { -// val context = new CatalystPlanContext -// val logicalPlan = planTransformer.visit( -// plan( -// pplParser, -// "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"), -// context) -// -// /* -// :- 'SubqueryAlias T1 -// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, -// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] -// : +- 'Project ['age, 'dept, 'salary] -// : +- 'UnresolvedRelation [relation], [], false -// */ -// val t1 = SubqueryAlias( -// "T1", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Project( -// Seq( -// UnresolvedAttribute("age"), -// UnresolvedAttribute("dept"), -// UnresolvedAttribute("salary")), -// RELATION_EMPLOYEES))) -// -// /* -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, -// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] -// +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] -// +- 'UnresolvedRelation [relation], [], false -// */ -// val t2 = SubqueryAlias( -// "T2", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) -// -// val result = Project( -// Seq(UnresolvedStar(None)), -// DataFrameDropColumns( -// T12_COLUMNS_SEQ, -// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) -// -// comparePlans(logicalPlan, result, checkAnalysis = false) -// } -// -// // @formatter:off -// /** -// * 'Project [*] -// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] -// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) -// * :- 'SubqueryAlias T1 -// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#427, *] -// * : +- 'Project ['age, 'dept, 'salary] -// * : +- 'UnresolvedRelation [employees], [], false -// * +- 'SubqueryAlias T2 -// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] -// * +- 'DataFrameDropColumns ['m] -// * +- 'Project [*, 1 AS m#430] -// * +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] -// * +- 'UnresolvedRelation [employees], [], false -// */ -// // @formatter:on -// test("test AppendCol with chained sub-search") { -// val context = new CatalystPlanContext -// val logicalPlan = planTransformer.visit( -// plan( -// pplParser, -// "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"), -// context) -// -// /* -// :- 'SubqueryAlias T1 -// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, -// specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] -// : +- 'Project ['age, 'dept, 'salary] -// : +- 'UnresolvedRelation [relation], [], false -// */ -// val t1 = SubqueryAlias( -// "T1", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Project( -// Seq( -// UnresolvedAttribute("age"), -// UnresolvedAttribute("dept"), -// UnresolvedAttribute("salary")), -// RELATION_EMPLOYEES))) -// -// /* -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] -// +- 'DataFrameDropColumns ['m] -// +- 'Project [*, 1 AS m#430] -// +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] -// +- 'UnresolvedRelation [employees], [], false -// */ -// val t2 = SubqueryAlias( -// "T2", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// DataFrameDropColumns( -// Seq(UnresolvedAttribute("m")), -// Project( -// Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), -// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) -// -// val result = Project( -// Seq(UnresolvedStar(None)), -// DataFrameDropColumns( -// T12_COLUMNS_SEQ, -// Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) -// -// comparePlans(logicalPlan, result, checkAnalysis = false) -// } -// -// // @formatter:off -// /** -// * == Parsed Logical Plan == -// * 'Project [*] -// * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] -// * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) -// * :- 'SubqueryAlias T1 -// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#551, *] -// * : +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] -// * : +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) -// * : :- 'SubqueryAlias T1 -// * : : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] -// * : : +- 'Project ['name, 'age] -// * : : +- 'UnresolvedRelation [employees], [], false -// * : +- 'SubqueryAlias T2 -// * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#549, *] -// * : +- 'DataFrameDropColumns ['m] -// * : +- 'Project [*, 1 AS m#547] -// * : +- 'Aggregate ['age AS age#546], ['COUNT(*) AS count()#545, 'age AS age#546] -// * : +- 'UnresolvedRelation [employees], [], false -// * +- 'SubqueryAlias T2 -// * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] -// * +- 'Project ['dept] -// * +- 'UnresolvedRelation [employees], [], false -// */ -// // @formatter:on -// test("test multiple AppendCol clauses") { -// val context = new CatalystPlanContext -// val logicalPlan = planTransformer.visit( -// plan( -// pplParser, -// "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"), -// context) -// -// /* -// :- 'SubqueryAlias T1 -// : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] -// : +- 'Project ['name, 'age] -// : +- 'UnresolvedRelation [employees], [], false -// */ -// val mainSearch = SubqueryAlias( -// "T1", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Project( -// Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), -// RELATION_EMPLOYEES))) -// -// /* -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] -// +- 'DataFrameDropColumns ['m] -// +- 'Project [*, 1 AS m#430] -// +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] -// +- 'UnresolvedRelation [employees], [], false -// */ -// val firstAppenCol = SubqueryAlias( -// "T2", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// DataFrameDropColumns( -// Seq(UnresolvedAttribute("m")), -// Project( -// Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), -// Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) -// -// val joinWithFirstAppendCol = SubqueryAlias( -// "T1", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// DataFrameDropColumns( -// T12_COLUMNS_SEQ, -// Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) -// -// /* -// +- 'SubqueryAlias T2 -// +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] -// +- 'Project ['dept] -// +- 'UnresolvedRelation [employees], [], false -// */ -// val secondAppendCol = SubqueryAlias( -// "T2", -// Project( -// Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), -// Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES))) -// -// val joinWithSecondAppendCol = Project( -// Seq(UnresolvedStar(None)), -// DataFrameDropColumns( -// T12_COLUMNS_SEQ, -// Join( -// joinWithFirstAppendCol, -// secondAppendCol, -// LeftOuter, -// Some(T12_JOIN_CONDITION), -// JoinHint.NONE))) -// -// comparePlans(logicalPlan, joinWithSecondAppendCol, checkAnalysis = false) -// } + + // @formatter:off + /** + * Expected: + 'Project [*] + +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#1, *] + : +- 'UnresolvedRelation [employees], [], false + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#5, *] + +- 'Aggregate ['age AS age#3], ['COUNT(*) AS count()#2, 'age AS age#3] + +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test AppendCol with NO transformation on main") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan(pplParser, "source=employees | APPENDCOL [stats count() by age];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "T1", + Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES)) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) + + val result = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + // scalastyle:off + println(logicalPlan) + println(result) + // scalastyle:on + + comparePlans(logicalPlan, result, checkAnalysis = false) + } + + // @formatter:off + /** + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + * : +- 'Project ['age, 'dept, 'salary] + * : +- 'UnresolvedRelation [relation], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#15, *] + * +- 'Aggregate ['age AS age#13], ['COUNT(*) AS count()#12, 'age AS age#13] + * +- 'UnresolvedRelation [relation], [], false + */ + // @formatter:on + test("test AppendCol with transformation on main-search") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['age, 'dept, 'salary] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("age"), + UnresolvedAttribute("dept"), + UnresolvedAttribute("salary")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + +- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10] + +- 'UnresolvedRelation [relation], [], false + */ + val t2 = SubqueryAlias( + "T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))) + + val result = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, result, checkAnalysis = false) + } + + // @formatter:off + /** + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#427, *] + * : +- 'Project ['age, 'dept, 'salary] + * : +- 'UnresolvedRelation [employees], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + * +- 'DataFrameDropColumns ['m] + * +- 'Project [*, 1 AS m#430] + * +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + * +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test AppendCol with chained sub-search") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, + specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#11, *] + : +- 'Project ['age, 'dept, 'salary] + : +- 'UnresolvedRelation [relation], [], false + */ + val t1 = SubqueryAlias( + "T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq( + UnresolvedAttribute("age"), + UnresolvedAttribute("dept"), + UnresolvedAttribute("salary")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [employees], [], false + */ + val t2 = SubqueryAlias( + "T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) + + val result = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))) + + comparePlans(logicalPlan, result, checkAnalysis = false) + } + + // @formatter:off + /** + * == Parsed Logical Plan == + * 'Project [*] + * +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * :- 'SubqueryAlias T1 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#551, *] + * : +- 'DataFrameDropColumns ['T1._row_number_, 'T2._row_number_] + * : +- 'Join LeftOuter, ('T1._row_number_ = 'T2._row_number_) + * : :- 'SubqueryAlias T1 + * : : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] + * : : +- 'Project ['name, 'age] + * : : +- 'UnresolvedRelation [employees], [], false + * : +- 'SubqueryAlias T2 + * : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#549, *] + * : +- 'DataFrameDropColumns ['m] + * : +- 'Project [*, 1 AS m#547] + * : +- 'Aggregate ['age AS age#546], ['COUNT(*) AS count()#545, 'age AS age#546] + * : +- 'UnresolvedRelation [employees], [], false + * +- 'SubqueryAlias T2 + * +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] + * +- 'Project ['dept] + * +- 'UnresolvedRelation [employees], [], false + */ + // @formatter:on + test("test multiple AppendCol clauses") { + val context = new CatalystPlanContext + val logicalPlan = planTransformer.visit( + plan( + pplParser, + "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"), + context) + + /* + :- 'SubqueryAlias T1 + : +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *] + : +- 'Project ['name, 'age] + : +- 'UnresolvedRelation [employees], [], false + */ + val mainSearch = SubqueryAlias( + "T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + RELATION_EMPLOYEES))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#432, *] + +- 'DataFrameDropColumns ['m] + +- 'Project [*, 1 AS m#430] + +- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429] + +- 'UnresolvedRelation [employees], [], false + */ + val firstAppenCol = SubqueryAlias( + "T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + Seq(UnresolvedAttribute("m")), + Project( + Seq(UnresolvedStar(None), Alias(Literal(1), "m")()), + Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES))))) + + val joinWithFirstAppendCol = SubqueryAlias( + "T1", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))) + + /* + +- 'SubqueryAlias T2 + +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *] + +- 'Project ['dept] + +- 'UnresolvedRelation [employees], [], false + */ + val secondAppendCol = SubqueryAlias( + "T2", + Project( + Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), + Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES))) + + val joinWithSecondAppendCol = Project( + Seq(UnresolvedStar(None)), + DataFrameDropColumns( + T12_COLUMNS_SEQ, + Join( + joinWithFirstAppendCol, + secondAppendCol, + LeftOuter, + Some(T12_JOIN_CONDITION), + JoinHint.NONE))) + + comparePlans(logicalPlan, joinWithSecondAppendCol, checkAnalysis = false) + } }