Skip to content

Commit

Permalink
Merge branch 'main' into local-test-instructions
Browse files Browse the repository at this point in the history
  • Loading branch information
YANG-DB committed Nov 14, 2024
2 parents 83232bf + 439cf3e commit ebbf446
Show file tree
Hide file tree
Showing 17 changed files with 1,036 additions and 67 deletions.
12 changes: 12 additions & 0 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ To execute the unit tests, run the following command:
```
sbt test
```
To run a specific unit test in SBT, use the testOnly command with the full path of the test class:
```
sbt "; project pplSparkIntegration; test:testOnly org.opensearch.flint.spark.ppl.PPLLogicalPlanTrendlineCommandTranslatorTestSuite"
```


## Integration Test
The integration test is defined in the `integration` directory of the project. The integration tests will automatically trigger unit tests and will only run if all unit tests pass. If you want to run the integration test for the project, you can do so by running the following command:
Expand All @@ -23,6 +28,13 @@ If you get integration test failures with error message "Previous attempts to fi
3. Run `sudo ln -s $HOME/.docker/desktop/docker.sock /var/run/docker.sock` or `sudo ln -s $HOME/.docker/run/docker.sock /var/run/docker.sock`
4. If you use Docker Desktop, as an alternative of `3`, check mark the "Allow the default Docker socket to be used (requires password)" in advanced settings of Docker Desktop.

Running only a selected set of integration test suites is possible with the following command:
```
sbt "; project integtest; it:testOnly org.opensearch.flint.spark.ppl.FlintSparkPPLTrendlineITSuite"
```
This command runs only the specified test suite within the integtest submodule.


### AWS Integration Test
The `aws-integration` folder contains tests for cloud server providers. For instance, test against AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
Expand Down
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where a < 1 | fields a,b,c`
- `source = table | where b != 'test' | fields a,b,c`
- `source = table | where c = 'test' | fields a,b,c | head 3`
- `source = table | where c = 'test' AND a = 1 | fields a,b,c`
- `source = table | where c != 'test' OR a > 1 | fields a,b,c`
- `source = table | where (b > 1 OR a > 1) AND c != 'test' | fields a,b,c`
- `source = table | where c = 'test' NOT a > 1 | fields a,b,c` - Note: "AND" is optional
- `source = table | where ispresent(b)`
- `source = table | where isnull(coalesce(a, b)) | fields a,b,c | head 3`
- `source = table | where isempty(a)`
Expand All @@ -61,6 +65,7 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | trendline sma(2, temperature) as temp_trend`
- `source = table | trendline sort timestamp wma(2, temperature) as temp_trend`

#### **IP related queries**
[See additional command details](functions/ppl-ip.md)
Expand Down
64 changes: 58 additions & 6 deletions docs/ppl-lang/ppl-trendline-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
**Description**
Using ``trendline`` command to calculate moving averages of fields.


### Syntax
### Syntax - SMA (Simple Moving Average)
`TRENDLINE [sort <[+|-] sort-field>] SMA(number-of-datapoints, field) [AS alias] [SMA(number-of-datapoints, field) [AS alias]]...`

* [+|-]: optional. The plus [+] stands for ascending order and NULL/MISSING first and a minus [-] stands for descending order and NULL/MISSING last. **Default:** ascending order and NULL/MISSING first.
Expand All @@ -13,8 +12,6 @@ Using ``trendline`` command to calculate moving averages of fields.
* field: mandatory. the name of the field the moving average should be calculated for.
* alias: optional. the name of the resulting column containing the moving average.

And the moment only the Simple Moving Average (SMA) type is supported.

It is calculated like

f[i]: The value of field 'f' in the i-th data-point
Expand All @@ -23,7 +20,7 @@ It is calculated like

SMA(t) = (1/n) * Σ(f[i]), where i = t-n+1 to t

### Example 1: Calculate simple moving average for a timeseries of temperatures
#### Example 1: Calculate simple moving average for a timeseries of temperatures

The example calculates the simple moving average over temperatures using two datapoints.

Expand All @@ -41,7 +38,7 @@ PPL query:
| 15| 258|2023-04-06 17:07:...| 14.5|
+-----------+---------+--------------------+----------+

### Example 2: Calculate simple moving averages for a timeseries of temperatures with sorting
#### Example 2: Calculate simple moving averages for a timeseries of temperatures with sorting

The example calculates two simple moving average over temperatures using two and three datapoints sorted descending by device-id.

Expand All @@ -58,3 +55,58 @@ PPL query:
| 12| 1492|2023-04-06 17:07:...| 12.5| 13.0|
| 12| 1492|2023-04-06 17:07:...| 12.0|12.333333333333334|
+-----------+---------+--------------------+------------+------------------+


### Syntax - WMA (Weighted Moving Average)
`TRENDLINE sort <[+|-] sort-field> WMA(number-of-datapoints, field) [AS alias] [WMA(number-of-datapoints, field) [AS alias]]...`

* [+|-]: optional. The plus [+] stands for ascending order and NULL/MISSING first and a minus [-] stands for descending order and NULL/MISSING last. **Default:** ascending order and NULL/MISSING first.
* sort-field: mandatory. this field specifies the ordering of data poients when calculating the nth_value aggregation.
* number-of-datapoints: mandatory. number of datapoints to calculate the moving average (must be greater than zero).
* field: mandatory. the name of the field the moving averag should be calculated for.
* alias: optional. the name of the resulting column containing the moving average.

It is calculated like

f[i]: The value of field 'f' in the i-th data point
n: The number of data points in the moving window (period)
t: The current time index
w[i]: The weight assigned to the i-th data point, typically increasing for more recent points

WMA(t) = ( Σ from i=t−n+1 to t of (w[i] * f[i]) ) / ( Σ from i=t−n+1 to t of w[i] )

#### Example 1: Calculate weighted moving average for a timeseries of temperatures

The example calculates the simple moving average over temperatures using two datapoints.

PPL query:

os> source=t | trendline sort timestamp wma(2, temperature) as temp_trend;
fetched rows / total rows = 5/5
+-----------+---------+--------------------+----------+
|temperature|device-id| timestamp|temp_trend|
+-----------+---------+--------------------+----------+
| 12| 1492|2023-04-06 17:07:...| NULL|
| 12| 1492|2023-04-06 17:07:...| 12.0|
| 13| 256|2023-04-06 17:07:...| 12.6|
| 14| 257|2023-04-06 17:07:...| 13.6|
| 15| 258|2023-04-06 17:07:...| 14.6|
+-----------+---------+--------------------+----------+

#### Example 2: Calculate simple moving averages for a timeseries of temperatures with sorting

The example calculates two simple moving average over temperatures using two and three datapoints sorted descending by device-id.

PPL query:

os> source=t | trendline sort - device-id wma(2, temperature) as temp_trend_2 wma(3, temperature) as temp_trend_3;
fetched rows / total rows = 5/5
+-----------+---------+--------------------+------------+------------------+
|temperature|device-id| timestamp|temp_trend_2| temp_trend_3|
+-----------+---------+--------------------+------------+------------------+
| 15| 258|2023-04-06 17:07:...| NULL| NULL|
| 14| 257|2023-04-06 17:07:...| 14.3| NULL|
| 13| 256|2023-04-06 17:07:...| 13.3| 13.6|
| 12| 1492|2023-04-06 17:07:...| 12.3| 12.6|
| 12| 1492|2023-04-06 17:07:...| 12.0| 12.16|
+-----------+---------+--------------------+------------+------------------+
13 changes: 5 additions & 8 deletions docs/ppl-lang/ppl-where-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ PPL query:
### Additional Examples

#### **Filters With Logical Conditions**
```
- `source = table | where c = 'test' AND a = 1 | fields a,b,c`
- `source = table | where c != 'test' OR a > 1 | fields a,b,c | head 1`
- `source = table | where c = 'test' NOT a > 1 | fields a,b,c`
- `source = table | where a = 1 | fields a,b,c`
- `source = table | where a >= 1 | fields a,b,c`
- `source = table | where a < 1 | fields a,b,c`
- `source = table | where b != 'test' | fields a,b,c`
- `source = table | where c = 'test' | fields a,b,c | head 3`
- `source = table | where c = 'test' AND a = 1 | fields a,b,c`
- `source = table | where c != 'test' OR a > 1 | fields a,b,c`
- `source = table | where (b > 1 OR a > 1) AND c != 'test' | fields a,b,c`
- `source = table | where c = 'test' NOT a > 1 | fields a,b,c` - Note: "AND" is optional
- `source = table | where ispresent(b)`
- `source = table | where isnull(coalesce(a, b)) | fields a,b,c | head 3`
- `source = table | where isempty(a)`
Expand All @@ -45,7 +45,6 @@ PPL query:
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10'
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
a >= 300 AND a < 400, 'Redirection',
Expand All @@ -57,10 +56,8 @@ PPL query:
a >= 400 AND a < 500, 'Client Error',
a >= 500, 'Server Error'
else 'Incorrect HTTP status code'
) = 'Incorrect HTTP status code'
) = 'Incorrect HTTP status code'`
- `source = table
| eval factor = case(a > 15, a - 14, isnull(b), a - 7, a < 3, a + 1 else 1)
| where case(factor = 2, 'even', factor = 4, 'even', factor = 6, 'even', factor = 8, 'even' else 'odd') = 'even'
| stats count() by factor`
```
47 changes: 26 additions & 21 deletions integ-test/src/integration/resources/tpch/q19.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,30 @@ where
*/

source = lineitem
| join ON p_partkey = l_partkey
and p_brand = 'Brand#12'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 1 and l_quantity <= 1 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
OR p_partkey = l_partkey
and p_brand = 'Brand#23'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 10 and l_quantity <= 10 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
OR p_partkey = l_partkey
and p_brand = 'Brand#34'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 20 and l_quantity <= 20 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
| join ON
(
p_partkey = l_partkey
and p_brand = 'Brand#12'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 1 and l_quantity <= 1 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
) OR (
p_partkey = l_partkey
and p_brand = 'Brand#23'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 10 and l_quantity <= 10 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
) OR (
p_partkey = l_partkey
and p_brand = 'Brand#34'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 20 and l_quantity <= 20 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON'
)
part
2 changes: 1 addition & 1 deletion integ-test/src/integration/resources/tpch/q7.ppl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ source = [
| join ON s_nationkey = n1.n_nationkey nation as n1
| join ON c_nationkey = n2.n_nationkey nation as n2
| where l_shipdate between date('1995-01-01') and date('1996-12-31')
and n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY' or n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE'
and ((n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE'))
| eval supp_nation = n1.n_name, cust_nation = n2.n_name, l_year = year(l_shipdate), volume = l_extendedprice * (1 - l_discount)
| fields supp_nation, cust_nation, l_year, volume
] as shipping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,4 +467,96 @@ class FlintSparkPPLFiltersITSuite
val expectedPlan = Project(Seq(UnresolvedAttribute("state")), filter)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test parenthesis in filter") {
val frame = sql(s"""
| source = $testTable | where country = 'Canada' or age > 60 and age < 25 | fields name, age, country
| """.stripMargin)
assertSameRows(Seq(Row("John", 25, "Canada"), Row("Jane", 20, "Canada")), frame)

val frameWithParenthesis = sql(s"""
| source = $testTable | where (country = 'Canada' or age > 60) and age < 25 | fields name, age, country
| """.stripMargin)
assertSameRows(Seq(Row("Jane", 20, "Canada")), frameWithParenthesis)

val logicalPlan: LogicalPlan = frameWithParenthesis.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val filter = Filter(
And(
Or(
EqualTo(UnresolvedAttribute("country"), Literal("Canada")),
GreaterThan(UnresolvedAttribute("age"), Literal(60))),
LessThan(UnresolvedAttribute("age"), Literal(25))),
table)
val expectedPlan = Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("age"),
UnresolvedAttribute("country")),
filter)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test complex and nested parenthesis in filter") {
val frame1 = sql(s"""
| source = $testTable | WHERE (age > 18 AND (state = 'California' OR state = 'New York'))
| """.stripMargin)
assertSameRows(
Seq(
Row("Hello", 30, "New York", "USA", 2023, 4),
Row("Jake", 70, "California", "USA", 2023, 4)),
frame1)

val frame2 = sql(s"""
| source = $testTable | WHERE ((((age > 18) AND ((((state = 'California') OR state = 'New York'))))))
| """.stripMargin)
assertSameRows(
Seq(
Row("Hello", 30, "New York", "USA", 2023, 4),
Row("Jake", 70, "California", "USA", 2023, 4)),
frame2)

val frame3 = sql(s"""
| source = $testTable | WHERE (year = 2023 AND (month BETWEEN 1 AND 6)) AND (age >= 31 OR country = 'Canada')
| """.stripMargin)
assertSameRows(
Seq(
Row("John", 25, "Ontario", "Canada", 2023, 4),
Row("Jake", 70, "California", "USA", 2023, 4),
Row("Jane", 20, "Quebec", "Canada", 2023, 4)),
frame3)

val frame4 = sql(s"""
| source = $testTable | WHERE ((state = 'Texas' OR state = 'California') AND (age < 30 OR (country = 'USA' AND year > 2020)))
| """.stripMargin)
assertSameRows(Seq(Row("Jake", 70, "California", "USA", 2023, 4)), frame4)

val frame5 = sql(s"""
| source = $testTable | WHERE (LIKE(LOWER(name), 'a%') OR LIKE(LOWER(name), 'j%')) AND (LENGTH(state) > 6 OR (country = 'USA' AND age > 18))
| """.stripMargin)
assertSameRows(
Seq(
Row("John", 25, "Ontario", "Canada", 2023, 4),
Row("Jake", 70, "California", "USA", 2023, 4)),
frame5)

val frame6 = sql(s"""
| source = $testTable | WHERE (age BETWEEN 25 AND 40) AND ((state IN ('California', 'New York', 'Texas') AND year = 2023) OR (country != 'USA' AND (month = 1 OR month = 12)))
| """.stripMargin)
assertSameRows(Seq(Row("Hello", 30, "New York", "USA", 2023, 4)), frame6)

val frame7 = sql(s"""
| source = $testTable | WHERE NOT (age < 18 OR (state = 'Alaska' AND year < 2020)) AND (country = 'USA' OR (country = 'Mexico' AND month BETWEEN 6 AND 8))
| """.stripMargin)
assertSameRows(
Seq(
Row("Jake", 70, "California", "USA", 2023, 4),
Row("Hello", 30, "New York", "USA", 2023, 4)),
frame7)

val frame8 = sql(s"""
| source = $testTable | WHERE (NOT (year < 2020 OR age < 18)) AND ((state = 'Texas' AND month % 2 = 0) OR (country = 'Mexico' AND (year = 2023 OR (year = 2022 AND month > 6))))
| """.stripMargin)
assertSameRows(Seq(), frame8)
}
}
Loading

0 comments on commit ebbf446

Please sign in to comment.