Skip to content

Commit

Permalink
Update code style
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Kwok <[email protected]>
  • Loading branch information
andy-k-improving committed Dec 13, 2024
1 parent 13f4cb9 commit d34abf1
Showing 1 changed file with 118 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,14 @@ package org.opensearch.flint.spark.ppl
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.opensearch.sql.ppl.utils.SortUtils
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{
UnresolvedAttribute,
UnresolvedFunction,
UnresolvedRelation,
UnresolvedStar
}
import org.apache.spark.sql.catalyst.expressions.{
Alias,
And,
CurrentRow,
EqualTo,
Literal,
RowFrame,
RowNumber,
SpecifiedWindowFrame,
UnboundedPreceding,
WindowExpression,
WindowSpecDefinition
}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, CurrentRow, EqualTo, Literal, RowFrame, RowNumber, SpecifiedWindowFrame, UnboundedPreceding, WindowExpression, WindowSpecDefinition}
import org.apache.spark.sql.catalyst.plans.{LeftOuter, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._

import org.scalatest.matchers.should.Matchers

class PPLLogicalPlanAppendColCommandTranslatorTestSuite
extends SparkFunSuite
with PlanTest
Expand All @@ -53,20 +35,18 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
"_row_number_")()

private val COUNT_STAR = Alias(
UnresolvedFunction(Seq("COUNT"),
Seq(UnresolvedStar(None)),
isDistinct = false),
UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false),
"count()")()

private val AGE_ALIAS = Alias(UnresolvedAttribute("age"), "age")()

private val RELATION_EMPLOYEES = UnresolvedRelation(Seq("employees"))

private val T12_JOIN_CONDITION = EqualTo(
UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_"))
private val T12_JOIN_CONDITION =
EqualTo(UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_"))

private val T12_COLUMNS_SEQ = Seq(
UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_"))
private val T12_COLUMNS_SEQ =
Seq(UnresolvedAttribute("T1._row_number_"), UnresolvedAttribute("T2._row_number_"))

// @formatter:off
/**
Expand All @@ -87,17 +67,16 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
val context = new CatalystPlanContext
val logicalPlan = planTransformer.visit(
plan(pplParser, "source=employees | APPENDCOL [stats count() by age];"),
context
)
context)

/*
:- 'SubqueryAlias T1
: +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#7, *]
: +- 'UnresolvedRelation [relation], [], false
*/
val t1 = SubqueryAlias("T1", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
RELATION_EMPLOYEES))
val t1 = SubqueryAlias(
"T1",
Project(Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)), RELATION_EMPLOYEES))

/*
+- 'SubqueryAlias T2
Expand All @@ -106,16 +85,17 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
+- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10]
+- 'UnresolvedRelation [relation], [], false
*/
val t2 = SubqueryAlias("T2", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Aggregate(
AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS),
RELATION_EMPLOYEES)
))
val t2 = SubqueryAlias(
"T2",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES)))

val result = Project(Seq(UnresolvedStar(None)),
DataFrameDropColumns(T12_COLUMNS_SEQ,
Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))
val result = Project(
Seq(UnresolvedStar(None)),
DataFrameDropColumns(
T12_COLUMNS_SEQ,
Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))

comparePlans(logicalPlan, result, checkAnalysis = false)
}
Expand All @@ -138,9 +118,10 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
test("test AppendCol with transformation on main-search") {
val context = new CatalystPlanContext
val logicalPlan = planTransformer.visit(
plan(pplParser, "source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"),
context
)
plan(
pplParser,
"source=employees | FIELDS age, dept, salary | APPENDCOL [stats count() by age];"),
context)

/*
:- 'SubqueryAlias T1
Expand All @@ -149,13 +130,16 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
: +- 'Project ['age, 'dept, 'salary]
: +- 'UnresolvedRelation [relation], [], false
*/
val t1 = SubqueryAlias("T1", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(Seq(
UnresolvedAttribute("age"),
UnresolvedAttribute("dept"),
UnresolvedAttribute("salary")), RELATION_EMPLOYEES)))

val t1 = SubqueryAlias(
"T1",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(
Seq(
UnresolvedAttribute("age"),
UnresolvedAttribute("dept"),
UnresolvedAttribute("salary")),
RELATION_EMPLOYEES)))

/*
+- 'SubqueryAlias T2
Expand All @@ -164,15 +148,16 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
+- 'Aggregate ['age AS age#9], ['COUNT(*) AS count()#8, 'age AS age#10]
+- 'UnresolvedRelation [relation], [], false
*/
val t2 = SubqueryAlias("T2", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Aggregate(
AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS),
RELATION_EMPLOYEES)
))

val result = Project(Seq(UnresolvedStar(None)),
DataFrameDropColumns(T12_COLUMNS_SEQ,
val t2 = SubqueryAlias(
"T2",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES)))

val result = Project(
Seq(UnresolvedStar(None)),
DataFrameDropColumns(
T12_COLUMNS_SEQ,
Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))

comparePlans(logicalPlan, result, checkAnalysis = false)
Expand All @@ -198,9 +183,10 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
test("test AppendCol with chained sub-search") {
val context = new CatalystPlanContext
val logicalPlan = planTransformer.visit(
plan(pplParser, "source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"),
context
)
plan(
pplParser,
"source=employees | FIELDS age, dept, salary | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ];"),
context)

/*
:- 'SubqueryAlias T1
Expand All @@ -209,13 +195,16 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
: +- 'Project ['age, 'dept, 'salary]
: +- 'UnresolvedRelation [relation], [], false
*/
val t1 = SubqueryAlias("T1", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(Seq(
UnresolvedAttribute("age"),
UnresolvedAttribute("dept"),
UnresolvedAttribute("salary")), RELATION_EMPLOYEES)))

val t1 = SubqueryAlias(
"T1",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(
Seq(
UnresolvedAttribute("age"),
UnresolvedAttribute("dept"),
UnresolvedAttribute("salary")),
RELATION_EMPLOYEES)))

/*
+- 'SubqueryAlias T2
Expand All @@ -225,17 +214,20 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
+- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429]
+- 'UnresolvedRelation [employees], [], false
*/
val t2 = SubqueryAlias("T2", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(Seq(UnresolvedAttribute("m")),
Project(Seq(UnresolvedStar(None), Alias(Literal(1),"m")()),
Aggregate(
AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS),
RELATION_EMPLOYEES)))
))

val result = Project(Seq(UnresolvedStar(None)),
DataFrameDropColumns(T12_COLUMNS_SEQ,
val t2 = SubqueryAlias(
"T2",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(
Seq(UnresolvedAttribute("m")),
Project(
Seq(UnresolvedStar(None), Alias(Literal(1), "m")()),
Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES)))))

val result = Project(
Seq(UnresolvedStar(None)),
DataFrameDropColumns(
T12_COLUMNS_SEQ,
Join(t1, t2, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))

comparePlans(logicalPlan, result, checkAnalysis = false)
Expand Down Expand Up @@ -270,22 +262,24 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
test("test multiple AppendCol clauses") {
val context = new CatalystPlanContext
val logicalPlan = planTransformer.visit(
plan(pplParser, "source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"),
context
)
plan(
pplParser,
"source=employees | FIELDS name, age | APPENDCOL [ stats count() by age | eval m = 1 | FIELDS -m ] | APPENDCOL [FIELDS dept];"),
context)

/*
:- 'SubqueryAlias T1
: +- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#544, *]
: +- 'Project ['name, 'age]
: +- 'UnresolvedRelation [employees], [], false
*/
val mainSearch = SubqueryAlias("T1", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("age")), RELATION_EMPLOYEES)))

val mainSearch = SubqueryAlias(
"T1",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")),
RELATION_EMPLOYEES)))

/*
+- 'SubqueryAlias T2
Expand All @@ -295,37 +289,46 @@ class PPLLogicalPlanAppendColCommandTranslatorTestSuite
+- 'Aggregate ['age AS age#429], ['COUNT(*) AS count()#428, 'age AS age#429]
+- 'UnresolvedRelation [employees], [], false
*/
val firstAppenCol = SubqueryAlias("T2", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(Seq(UnresolvedAttribute("m")),
Project(Seq(UnresolvedStar(None), Alias(Literal(1),"m")()),
Aggregate(
AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS),
RELATION_EMPLOYEES)))
))


val joinWithFirstAppendCol = SubqueryAlias("T1", Project(Seq(
ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(T12_COLUMNS_SEQ,
Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))))

val firstAppenCol = SubqueryAlias(
"T2",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(
Seq(UnresolvedAttribute("m")),
Project(
Seq(UnresolvedStar(None), Alias(Literal(1), "m")()),
Aggregate(AGE_ALIAS :: Nil, Seq(COUNT_STAR, AGE_ALIAS), RELATION_EMPLOYEES)))))

val joinWithFirstAppendCol = SubqueryAlias(
"T1",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
DataFrameDropColumns(
T12_COLUMNS_SEQ,
Join(mainSearch, firstAppenCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE))))

/*
+- 'SubqueryAlias T2
+- 'Project [row_number() windowspecdefinition(1 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_#553, *]
+- 'Project ['dept]
+- 'UnresolvedRelation [employees], [], false
*/
val secondAppendCol = SubqueryAlias("T2", Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(Seq(
UnresolvedAttribute("dept")), RELATION_EMPLOYEES)))


val joinWithSecondAppendCol = Project(Seq(UnresolvedStar(None)),
DataFrameDropColumns(T12_COLUMNS_SEQ,
Join(joinWithFirstAppendCol, secondAppendCol, LeftOuter, Some(T12_JOIN_CONDITION), JoinHint.NONE)))
val secondAppendCol = SubqueryAlias(
"T2",
Project(
Seq(ROW_NUMBER_AGGREGATION, UnresolvedStar(None)),
Project(Seq(UnresolvedAttribute("dept")), RELATION_EMPLOYEES)))

val joinWithSecondAppendCol = Project(
Seq(UnresolvedStar(None)),
DataFrameDropColumns(
T12_COLUMNS_SEQ,
Join(
joinWithFirstAppendCol,
secondAppendCol,
LeftOuter,
Some(T12_JOIN_CONDITION),
JoinHint.NONE)))

comparePlans(logicalPlan, joinWithSecondAppendCol, checkAnalysis = false)
}
Expand Down

0 comments on commit d34abf1

Please sign in to comment.