From 9c576c223f4eaa132594a0849e645b3f53d9b188 Mon Sep 17 00:00:00 2001 From: Lilienthal Date: Tue, 29 Oct 2024 17:47:51 +0100 Subject: [PATCH] Add PPL Between functionality (#758) * Implement between Signed-off-by: Hendrik Saly * add integration test for between command to the straight and NOT usage Signed-off-by: Jens Schmidt * Add docs Signed-off-by: Hendrik Saly * Add proposed syntax to ppl planning Signed-off-by: Hendrik Saly * adjust gitignore Signed-off-by: Jens Schmidt * adjust gitignore: add spark-bin Signed-off-by: Jens Schmidt * clean .gitignore: remove local adjustments Signed-off-by: Jens Schmidt * update integration test to use between keyword Signed-off-by: Jens Schmidt * Move to comparisonExpression Signed-off-by: Hendrik Saly * Added to keywordsCanBeId Signed-off-by: Hendrik Saly * Update docs Signed-off-by: Hendrik Saly * Add additional tests Signed-off-by: Hendrik Saly * Move to comparisonExpression -2- Signed-off-by: Hendrik Saly * Fix docs Signed-off-by: Hendrik Saly * Added IT tests Signed-off-by: Hendrik Saly --------- Signed-off-by: Hendrik Saly Signed-off-by: Jens Schmidt Co-authored-by: Hendrik Saly --- docs/ppl-lang/PPL-Example-Commands.md | 2 + docs/ppl-lang/planning/ppl-between.md | 17 +++ docs/ppl-lang/ppl-where-command.md | 2 + .../ppl/FlintSparkPPLBetweenITSuite.scala | 136 ++++++++++++++++++ .../src/main/antlr4/OpenSearchPPLLexer.g4 | 1 + ...BetweenExpressionTranslatorTestSuite.scala | 55 +++++++ 6 files changed, 213 insertions(+) create mode 100644 docs/ppl-lang/planning/ppl-between.md create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 9ca6cf258..82843cd80 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -56,6 +56,8 @@ _- **Limitation: new field added by eval command with a function cannot be dropp - `source = table | where isblank(a)` - `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'` - `source = table | where a not in (1, 2, 3) | fields a,b,c` +- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] +- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10' ```sql source = table | eval status_category = diff --git a/docs/ppl-lang/planning/ppl-between.md b/docs/ppl-lang/planning/ppl-between.md new file mode 100644 index 000000000..6c8e300e8 --- /dev/null +++ b/docs/ppl-lang/planning/ppl-between.md @@ -0,0 +1,17 @@ +## between syntax proposal + +1. **Proposed syntax** + - `... | where expr1 [NOT] BETWEEN expr2 AND expr3` + - evaluate if expr1 is [not] in between expr2 and expr3 + - `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] + - `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10' + +### New syntax definition in ANTLR + +```ANTLR + +logicalExpression + ... + | expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between + +``` \ No newline at end of file diff --git a/docs/ppl-lang/ppl-where-command.md b/docs/ppl-lang/ppl-where-command.md index 94ddc1f5c..89a7e61fa 100644 --- a/docs/ppl-lang/ppl-where-command.md +++ b/docs/ppl-lang/ppl-where-command.md @@ -41,6 +41,8 @@ PPL query: - `source = table | where isempty(a)` - `source = table | where isblank(a)` - `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'` +- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] +- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10' - `source = table | eval status_category = case(a >= 200 AND a < 300, 'Success', diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala new file mode 100644 index 000000000..ce0be1409 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import java.sql.Timestamp + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkPPLBetweenITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.flint_ppl_test" + private val timeSeriesTestTable = "spark_catalog.default.flint_ppl_timeseries_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test tables + createPartitionedStateCountryTable(testTable) + createTimeSeriesTable(timeSeriesTestTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test between should return records between two integer values") { + val frame = sql(s""" + | source = $testTable | where age between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 3) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30") + }) + } + + test("test between should return records between two integer computed values") { + val frame = sql(s""" + | source = $testTable | where age between 20 + 1 and 30 - 1 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age >= 21 && age <= 29, s"Age $age is not between 21 and 29") + }) + } + + test("test between should return records NOT between two integer values") { + val frame = sql(s""" + | source = $testTable | where age NOT between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") + }) + } + + test("test between should return records where NOT between two integer values") { + val frame = sql(s""" + | source = $testTable | where NOT age between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age < 20 || age > 30, s"Age $age is not between 20 and 30") + }) + } + + test("test between should return records between two date values") { + val frame = sql(s""" + | source = $timeSeriesTestTable | where time between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 2) + assert(frame.columns.length == 4) + + results.foreach(row => { + val ts = row.getAs[Timestamp]("time") + assert( + !ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || !ts.after( + Timestamp.valueOf("2023-10-01 00:01:00")), + s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") + }) + } + + test("test between should return records NOT between two date values") { + val frame = sql(s""" + | source = $timeSeriesTestTable | where time NOT between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 3) + assert(frame.columns.length == 4) + + results.foreach(row => { + val ts = row.getAs[Timestamp]("time") + assert( + ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || ts.after( + Timestamp.valueOf("2023-10-01 00:01:00")), + s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") + }) + + } +} diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index aaee885e3..440ae45b1 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -392,6 +392,7 @@ LIKE: 'LIKE'; ISNULL: 'ISNULL'; ISNOTNULL: 'ISNOTNULL'; ISPRESENT: 'ISPRESENT'; +BETWEEN: 'BETWEEN'; // FLOWCONTROL FUNCTIONS IFNULL: 'IFNULL'; diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala new file mode 100644 index 000000000..6defcb766 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ + +class PPLLogicalPlanBetweenExpressionTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test between expression") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = { + planTransformer.visit( + plan( + pplParser, + "source = table | where datetime_field between '2024-09-10' and '2024-09-15'"), + context) + } + // SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15' + val star = Seq(UnresolvedStar(None)) + + val datetime_field = UnresolvedAttribute("datetime_field") + val tableRelation = UnresolvedRelation(Seq("table")) + + val lowerBound = Literal("2024-09-10") + val upperBound = Literal("2024-09-15") + val betweenCondition = And( + GreaterThanOrEqual(datetime_field, lowerBound), + LessThanOrEqual(datetime_field, upperBound)) + + val filterPlan = Filter(betweenCondition, tableRelation) + val expectedPlan = Project(star, filterPlan) + + comparePlans(expectedPlan, logPlan, false) + } + +}