forked from opensearch-project/opensearch-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add PPL Between functionality (opensearch-project#758)
* Implement between Signed-off-by: Hendrik Saly <[email protected]> * add integration test for between command to the straight and NOT usage Signed-off-by: Jens Schmidt <[email protected]> * Add docs Signed-off-by: Hendrik Saly <[email protected]> * Add proposed syntax to ppl planning Signed-off-by: Hendrik Saly <[email protected]> * adjust gitignore Signed-off-by: Jens Schmidt <[email protected]> * adjust gitignore: add spark-bin Signed-off-by: Jens Schmidt <[email protected]> * clean .gitignore: remove local adjustments Signed-off-by: Jens Schmidt <[email protected]> * update integration test to use between keyword Signed-off-by: Jens Schmidt <[email protected]> * Move to comparisonExpression Signed-off-by: Hendrik Saly <[email protected]> * Added to keywordsCanBeId Signed-off-by: Hendrik Saly <[email protected]> * Update docs Signed-off-by: Hendrik Saly <[email protected]> * Add additional tests Signed-off-by: Hendrik Saly <[email protected]> * Move to comparisonExpression -2- Signed-off-by: Hendrik Saly <[email protected]> * Fix docs Signed-off-by: Hendrik Saly <[email protected]> * Added IT tests Signed-off-by: Hendrik Saly <[email protected]> --------- Signed-off-by: Hendrik Saly <[email protected]> Signed-off-by: Jens Schmidt <[email protected]> Co-authored-by: Hendrik Saly <[email protected]>
- Loading branch information
Showing
6 changed files
with
213 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
## between syntax proposal | ||
|
||
1. **Proposed syntax** | ||
- `... | where expr1 [NOT] BETWEEN expr2 AND expr3` | ||
- evaluate if expr1 is [not] in between expr2 and expr3 | ||
- `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] | ||
- `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10' | ||
|
||
### New syntax definition in ANTLR | ||
|
||
```ANTLR | ||
logicalExpression | ||
... | ||
| expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
...st/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.ppl | ||
|
||
import java.sql.Timestamp | ||
|
||
import org.apache.spark.sql.QueryTest | ||
import org.apache.spark.sql.streaming.StreamTest | ||
|
||
class FlintSparkPPLBetweenITSuite | ||
extends QueryTest | ||
with LogicalPlanTestUtils | ||
with FlintPPLSuite | ||
with StreamTest { | ||
|
||
/** Test table and index name */ | ||
private val testTable = "spark_catalog.default.flint_ppl_test" | ||
private val timeSeriesTestTable = "spark_catalog.default.flint_ppl_timeseries_test" | ||
|
||
override def beforeAll(): Unit = { | ||
super.beforeAll() | ||
|
||
// Create test tables | ||
createPartitionedStateCountryTable(testTable) | ||
createTimeSeriesTable(timeSeriesTestTable) | ||
} | ||
|
||
protected override def afterEach(): Unit = { | ||
super.afterEach() | ||
// Stop all streaming jobs if any | ||
spark.streams.active.foreach { job => | ||
job.stop() | ||
job.awaitTermination() | ||
} | ||
} | ||
|
||
test("test between should return records between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 3) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records between two integer computed values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age between 20 + 1 and 30 - 1 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age >= 21 && age <= 29, s"Age $age is not between 21 and 29") | ||
}) | ||
} | ||
|
||
test("test between should return records NOT between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where age NOT between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records where NOT between two integer values") { | ||
val frame = sql(s""" | ||
| source = $testTable | where NOT age between 20 and 30 | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 1) | ||
assert(frame.columns.length == 6) | ||
|
||
results.foreach(row => { | ||
val age = row.getAs[Int]("age") | ||
assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") | ||
}) | ||
} | ||
|
||
test("test between should return records between two date values") { | ||
val frame = sql(s""" | ||
| source = $timeSeriesTestTable | where time between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 2) | ||
assert(frame.columns.length == 4) | ||
|
||
results.foreach(row => { | ||
val ts = row.getAs[Timestamp]("time") | ||
assert( | ||
!ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || !ts.after( | ||
Timestamp.valueOf("2023-10-01 00:01:00")), | ||
s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") | ||
}) | ||
} | ||
|
||
test("test between should return records NOT between two date values") { | ||
val frame = sql(s""" | ||
| source = $timeSeriesTestTable | where time NOT between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' | ||
| """.stripMargin) | ||
|
||
val results = frame.collect() | ||
assert(results.length == 3) | ||
assert(frame.columns.length == 4) | ||
|
||
results.foreach(row => { | ||
val ts = row.getAs[Timestamp]("time") | ||
assert( | ||
ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || ts.after( | ||
Timestamp.valueOf("2023-10-01 00:01:00")), | ||
s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") | ||
}) | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
...a/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.spark.ppl | ||
|
||
import org.opensearch.flint.spark.ppl.PlaneUtils.plan | ||
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
import org.apache.spark.SparkFunSuite | ||
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} | ||
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal} | ||
import org.apache.spark.sql.catalyst.plans.PlanTest | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
|
||
class PPLLogicalPlanBetweenExpressionTranslatorTestSuite | ||
extends SparkFunSuite | ||
with PlanTest | ||
with LogicalPlanTestUtils | ||
with Matchers { | ||
|
||
private val planTransformer = new CatalystQueryPlanVisitor() | ||
private val pplParser = new PPLSyntaxParser() | ||
|
||
test("test between expression") { | ||
// if successful build ppl logical plan and translate to catalyst logical plan | ||
val context = new CatalystPlanContext | ||
val logPlan = { | ||
planTransformer.visit( | ||
plan( | ||
pplParser, | ||
"source = table | where datetime_field between '2024-09-10' and '2024-09-15'"), | ||
context) | ||
} | ||
// SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15' | ||
val star = Seq(UnresolvedStar(None)) | ||
|
||
val datetime_field = UnresolvedAttribute("datetime_field") | ||
val tableRelation = UnresolvedRelation(Seq("table")) | ||
|
||
val lowerBound = Literal("2024-09-10") | ||
val upperBound = Literal("2024-09-15") | ||
val betweenCondition = And( | ||
GreaterThanOrEqual(datetime_field, lowerBound), | ||
LessThanOrEqual(datetime_field, upperBound)) | ||
|
||
val filterPlan = Filter(betweenCondition, tableRelation) | ||
val expectedPlan = Project(star, filterPlan) | ||
|
||
comparePlans(expectedPlan, logPlan, false) | ||
} | ||
|
||
} |