Skip to content

Commit

Permalink
adding window function test
Browse files Browse the repository at this point in the history
updating the PPL to Spark README.md

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Sep 25, 2023
1 parent 5e8a914 commit a75fcf9
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class FlintSparkPPLTimeWindowITSuite
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
Expand Down Expand Up @@ -184,8 +182,6 @@ class FlintSparkPPLTimeWindowITSuite
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
Expand Down Expand Up @@ -287,8 +283,6 @@ class FlintSparkPPLTimeWindowITSuite
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](2))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
Expand Down Expand Up @@ -321,6 +315,67 @@ class FlintSparkPPLTimeWindowITSuite
// Compare the two plans
assert(compareByString(sortedPlan) === compareByString(logicalPlan))
}

test("create ppl query count sales by weeks window and productId with sorting test") {
val frame = sql(s"""
| source = $testTable| stats sum(productsAmount) by span(transactionDate, 1w) as age_date | sort age_date
| """.stripMargin)

frame.show(false)
// Retrieve the results
val results: Array[Row] = frame
.collect()
.map(row =>
Row(
row.get(0),
row.getAs[GenericRowWithSchema](1).get(0),
row.getAs[GenericRowWithSchema](1).get(1)))

// Define the expected results
val expectedResults = Array(
Row(11, Timestamp.valueOf("2023-03-29 17:00:00"), Timestamp.valueOf("2023-04-05 17:00:00")),
Row(7, Timestamp.valueOf("2023-04-26 17:00:00"), Timestamp.valueOf("2023-05-03 17:00:00")),
Row(6, Timestamp.valueOf("2023-05-03 17:00:00"), Timestamp.valueOf("2023-05-10 17:00:00")))

// Compare the results
implicit val timestampOrdering: Ordering[Timestamp] = new Ordering[Timestamp] {
def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
}

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Timestamp](_.getAs[Timestamp](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val productsAmount = UnresolvedAttribute("productsAmount")
val table = UnresolvedRelation(Seq("default", "flint_ppl_sales_test"))

val windowExpression = Alias(
TimeWindow(
UnresolvedAttribute("transactionDate"),
TimeWindow.parseExpression(Literal("1 week")),
TimeWindow.parseExpression(Literal("1 week")),
0),
"age_date")()

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(productsAmount), isDistinct = false),
"sum(productsAmount)")()
val aggregatePlan = Aggregate(
Seq( windowExpression),
Seq(aggregateExpressions, windowExpression),
table)
val expectedPlan = Project(star, aggregatePlan)
val sortedPlan: LogicalPlan = Sort(
Seq(SortOrder(UnresolvedAttribute("age_date"), Ascending)),
global = true,
expectedPlan)
// Compare the two plans
assert(compareByString(sortedPlan) === compareByString(logicalPlan))
}

ignore("create ppl simple count age by span of interval of 10 years query order by age test ") {
val frame = sql(s"""
Expand Down
15 changes: 11 additions & 4 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,23 @@ The next samples of PPL queries are currently supported:

**Aggregations With Span**
- `source = table | stats count(a) by span(a, 10) as a_span`
- `source = table | stats sum(age) by span(age, 5) as age_span | head 2`
- `source = table | stats avg(age) by span(age, 20) as age_span, country | sort - age_span | head 2`

**Aggregations With TimeWindow Span (tumble windowing function) **
- `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

#### Supported Commands:
- `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
- `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst)
- `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst)
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)
- `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst)
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)

> For additional details review the next [Integration Test ](../integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala)
> For additional details review the next [Integration Time Window Test ](../integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala)
---

Expand Down

0 comments on commit a75fcf9

Please sign in to comment.