diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 9ca6cf258..82843cd80 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -56,6 +56,8 @@ _- **Limitation: new field added by eval command with a function cannot be dropp - `source = table | where isblank(a)` - `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'` - `source = table | where a not in (1, 2, 3) | fields a,b,c` +- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] +- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. outside the range ['2024-09-10', '2025-09-10'] ```sql source = table | eval status_category = diff --git a/docs/ppl-lang/planning/ppl-between.md b/docs/ppl-lang/planning/ppl-between.md new file mode 100644 index 000000000..6c8e300e8 --- /dev/null +++ b/docs/ppl-lang/planning/ppl-between.md @@ -0,0 +1,17 @@ +## between syntax proposal + +1. **Proposed syntax** + - `... | where expr1 [NOT] BETWEEN expr2 AND expr3` + - evaluate if expr1 is [not] in between expr2 and expr3 + - `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] + - `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. outside the range ['2024-09-10', '2025-09-10'] + +### New syntax definition in ANTLR + +```ANTLR + +logicalExpression + ... + | expr1 = functionArg NOT? 
BETWEEN expr2 = functionArg AND expr3 = functionArg # between + +``` \ No newline at end of file diff --git a/docs/ppl-lang/ppl-where-command.md b/docs/ppl-lang/ppl-where-command.md index 94ddc1f5c..89a7e61fa 100644 --- a/docs/ppl-lang/ppl-where-command.md +++ b/docs/ppl-lang/ppl-where-command.md @@ -41,6 +41,8 @@ PPL query: - `source = table | where isempty(a)` - `source = table | where isblank(a)` - `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'` +- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4] +- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. outside the range ['2024-09-10', '2025-09-10'] - `source = table | eval status_category = case(a >= 200 AND a < 300, 'Success', diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala new file mode 100644 index 000000000..ce0be1409 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBetweenITSuite.scala @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import java.sql.Timestamp + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkPPLBetweenITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.flint_ppl_test" + private val timeSeriesTestTable = "spark_catalog.default.flint_ppl_timeseries_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test tables + createPartitionedStateCountryTable(testTable) + createTimeSeriesTable(timeSeriesTestTable) + } + + protected override def afterEach(): Unit = { + 
super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test between should return records between two integer values") { + val frame = sql(s""" + | source = $testTable | where age between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 3) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30") + }) + } + + test("test between should return records between two integer computed values") { + val frame = sql(s""" + | source = $testTable | where age between 20 + 1 and 30 - 1 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age >= 21 && age <= 29, s"Age $age is not between 21 and 29") + }) + } + + test("test between should return records NOT between two integer values") { + val frame = sql(s""" + | source = $testTable | where age NOT between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age < 20 || age > 30, s"Age $age is unexpectedly between 20 and 30") + }) + } + + test("test between should return records where NOT between two integer values") { + val frame = sql(s""" + | source = $testTable | where NOT age between 20 and 30 + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 1) + assert(frame.columns.length == 6) + + results.foreach(row => { + val age = row.getAs[Int]("age") + assert(age < 20 || age > 30, s"Age $age is unexpectedly between 20 and 30") + }) + } + + test("test between should return records between two date values") { + val frame = sql(s""" + | source = $timeSeriesTestTable | where time between 
'2023-10-01 00:01:00' and '2023-10-01 00:10:00' + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 2) + assert(frame.columns.length == 4) + + results.foreach(row => { + val ts = row.getAs[Timestamp]("time") + assert( + !ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) && !ts.after( + Timestamp.valueOf("2023-10-01 00:10:00")), + s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") + }) + } + + test("test between should return records NOT between two date values") { + val frame = sql(s""" + | source = $timeSeriesTestTable | where time NOT between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' + | """.stripMargin) + + val results = frame.collect() + assert(results.length == 3) + assert(frame.columns.length == 4) + + results.foreach(row => { + val ts = row.getAs[Timestamp]("time") + assert( + ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || ts.after( + Timestamp.valueOf("2023-10-01 00:10:00")), + s"Timestamp $ts is unexpectedly between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'") + }) + + } +} diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index 3b5f605df..82fdeb42f 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -392,6 +392,7 @@ LIKE: 'LIKE'; ISNULL: 'ISNULL'; ISNOTNULL: 'ISNOTNULL'; ISPRESENT: 'ISPRESENT'; +BETWEEN: 'BETWEEN'; // FLOWCONTROL FUNCTIONS IFNULL: 'IFNULL'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 12aa1332d..48984b3a5 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -407,6 +407,7 @@ logicalExpression comparisonExpression : left = valueExpression comparisonOperator right = valueExpression # compareExpr | valueExpression 
NOT? IN valueList # inExpr + | expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between ; valueExpressionList @@ -1114,4 +1115,5 @@ keywordsCanBeId | FULL | SEMI | ANTI + | BETWEEN ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index fabc42580..441287ddb 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -16,7 +16,9 @@ import org.apache.spark.sql.catalyst.expressions.Exists$; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.In$; +import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual; import org.apache.spark.sql.catalyst.expressions.InSubquery$; +import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual; import org.apache.spark.sql.catalyst.expressions.ListQuery$; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.Predicate; @@ -35,6 +37,7 @@ import org.opensearch.sql.ast.expression.AllFields; import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.Between; import org.opensearch.sql.ast.expression.BinaryExpression; import org.opensearch.sql.ast.expression.Case; import org.opensearch.sql.ast.expression.Compare; @@ -867,5 +870,14 @@ public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext c Option.empty()); return context.getNamedParseExpressions().push(existsSubQuery); } + + @Override + public Expression visitBetween(Between node, CatalystPlanContext context) { + Expression value = analyze(node.getValue(), context); + Expression lower = analyze(node.getLowerBound(), context); + Expression upper = 
analyze(node.getUpperBound(), context); + context.retainAllNamedParseExpressions(p -> p); + return context.getNamedParseExpressions().push(new org.apache.spark.sql.catalyst.expressions.And(new GreaterThanOrEqual(value, lower), new LessThanOrEqual(value, upper))); + } } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 20c55a401..6a0c80c16 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -17,6 +17,7 @@ import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.AttributeList; +import org.opensearch.sql.ast.expression.Between; import org.opensearch.sql.ast.expression.Case; import org.opensearch.sql.ast.expression.Compare; import org.opensearch.sql.ast.expression.DataType; @@ -277,6 +278,12 @@ public UnresolvedExpression visitConvertedDataType(OpenSearchPPLParser.Converted return new Literal(ctx.getText(), DataType.STRING); } + @Override + public UnresolvedExpression visitBetween(OpenSearchPPLParser.BetweenContext ctx) { + UnresolvedExpression betweenExpr = new Between(visit(ctx.expr1),visit(ctx.expr2),visit(ctx.expr3)); + return ctx.NOT() != null ? 
new Not(betweenExpr) : betweenExpr; + } + private Function buildFunction( String functionName, List args) { return new Function( diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala new file mode 100644 index 000000000..6defcb766 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBetweenExpressionTranslatorTestSuite.scala @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ + +class PPLLogicalPlanBetweenExpressionTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test between expression") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = { + planTransformer.visit( + plan( + pplParser, + "source = table | where datetime_field between '2024-09-10' and '2024-09-15'"), + context) + } + // SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15' + val star = Seq(UnresolvedStar(None)) + + val 
datetime_field = UnresolvedAttribute("datetime_field") + val tableRelation = UnresolvedRelation(Seq("table")) + + val lowerBound = Literal("2024-09-10") + val upperBound = Literal("2024-09-15") + val betweenCondition = And( + GreaterThanOrEqual(datetime_field, lowerBound), + LessThanOrEqual(datetime_field, upperBound)) + + val filterPlan = Filter(betweenCondition, tableRelation) + val expectedPlan = Project(star, filterPlan) + + comparePlans(expectedPlan, logPlan, false) + } + +}