diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala index d0851c4f0..ca405b1dd 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala @@ -5,14 +5,14 @@ package org.opensearch.flint.spark +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, Divide, Floor, Literal, Multiply, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} class FlintSparkPPLAggregationWithSpanITSuite - extends QueryTest + extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite with StreamTest { @@ -63,7 +63,7 @@ class FlintSparkPPLAggregationWithSpanITSuite job.awaitTermination() } } - + /** * | age_span | count_age | * |:---------|----------:| @@ -144,7 +144,8 @@ class FlintSparkPPLAggregationWithSpanITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple avg age by span of interval of 10 years with head (limit) query test ") { + test( + "create ppl simple avg age by span of interval of 10 years with head (limit) query test ") { val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span | head 2 | """.stripMargin) @@ -217,7 +218,8 @@ class FlintSparkPPLAggregationWithSpanITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl average age by span of interval of 10 years group by country head (limit) 2 query test ") { + test( + "create ppl average age by span of interval of 10 years group by country head (limit) 2 query test ") { val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span, country | head 3 | """.stripMargin) @@ -253,7 +255,8 @@ class FlintSparkPPLAggregationWithSpanITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl average age by span of interval of 10 years group by country head (limit) 2 query and sort by test ") { + test( + "create ppl average age by span of interval of 10 years group by country head (limit) 2 query and sort by test ") { val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span, country | sort - age_span | head 2 | """.stripMargin) @@ -288,4 +291,3 @@ class FlintSparkPPLAggregationWithSpanITSuite assert(compareByString(sortedPlan) === compareByString(logicalPlan)) } } - diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala index 181fc8ee1..ab9cc07d5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala @@ -5,14 +5,14 @@ package org.opensearch.flint.spark +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} class FlintSparkPPLAggregationsITSuite - extends QueryTest + extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite with StreamTest { @@ -25,8 +25,7 @@ class FlintSparkPPLAggregationsITSuite // Create test table // Update table creation - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | name STRING, @@ -46,8 +45,7 @@ class FlintSparkPPLAggregationsITSuite |""".stripMargin) // Update data insertion - sql( - s""" + sql(s""" | INSERT INTO $testTable | PARTITION (year=2023, month=4) | VALUES ('Jake', 70, 'California', 'USA'), @@ -67,8 +65,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age avg query test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) | """.stripMargin) @@ -98,8 +95,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age avg query with filter test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| where age < 50 | stats avg(age) | """.stripMargin) @@ -131,8 +127,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age avg group by country query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) by country | """.stripMargin) @@ -167,8 +162,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age avg group by country head (limit) query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) by country | head 1 | """.stripMargin) @@ -198,8 +192,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age max group by country query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats max(age) by country | """.stripMargin) @@ -234,8 +227,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age min group by country query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats min(age) by country | """.stripMargin) @@ -270,8 +262,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age sum group by country query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats sum(age) by country | """.stripMargin) @@ -306,8 +297,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age sum group by country order by age query test with sort ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats sum(age) by country | sort country | """.stripMargin) @@ -342,8 +332,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age count group by country query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats count(age) by country | """.stripMargin) @@ -382,8 +371,7 @@ class FlintSparkPPLAggregationsITSuite } test("create ppl simple age avg group by country with state filter query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| where state != 'Quebec' | stats avg(age) by country | """.stripMargin) @@ -420,4 +408,3 @@ class FlintSparkPPLAggregationsITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } } - diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala index 37aef3e71..4d03d4f16 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala @@ -5,14 +5,14 @@ package org.opensearch.flint.spark +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} class FlintSparkPPLFiltersITSuite - extends QueryTest + extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite with StreamTest { @@ -25,8 +25,7 @@ class FlintSparkPPLFiltersITSuite // Create test table // Update table creation - sql( - s""" + sql(s""" | CREATE TABLE $testTable | ( | name STRING, @@ -46,8 +45,7 @@ class FlintSparkPPLFiltersITSuite |""".stripMargin) // Update data insertion - sql( - s""" + sql(s""" | INSERT INTO $testTable | PARTITION (year=2023, month=4) | VALUES ('Jake', 70, 'California', 'USA'), @@ -67,8 +65,7 @@ class FlintSparkPPLFiltersITSuite } test("create ppl simple age literal equal filter query with two fields result test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable age=25 | fields name, age | """.stripMargin) @@ -93,9 +90,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal greater than filter AND country not equal filter query with two fields result test") { - val frame = sql( - s""" + test( + "create ppl simple age literal greater than filter AND country not equal filter query with two fields result test") { + val frame = sql(s""" | source = $testTable age>10 and country != 'USA' | fields name, age | """.stripMargin) @@ -113,7 +110,8 @@ class FlintSparkPPLFiltersITSuite // Define the expected logical plan val table = UnresolvedRelation(Seq("default", "flint_ppl_test")) val filterExpr = And( - GreaterThan(UnresolvedAttribute("age"), Literal(10)), Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) + GreaterThan(UnresolvedAttribute("age"), Literal(10)), + Not(EqualTo(UnresolvedAttribute("country"), Literal("USA")))) val filterPlan = Filter(filterExpr, table) val projectList = Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")) val expectedPlan = Project(projectList, filterPlan) @@ -121,9 +119,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal greater than filter AND country not equal filter query with two fields sorted result test") { - val frame = sql( - s""" + test( + "create ppl simple age literal greater than filter AND country not equal filter query with two fields sorted result test") { + val frame = sql(s""" | source = $testTable age>10 and country != 'USA' | sort - age | fields name, age | """.stripMargin) @@ -151,9 +149,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(sortedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal equal than filter OR country not equal filter query with two fields result test") { - val frame = sql( - s""" + test( + "create ppl simple age literal equal than filter OR country not equal filter query with two fields result test") { + val frame = sql(s""" | source = $testTable age<=20 OR country = 'USA' | fields name, age | """.stripMargin) @@ -180,9 +178,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal equal than filter OR country not equal filter query with two fields result and head (limit) test") { - val frame = sql( - s""" + test( + "create ppl simple age literal equal than filter OR country not equal filter query with two fields result and head (limit) test") { + val frame = sql(s""" | source = $testTable age<=20 OR country = 'USA' | fields name, age | head 1 | """.stripMargin) @@ -206,8 +204,7 @@ class FlintSparkPPLFiltersITSuite } test("create ppl simple age literal greater than filter query with two fields result test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable age>25 | fields name, age | """.stripMargin) @@ -232,9 +229,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal smaller than equals filter query with two fields result test") { - val frame = sql( - s""" + test( + "create ppl simple age literal smaller than equals filter query with two fields result test") { + val frame = sql(s""" | source = $testTable age<=65 | fields name, age | """.stripMargin) @@ -259,9 +256,9 @@ class FlintSparkPPLFiltersITSuite assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - test("create ppl simple age literal smaller than equals filter query with two fields result with sort test") { - val frame = sql( - s""" + test( + "create ppl simple age literal smaller than equals filter query with two fields result with sort test") { + val frame = sql(s""" | source = $testTable age<=65 | sort name | fields name, age | """.stripMargin) @@ -287,8 +284,7 @@ class FlintSparkPPLFiltersITSuite } test("create ppl simple name literal equal filter query with two fields result test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable name='Jake' | fields name, age | """.stripMargin) @@ -314,8 +310,7 @@ class FlintSparkPPLFiltersITSuite } test("create ppl simple name literal not equal filter query with two fields result test") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable name!='Jake' | fields name, age | """.stripMargin) @@ -342,8 +337,7 @@ class FlintSparkPPLFiltersITSuite } test("create ppl simple avg age by span of interval of 10 years query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span | """.stripMargin) @@ -377,8 +371,7 @@ class FlintSparkPPLFiltersITSuite test( "create ppl simple avg age by span of interval of 10 years with head (limit) query test ") { - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span | head 2 | """.stripMargin) @@ -419,8 +412,7 @@ class FlintSparkPPLFiltersITSuite dataFrame.collect(); dataFrame.show() - val frame = sql( - s""" + val frame = sql(s""" | source = $testTable| stats avg(age) by span(age, 10) as age_span, country | """.stripMargin) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala index f9c9752a7..4079e512e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala @@ -5,11 +5,11 @@ package org.opensearch.flint.spark +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} class FlintSparkPPLITSuite extends QueryTest @@ -200,7 +200,7 @@ class FlintSparkPPLITSuite // Compare the two plans assert(expectedPlan === logicalPlan) } - + test("create ppl simple query two with fields and head (limit) with sorting test") { val frame = sql(s""" | source = $testTable| fields name, age | head 1 | sort age @@ -222,5 +222,5 @@ class FlintSparkPPLITSuite // Compare the two plans assert(sortedPlan === logicalPlan) } - + } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala index c4dfb02d1..d1c420091 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala @@ -363,10 +363,8 @@ class FlintSparkPPLTimeWindowITSuite Alias( UnresolvedFunction(Seq("SUM"), Seq(productsAmount), isDistinct = false), "sum(productsAmount)")() - val aggregatePlan = Aggregate( - Seq( windowExpression), - Seq(aggregateExpressions, windowExpression), - table) + val aggregatePlan = + Aggregate(Seq(windowExpression), Seq(aggregateExpressions, windowExpression), table) val expectedPlan = Project(star, aggregatePlan) val sortedPlan: LogicalPlan = Sort( Seq(SortOrder(UnresolvedAttribute("age_date"), Ascending)),