From a75fcf9f7498459fef752973e1d5ef4e87a181cd Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 25 Sep 2023 14:49:30 -0700 Subject: [PATCH] adding window function test updating the PPL to Spark README.md Signed-off-by: YANGDB --- .../FlintSparkPPLTimeWindowITSuite.scala | 67 +++++++++++++++++-- ppl-spark-integration/README.md | 15 +++-- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala index e7452436a..1c9d04be3 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala @@ -123,8 +123,6 @@ class FlintSparkPPLTimeWindowITSuite implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1)) assert(results.sorted.sameElements(expectedResults.sorted)) - // Retrieve the logical plan - // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan @@ -184,8 +182,6 @@ class FlintSparkPPLTimeWindowITSuite implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1)) assert(results.sorted.sameElements(expectedResults.sorted)) - // Retrieve the logical plan - // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan @@ -287,8 +283,6 @@ class FlintSparkPPLTimeWindowITSuite implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](2)) assert(results.sorted.sameElements(expectedResults.sorted)) - // Retrieve the logical plan - // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan @@ -321,6 +315,67 @@ class FlintSparkPPLTimeWindowITSuite // Compare the two plans 
assert(compareByString(sortedPlan) === compareByString(logicalPlan)) } + + test("create ppl query count sales by weeks window and productId with sorting test") { + val frame = sql(s""" + | source = $testTable| stats sum(productsAmount) by span(transactionDate, 1w) as age_date | sort age_date + | """.stripMargin) + + frame.show(false) + // Retrieve the results + val results: Array[Row] = frame + .collect() + .map(row => + Row( + row.get(0), + row.getAs[GenericRowWithSchema](1).get(0), + row.getAs[GenericRowWithSchema](1).get(1))) + + // Define the expected results + val expectedResults = Array( + Row(11, Timestamp.valueOf("2023-03-29 17:00:00"), Timestamp.valueOf("2023-04-05 17:00:00")), + Row(7, Timestamp.valueOf("2023-04-26 17:00:00"), Timestamp.valueOf("2023-05-03 17:00:00")), + Row(6, Timestamp.valueOf("2023-05-03 17:00:00"), Timestamp.valueOf("2023-05-10 17:00:00"))) + + // Compare the results + implicit val timestampOrdering: Ordering[Timestamp] = new Ordering[Timestamp] { + def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y) + } + + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val star = Seq(UnresolvedStar(None)) + val productsAmount = UnresolvedAttribute("productsAmount") + val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test")) + + val windowExpression = Alias( + TimeWindow( + UnresolvedAttribute("transactionDate"), + TimeWindow.parseExpression(Literal("1 week")), + TimeWindow.parseExpression(Literal("1 week")), + 0), + "age_date")() + + val aggregateExpressions = + Alias( + UnresolvedFunction(Seq("SUM"), Seq(productsAmount), isDistinct = false), + "sum(productsAmount)")() + val aggregatePlan = Aggregate( + Seq( windowExpression), + Seq(aggregateExpressions, windowExpression), + table) + val 
expectedPlan = Project(star, aggregatePlan) + val sortedPlan: LogicalPlan = Sort( + Seq(SortOrder(UnresolvedAttribute("age_date"), Ascending)), + global = true, + expectedPlan) + // Compare the two plans + assert(compareByString(sortedPlan) === compareByString(logicalPlan)) + } ignore("create ppl simple count age by span of interval of 10 years query order by age test ") { val frame = sql(s""" diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index cff142c7d..a824cfaaf 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -246,16 +246,23 @@ The next samples of PPL queries are currently supported: **Aggregations With Span** - `source = table | stats count(a) by span(a, 10) as a_span` +- `source = table | stats sum(age) by span(age, 5) as age_span | head 2` +- `source = table | stats avg(age) by span(age, 20) as age_span, country | sort - age_span | head 2` + +**Aggregations With TimeWindow Span (tumble windowing function)** +- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date` +- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId` #### Supported Commands: - `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst) - - `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst) + - `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst) - `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst) - - `head`
- [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst) + - `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) + - `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst) > For additional details review the next [Integration Test ](../integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala) +> For additional details review the next [Integration Time Window Test ](../integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala) ---