Skip to content

Commit

Permalink
Add PPL Between functionality (opensearch-project#758)
Browse files Browse the repository at this point in the history
* Implement between

Signed-off-by: Hendrik Saly <[email protected]>

* add integration test for the between command covering both straight and NOT usage

Signed-off-by: Jens Schmidt <[email protected]>

* Add docs

Signed-off-by: Hendrik Saly <[email protected]>

* Add proposed syntax to ppl planning

Signed-off-by: Hendrik Saly <[email protected]>

* adjust gitignore

Signed-off-by: Jens Schmidt <[email protected]>

* adjust gitignore: add spark-bin

Signed-off-by: Jens Schmidt <[email protected]>

* clean .gitignore: remove local adjustments

Signed-off-by: Jens Schmidt <[email protected]>

* update integration test to use between keyword

Signed-off-by: Jens Schmidt <[email protected]>

* Move to comparisonExpression

Signed-off-by: Hendrik Saly <[email protected]>

* Added to keywordsCanBeId

Signed-off-by: Hendrik Saly <[email protected]>

* Update docs

Signed-off-by: Hendrik Saly <[email protected]>

* Add additional tests

Signed-off-by: Hendrik Saly <[email protected]>

* Move to comparisonExpression -2-

Signed-off-by: Hendrik Saly <[email protected]>

* Fix docs

Signed-off-by: Hendrik Saly <[email protected]>

* Added IT tests

Signed-off-by: Hendrik Saly <[email protected]>

---------

Signed-off-by: Hendrik Saly <[email protected]>
Signed-off-by: Jens Schmidt <[email protected]>
Co-authored-by: Hendrik Saly <[email protected]>
  • Loading branch information
dr-lilienthal and salyh authored Oct 29, 2024
1 parent 9d2f94d commit 82770ec
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where isblank(a)`
- `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'`
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10'

```sql
source = table | eval status_category =
Expand Down
17 changes: 17 additions & 0 deletions docs/ppl-lang/planning/ppl-between.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## between syntax proposal

1. **Proposed syntax**
- `... | where expr1 [NOT] BETWEEN expr2 AND expr3`
    - evaluates whether `expr1` is [not] between `expr2` and `expr3` (bounds inclusive)
- `... | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `... | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10'

### New syntax definition in ANTLR

```ANTLR
comparisonExpression
...
| expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between
```
2 changes: 2 additions & 0 deletions docs/ppl-lang/ppl-where-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ PPL query:
- `source = table | where isempty(a)`
- `source = table | where isblank(a)`
- `source = table | where case(length(a) > 6, 'True' else 'False') = 'True'`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10'
- `source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import java.sql.Timestamp

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.streaming.StreamTest

/**
 * Integration tests for the PPL `BETWEEN` / `NOT BETWEEN` comparison operator,
 * covering integer bounds, computed bounds, and timestamp bounds.
 * Note: `a between x and y` is inclusive, i.e. `a >= x AND a <= y`.
 */
class FlintSparkPPLBetweenITSuite
    extends QueryTest
    with LogicalPlanTestUtils
    with FlintPPLSuite
    with StreamTest {

  /** Test table and index name */
  private val testTable = "spark_catalog.default.flint_ppl_test"
  private val timeSeriesTestTable = "spark_catalog.default.flint_ppl_timeseries_test"

  override def beforeAll(): Unit = {
    super.beforeAll()

    // Create the test tables queried by the between/not-between tests below.
    createPartitionedStateCountryTable(testTable)
    createTimeSeriesTable(timeSeriesTestTable)
  }

  protected override def afterEach(): Unit = {
    super.afterEach()
    // Stop all streaming jobs if any are still active after a test.
    spark.streams.active.foreach { job =>
      job.stop()
      job.awaitTermination()
    }
  }

  test("test between should return records between two integer values") {
    val frame = sql(s"""
         | source = $testTable | where age between 20 and 30
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 3)
    assert(frame.columns.length == 6)

    // Every returned row must fall inside the inclusive range [20, 30].
    results.foreach(row => {
      val age = row.getAs[Int]("age")
      assert(age >= 20 && age <= 30, s"Age $age is not between 20 and 30")
    })
  }

  test("test between should return records between two integer computed values") {
    // Bounds may themselves be expressions: [20 + 1, 30 - 1] == [21, 29].
    val frame = sql(s"""
         | source = $testTable | where age between 20 + 1 and 30 - 1
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 1)
    assert(frame.columns.length == 6)

    results.foreach(row => {
      val age = row.getAs[Int]("age")
      assert(age >= 21 && age <= 29, s"Age $age is not between 21 and 29")
    })
  }

  test("test between should return records NOT between two integer values") {
    val frame = sql(s"""
         | source = $testTable | where age NOT between 20 and 30
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 1)
    assert(frame.columns.length == 6)

    // Every returned row must fall strictly outside the range [20, 30].
    results.foreach(row => {
      val age = row.getAs[Int]("age")
      assert(age < 20 || age > 30, s"Age $age is between 20 and 30 but should not be")
    })
  }

  test("test between should return records where NOT between two integer values") {
    // Same predicate expressed with a leading NOT: `NOT age between 20 and 30`.
    val frame = sql(s"""
         | source = $testTable | where NOT age between 20 and 30
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 1)
    assert(frame.columns.length == 6)

    results.foreach(row => {
      val age = row.getAs[Int]("age")
      assert(age < 20 || age > 30, s"Age $age is between 20 and 30 but should not be")
    })
  }

  test("test between should return records between two date values") {
    val frame = sql(s"""
         | source = $timeSeriesTestTable | where time between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 2)
    assert(frame.columns.length == 4)

    results.foreach(row => {
      val ts = row.getAs[Timestamp]("time")
      // BUG FIX: the original asserted `!before(lower) || !after(lower)` — an `||` of
      // the lower bound with itself, which is true for ANY timestamp. The inclusive
      // range check is `!before(lower) && !after(upper)`.
      assert(
        !ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) && !ts.after(
          Timestamp.valueOf("2023-10-01 00:10:00")),
        s"Timestamp $ts is not between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'")
    })
  }

  test("test between should return records NOT between two date values") {
    val frame = sql(s"""
         | source = $timeSeriesTestTable | where time NOT between '2023-10-01 00:01:00' and '2023-10-01 00:10:00'
         | """.stripMargin)

    val results = frame.collect()
    assert(results.length == 3)
    assert(frame.columns.length == 4)

    results.foreach(row => {
      val ts = row.getAs[Timestamp]("time")
      // BUG FIX: the original compared `after(...)` against the LOWER bound; a row
      // outside the range must be before the lower bound OR after the UPPER bound.
      assert(
        ts.before(Timestamp.valueOf("2023-10-01 00:01:00")) || ts.after(
          Timestamp.valueOf("2023-10-01 00:10:00")),
        s"Timestamp $ts is between '2023-10-01 00:01:00' and '2023-10-01 00:10:00' but should not be")
    })

  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ LIKE: 'LIKE';
ISNULL: 'ISNULL';
ISNOTNULL: 'ISNOTNULL';
ISPRESENT: 'ISPRESENT';
BETWEEN: 'BETWEEN';

// FLOWCONTROL FUNCTIONS
IFNULL: 'IFNULL';
Expand Down
2 changes: 2 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ logicalExpression
comparisonExpression
: left = valueExpression comparisonOperator right = valueExpression # compareExpr
| valueExpression NOT? IN valueList # inExpr
| expr1 = functionArg NOT? BETWEEN expr2 = functionArg AND expr3 = functionArg # between
;

valueExpressionList
Expand Down Expand Up @@ -1114,4 +1115,5 @@ keywordsCanBeId
| FULL
| SEMI
| ANTI
| BETWEEN
;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import org.apache.spark.sql.catalyst.expressions.Exists$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.In$;
import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.InSubquery$;
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual;
import org.apache.spark.sql.catalyst.expressions.ListQuery$;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Between;
import org.opensearch.sql.ast.expression.BinaryExpression;
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Compare;
Expand Down Expand Up @@ -867,5 +870,14 @@ public Expression visitExistsSubquery(ExistsSubquery node, CatalystPlanContext c
Option.empty());
return context.getNamedParseExpressions().push(existsSubQuery);
}

@Override
public Expression visitBetween(Between node, CatalystPlanContext context) {
  // Lower BETWEEN into the conjunction (value >= lower) AND (value <= upper).
  Expression valueExpr = analyze(node.getValue(), context);
  Expression lowerExpr = analyze(node.getLowerBound(), context);
  Expression upperExpr = analyze(node.getUpperBound(), context);
  context.retainAllNamedParseExpressions(p -> p);
  org.apache.spark.sql.catalyst.expressions.And rangeCheck =
      new org.apache.spark.sql.catalyst.expressions.And(
          new GreaterThanOrEqual(valueExpr, lowerExpr),
          new LessThanOrEqual(valueExpr, upperExpr));
  return context.getNamedParseExpressions().push(rangeCheck);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.Between;
import org.opensearch.sql.ast.expression.Case;
import org.opensearch.sql.ast.expression.Compare;
import org.opensearch.sql.ast.expression.DataType;
Expand Down Expand Up @@ -277,6 +278,12 @@ public UnresolvedExpression visitConvertedDataType(OpenSearchPPLParser.Converted
return new Literal(ctx.getText(), DataType.STRING);
}

@Override
public UnresolvedExpression visitBetween(OpenSearchPPLParser.BetweenContext ctx) {
  // Build `value BETWEEN lower AND upper`; wrap in Not when the NOT token is present.
  UnresolvedExpression value = visit(ctx.expr1);
  UnresolvedExpression lower = visit(ctx.expr2);
  UnresolvedExpression upper = visit(ctx.expr3);
  UnresolvedExpression between = new Between(value, lower, upper);
  return ctx.NOT() == null ? between : new Not(between);
}

private Function buildFunction(
String functionName, List<OpenSearchPPLParser.FunctionArgContext> args) {
return new Function(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, GreaterThanOrEqual, LessThanOrEqual, Literal}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Verifies that a PPL BETWEEN predicate translates into the expected Catalyst
 * logical plan: a Filter over (field >= lower) AND (field <= upper).
 */
class PPLLogicalPlanBetweenExpressionTranslatorTestSuite
    extends SparkFunSuite
    with PlanTest
    with LogicalPlanTestUtils
    with Matchers {

  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  test("test between expression") {
    // Parse the PPL query and translate it into a Catalyst logical plan.
    // Equivalent SQL: SELECT * FROM table WHERE datetime_field BETWEEN '2024-09-10' AND '2024-09-15'
    val context = new CatalystPlanContext
    val logPlan = planTransformer.visit(
      plan(
        pplParser,
        "source = table | where datetime_field between '2024-09-10' and '2024-09-15'"),
      context)

    // BETWEEN lowers to an inclusive range check on both bounds.
    val field = UnresolvedAttribute("datetime_field")
    val rangeCondition = And(
      GreaterThanOrEqual(field, Literal("2024-09-10")),
      LessThanOrEqual(field, Literal("2024-09-15")))

    val expectedPlan = Project(
      Seq(UnresolvedStar(None)),
      Filter(rangeCondition, UnresolvedRelation(Seq("table"))))

    comparePlans(expectedPlan, logPlan, false)
  }

}

0 comments on commit 82770ec

Please sign in to comment.