
Commit

Merge branch 'main' into p1-antlr-grammar
# Conflicts:
#	ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
YANG-DB committed Nov 5, 2024
2 parents 6b44cb3 + aaba489 commit f5f05f8
Showing 16 changed files with 843 additions and 7 deletions.
9 changes: 8 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
@@ -58,10 +58,17 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This excludes the inclusive range [2024-09-10, 2025-09-10], i.e. it returns b < '2024-09-10' or b > '2025-09-10'
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | trendline sma(2, temperature) as temp_trend`

#### **IP related queries**
[See additional command details](functions/ppl-ip.md)

- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')`
- `source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange`

```sql
source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
4 changes: 3 additions & 1 deletion docs/ppl-lang/README.md
@@ -91,6 +91,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`Cryptographic Functions`](functions/ppl-cryptographic.md)

- [`IP Address Functions`](functions/ppl-ip.md)

- [`Lambda Functions`](functions/ppl-lambda.md)

---
### PPL On Spark
@@ -109,4 +111,4 @@ See samples of [PPL queries](PPL-Example-Commands.md)

---
### PPL Project Roadmap
[PPL GitHub Project Roadmap](https://github.com/orgs/opensearch-project/projects/214)
187 changes: 187 additions & 0 deletions docs/ppl-lang/functions/ppl-lambda.md
@@ -0,0 +1,187 @@
## Lambda Functions

### `FORALL`

**Description**

`forall(array, lambda)` evaluates whether the lambda predicate holds for all elements in the array.

**Argument type:** ARRAY, LAMBDA

**Return type:** BOOLEAN

Returns `TRUE` if all elements in the array satisfy the lambda predicate, otherwise `FALSE`.

Example:

os> source=people | eval array = json_array(1, -1, 2), result = forall(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| false |
+-----------+

os> source=people | eval array = json_array(1, 3, 2), result = forall(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| true |
+-----------+

**Note:** The lambda expression can access the nested fields of the array elements. This applies to all lambda functions introduced in this document.

Consider the following array:

array = [
{"a":1, "b":1},
{"a":-1, "b":2}
]

Lambda functions can then be applied to the nested fields `a` and `b`, as the following examples show:

os> source=people | eval array = json_array(json_object("a", 1, "b", 1), json_object("a" , -1, "b", 2)), result = forall(array, x -> x.a > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| false |
+-----------+

os> source=people | eval array = json_array(json_object("a", 1, "b", 1), json_object("a" , -1, "b", 2)), result = forall(array, x -> x.b > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| true |
+-----------+
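
For readers coming from Spark, below is a minimal `spark-shell` sketch of the same nested-field predicate, under the assumption that PPL's `forall` maps to Spark SQL's built-in `forall` higher-order function (`spark` is the session predefined by `spark-shell`; the other sketches in this section assume the same setup):

```scala
// Same array of structs as the PPL example above; the lambda
// dereferences the nested field `a` of each element.
spark.sql(
  """SELECT forall(
    |         array(named_struct('a', 1, 'b', 1),
    |               named_struct('a', -1, 'b', 2)),
    |         x -> x.a > 0) AS result""".stripMargin
).show()
// +------+
// |result|
// +------+
// | false|
// +------+
```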

### `EXISTS`

**Description**

`exists(array, lambda)` evaluates whether the lambda predicate holds for at least one element in the array.

**Argument type:** ARRAY, LAMBDA

**Return type:** BOOLEAN

Returns `TRUE` if at least one element in the array satisfies the lambda predicate, otherwise `FALSE`.

Example:

os> source=people | eval array = json_array(1, -1, 2), result = exists(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| true |
+-----------+

os> source=people | eval array = json_array(-1, -3, -2), result = exists(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| false |
+-----------+
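
As a semantic cross-check, `exists` is the logical dual of `forall`: `exists(array, p)` is equivalent to `not forall(array, x -> not p(x))`. A sketch in the same `spark-shell` style, again assuming the Spark SQL built-ins:

```scala
// Both columns evaluate to true for this input, illustrating the
// exists/forall duality.
spark.sql(
  """SELECT exists(array(1, -1, 2), x -> x > 0)           AS via_exists,
    |       NOT forall(array(1, -1, 2), x -> NOT (x > 0)) AS via_forall""".stripMargin
).show()
```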


### `FILTER`

**Description**

`filter(array, lambda)` filters the input array, keeping only the elements that satisfy the lambda predicate.

**Argument type:** ARRAY, LAMBDA

**Return type:** ARRAY

An ARRAY that contains all elements in the input array that satisfy the lambda predicate.

Example:

os> source=people | eval array = json_array(1, -1, 2), result = filter(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| [1, 2] |
+-----------+

os> source=people | eval array = json_array(-1, -3, -2), result = filter(array, x -> x > 0) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| [] |
+-----------+
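
Note from the second example that a predicate matching nothing yields `[]`, not `NULL`, and matching elements keep their original order. A `spark-shell` sketch, assuming Spark's built-in `filter` backs the PPL function:

```scala
// filter preserves element order; a non-matching input yields an
// empty array rather than NULL.
spark.sql(
  """SELECT filter(array(1, -1, 2), x -> x > 0) AS kept,
    |       filter(array(-1, -3), x -> x > 0)   AS empty""".stripMargin
).show()
// kept = [1, 2], empty = []
```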

### `TRANSFORM`

**Description**

`transform(array, lambda)` transforms the elements of an array by applying the lambda function to each one. When a binary lambda is supplied, its second argument is bound to the element's zero-based index. This is similar to `map` in functional programming.

**Argument type:** ARRAY, LAMBDA

**Return type:** ARRAY

An ARRAY that contains the result of applying the lambda transform function to each element in the input array.

Example:

os> source=people | eval array = json_array(1, 2, 3), result = transform(array, x -> x + 1) | fields result
fetched rows / total rows = 1/1
+--------------+
| result |
+--------------+
| [2, 3, 4] |
+--------------+

os> source=people | eval array = json_array(1, 2, 3), result = transform(array, (x, i) -> x + i) | fields result
fetched rows / total rows = 1/1
+--------------+
| result |
+--------------+
| [1, 3, 5] |
+--------------+
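
The second example uses the binary form: the indexes 0, 1, 2 are added to the elements 1, 2, 3, producing [1, 3, 5]. The same call in `spark-shell` form, assuming Spark's built-in `transform`:

```scala
// Binary lambda: the second parameter i is bound to the element's
// zero-based index.
spark.sql("SELECT transform(array(1, 2, 3), (x, i) -> x + i) AS result").show()
// result = [1, 3, 5]
```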

### `REDUCE`

**Description**

`reduce(array, start, merge_lambda, finish_lambda)` applies the binary merge lambda to the start value and each element of the array, reducing them to a single state. The optional finish lambda then converts the final state into the result; when it is omitted, the final state is returned directly.

**Argument type:** ARRAY, ANY, LAMBDA, LAMBDA

**Return type:** ANY

The final result of applying the lambda functions to the start value and the input array.

Example:

os> source=people | eval array = json_array(1, 2, 3), result = reduce(array, 0, (acc, x) -> acc + x) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| 6 |
+-----------+

os> source=people | eval array = json_array(1, 2, 3), result = reduce(array, 10, (acc, x) -> acc + x) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| 16 |
+-----------+

os> source=people | eval array = json_array(1, 2, 3), result = reduce(array, 0, (acc, x) -> acc + x, acc -> acc * 10) | fields result
fetched rows / total rows = 1/1
+-----------+
| result |
+-----------+
| 60 |
+-----------+
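
In the last example the merge lambda folds the array down to 6 and the finish lambda multiplies that state by 10, giving 60. Spark SQL exposes the same four-argument shape as `aggregate` (recent Spark releases also accept `reduce` as an alias); a `spark-shell` sketch:

```scala
// aggregate(array, start, merge, finish): fold the elements into a
// state, then post-process the state with the finish lambda.
spark.sql(
  "SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x, acc -> acc * 10) AS result"
).show()
// result = 60
```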
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.functions.{col, to_json}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLLambdaFunctionITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()
// Create test table
createNullableJsonContentTable(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test forall()") {
val frame = sql(s"""
| source = $testTable | eval array = json_array(1,2,0,-1,1.1,-0.11), result = forall(array, x -> x > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(false)), frame)

val frame2 = sql(s"""
| source = $testTable | eval array = json_array(1,2,0,-1,1.1,-0.11), result = forall(array, x -> x > -10) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(true)), frame2)

val frame3 = sql(s"""
| source = $testTable | eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = forall(array, x -> x.a > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(false)), frame3)

val frame4 = sql(s"""
| source = $testTable | eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = exists(array, x -> x.b < 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(true)), frame4)
}

test("test exists()") {
val frame = sql(s"""
| source = $testTable | eval array = json_array(1,2,0,-1,1.1,-0.11), result = exists(array, x -> x > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(true)), frame)

val frame2 = sql(s"""
| source = $testTable | eval array = json_array(1,2,0,-1,1.1,-0.11), result = exists(array, x -> x > 10) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(false)), frame2)

val frame3 = sql(s"""
| source = $testTable | eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = exists(array, x -> x.a > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(true)), frame3)

val frame4 = sql(s"""
| source = $testTable | eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = exists(array, x -> x.b > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(false)), frame4)

}

test("test filter()") {
val frame = sql(s"""
| source = $testTable| eval array = json_array(1,2,0,-1,1.1,-0.11), result = filter(array, x -> x > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(Seq(1, 2, 1.1))), frame)

val frame2 = sql(s"""
| source = $testTable| eval array = json_array(1,2,0,-1,1.1,-0.11), result = filter(array, x -> x > 10) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(Seq())), frame2)

val frame3 = sql(s"""
| source = $testTable| eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = filter(array, x -> x.a > 0) | head 1 | fields result
| """.stripMargin)

assertSameRows(Seq(Row("""[{"a":1,"b":-1}]""")), frame3.select(to_json(col("result"))))

val frame4 = sql(s"""
| source = $testTable| eval array = json_array(json_object("a",1,"b",-1),json_object("a",-1,"b",-1)), result = filter(array, x -> x.b > 0) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row("""[]""")), frame4.select(to_json(col("result"))))
}

test("test transform()") {
val frame = sql(s"""
| source = $testTable | eval array = json_array(1,2,3), result = transform(array, x -> x + 1) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(Seq(2, 3, 4))), frame)

val frame2 = sql(s"""
| source = $testTable | eval array = json_array(1,2,3), result = transform(array, (x, y) -> x + y) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(Seq(1, 3, 5))), frame2)
}

test("test reduce()") {
val frame = sql(s"""
| source = $testTable | eval array = json_array(1,2,3), result = reduce(array, 0, (acc, x) -> acc + x) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(6)), frame)

val frame2 = sql(s"""
| source = $testTable | eval array = json_array(1,2,3), result = reduce(array, 1, (acc, x) -> acc + x) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(7)), frame2)

val frame3 = sql(s"""
| source = $testTable | eval array = json_array(1,2,3), result = reduce(array, 0, (acc, x) -> acc + x, acc -> acc * 10) | head 1 | fields result
| """.stripMargin)
assertSameRows(Seq(Row(60)), frame3)
}
}
14 changes: 11 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -205,6 +205,7 @@ RT_SQR_PRTHS: ']';
SINGLE_QUOTE: '\'';
DOUBLE_QUOTE: '"';
BACKTICK: '`';
ARROW: '->';

// Operators. Bit

@@ -386,15 +387,22 @@ JSON_VALID: 'JSON_VALID';
//JSON_DELETE: 'JSON_DELETE';
//JSON_EXTEND: 'JSON_EXTEND';
//JSON_SET: 'JSON_SET';
//JSON_ARRAY_ALL_MATCH: 'JSON_ALL_MATCH';
//JSON_ARRAY_ANY_MATCH: 'JSON_ANY_MATCH';
//JSON_ARRAY_FILTER: 'JSON_FILTER';
//JSON_ARRAY_ALL_MATCH: 'JSON_ARRAY_ALL_MATCH';
//JSON_ARRAY_ANY_MATCH: 'JSON_ARRAY_ANY_MATCH';
//JSON_ARRAY_FILTER: 'JSON_ARRAY_FILTER';
//JSON_ARRAY_MAP: 'JSON_ARRAY_MAP';
//JSON_ARRAY_REDUCE: 'JSON_ARRAY_REDUCE';

// COLLECTION FUNCTIONS
ARRAY: 'ARRAY';

// LAMBDA FUNCTIONS
//EXISTS: 'EXISTS';
FORALL: 'FORALL';
FILTER: 'FILTER';
TRANSFORM: 'TRANSFORM';
REDUCE: 'REDUCE';

// BOOL FUNCTIONS
LIKE: 'LIKE';
ISNULL: 'ISNULL';