P1 Antlr grammar #859

Merged
merged 10 commits on Nov 2, 2024
10 changes: 10 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -60,6 +60,7 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns rows where b < '2024-09-10' or b > '2025-09-10'
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | trendline sma(2, temperature) as temp_trend`

```sql
source = table | eval status_category =
@@ -122,6 +123,15 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
- `source = table | fillnull using a = 101, b = 102`
- `source = table | fillnull using a = concat(b, c), d = 2 * pi() * e`

### Flatten
[See additional command details](ppl-flatten-command.md)
Assumptions: `bridges`, `coor` are existing fields in `table`, and their types are `struct<?,?>` or `array<struct<?,?>>`
- `source = table | flatten bridges`
- `source = table | flatten coor`
- `source = table | flatten bridges | flatten coor`
- `source = table | fields bridges | flatten bridges`
- `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country`

```sql
source = table | eval e = eval status_category =
case(a >= 200 AND a < 300, 'Success',
5 changes: 4 additions & 1 deletion docs/ppl-lang/README.md
@@ -31,6 +31,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`describe command`](PPL-Example-Commands.md/#describe)

- [`fillnull command`](ppl-fillnull-command.md)

- [`flatten command`](ppl-flatten-command.md)

- [`eval command`](ppl-eval-command.md)

@@ -67,7 +69,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`subquery commands`](ppl-subquery-command.md)

- [`correlation commands`](ppl-correlation-command.md)


- [`trendline commands`](ppl-trendline-command.md)

* **Functions**

90 changes: 90 additions & 0 deletions docs/ppl-lang/ppl-flatten-command.md
@@ -0,0 +1,90 @@
## PPL `flatten` command

### Description
Use the `flatten` command to flatten a field of one of the following types:
- `struct<?,?>`
- `array<struct<?,?>>`


### Syntax
`flatten <field>`

* field: mandatory. The field to be flattened; it must be of a supported type.

### Test table
#### Schema
| col\_name | data\_type |
|-----------|-------------------------------------------------|
| \_time | string |
| bridges | array\<struct\<length:bigint,name:string\>\> |
| city | string |
| coor | struct\<alt:bigint,lat:double,long:double\> |
| country | string |
#### Data
| \_time | bridges | city | coor | country |
|---------------------|----------------------------------------------|---------|------------------------|---------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | {35, 51.5074, -0.1278} | England |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | {35, 48.8566, 2.3522} | France |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | {2, 45.4408, 12.3155} | Italy |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | {200, 50.0755, 14.4378}| Czech Republic|
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| {96, 47.4979, 19.0402} | Hungary |
| 1990-09-13T12:00:00 | NULL | Warsaw | NULL | Poland |



### Example 1: flatten struct
This example shows how to flatten a struct field.
PPL query:
- `source=table | flatten coor`

| \_time | bridges | city | country | alt | lat | long |
|---------------------|----------------------------------------------|---------|---------------|-----|--------|--------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL| NULL | NULL |



### Example 2: flatten array

This example shows how to flatten a field that is an array of structs.

PPL query:
- `source=table | flatten bridges`

| \_time | city | coor | country | length | name |
|---------------------|---------|------------------------|---------------|--------|-------------------|
| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 801 | Tower Bridge |
| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 928 | London Bridge |
| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 232 | Pont Neuf |
| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 160 | Pont Alexandre III|
| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 48 | Rialto Bridge |
| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 11 | Bridge of Sighs |
| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 516 | Charles Bridge |
| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 343 | Legion Bridge |
| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 375 | Chain Bridge |
| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 333 | Liberty Bridge |
| 1990-09-13T12:00:00 | Warsaw | NULL | Poland | NULL | NULL |


### Example 3: flatten array and struct
This example shows how to flatten multiple fields.
PPL query:
- `source=table | flatten bridges | flatten coor`

| \_time | city | country | length | name | alt | lat | long |
|---------------------|---------|---------------|--------|-------------------|------|--------|--------|
| 2024-09-13T12:00:00 | London | England | 801 | Tower Bridge | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | London | England | 928 | London Bridge | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | Paris | France | 232 | Pont Neuf | 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | Paris | France | 160 | Pont Alexandre III| 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | Venice | Italy | 48 | Rialto Bridge | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | Venice | Italy | 11 | Bridge of Sighs | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | Prague | Czech Republic| 516 | Charles Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |
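
For readers who think in Spark DataFrame terms, the examples above behave roughly like expanding a struct's subfields, and like exploding an array before expanding each element. A minimal sketch under that assumption (`flattenStruct` and `flattenArray` are hypothetical helpers, not the actual PPL implementation):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}

// flatten <struct field>: promote the struct's subfields to top-level columns.
def flattenStruct(df: DataFrame, field: String): DataFrame =
  df.selectExpr("*", s"$field.*").drop(field)

// flatten <array-of-struct field>: one output row per array element, then
// promote the element's subfields. explode_outer keeps rows whose array is
// NULL (compare the Warsaw row in the examples above).
def flattenArray(df: DataFrame, field: String): DataFrame =
  flattenStruct(df.withColumn(field, explode_outer(col(field))), field)
```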
60 changes: 60 additions & 0 deletions docs/ppl-lang/ppl-trendline-command.md
@@ -0,0 +1,60 @@
## PPL `trendline` command

**Description**
Use the `trendline` command to calculate moving averages of fields.


### Syntax
`TRENDLINE [sort <[+|-] sort-field>] SMA(number-of-datapoints, field) [AS alias] [SMA(number-of-datapoints, field) [AS alias]]...`

* [+|-]: optional. The plus [+] stands for ascending order with NULL/MISSING first; the minus [-] stands for descending order with NULL/MISSING last. **Default:** ascending order with NULL/MISSING first.
* sort-field: mandatory when sorting is used. The field to sort by.
* number-of-datapoints: mandatory. The number of datapoints used to calculate the moving average; must be greater than zero.
* field: mandatory. The name of the field the moving average should be calculated for.
* alias: optional. The name of the resulting column containing the moving average.

At the moment, only the Simple Moving Average (SMA) type is supported.

It is calculated as follows:

    f[i]: the value of field 'f' in the i-th datapoint
    n: the number of datapoints in the moving window (period)
    t: the current time index

    SMA(t) = (1/n) * Σ(f[i]), where i = t-n+1 to t
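
As a concrete illustration, here is a minimal Scala sketch of the formula over an already-sorted sequence (a hypothetical helper, not the engine's implementation). Applied to the temperatures in Example 1 below, it reproduces the temp_trend column:

```scala
// SMA over a sorted sequence: None until a full window of n datapoints exists.
def sma(values: Seq[Double], n: Int): Seq[Option[Double]] =
  values.indices.map { t =>
    if (t + 1 < n) None // fewer than n datapoints so far -> NULL
    else Some(values.slice(t - n + 1, t + 1).sum / n)
  }

sma(Seq(12.0, 12.0, 13.0, 14.0, 15.0), 2)
// Vector(None, Some(12.0), Some(12.5), Some(13.5), Some(14.5))
```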

### Example 1: Calculate simple moving average for a timeseries of temperatures

This example calculates a simple moving average over temperatures using two datapoints.

PPL query:

    os> source=t | trendline sma(2, temperature) as temp_trend;
    fetched rows / total rows = 5/5
    +-----------+---------+--------------------+----------+
    |temperature|device-id|           timestamp|temp_trend|
    +-----------+---------+--------------------+----------+
    |         12|     1492|2023-04-06 17:07:...|      NULL|
    |         12|     1492|2023-04-06 17:07:...|      12.0|
    |         13|      256|2023-04-06 17:07:...|      12.5|
    |         14|      257|2023-04-06 17:07:...|      13.5|
    |         15|      258|2023-04-06 17:07:...|      14.5|
    +-----------+---------+--------------------+----------+

### Example 2: Calculate simple moving averages for a timeseries of temperatures with sorting

This example calculates two simple moving averages over temperatures, one using two datapoints and one using three, sorted descending by device-id.

PPL query:

    os> source=t | trendline sort - device-id sma(2, temperature) as temp_trend_2 sma(3, temperature) as temp_trend_3;
    fetched rows / total rows = 5/5
    +-----------+---------+--------------------+------------+------------------+
    |temperature|device-id|           timestamp|temp_trend_2|      temp_trend_3|
    +-----------+---------+--------------------+------------+------------------+
    |         15|      258|2023-04-06 17:07:...|        NULL|              NULL|
    |         14|      257|2023-04-06 17:07:...|        14.5|              NULL|
    |         13|      256|2023-04-06 17:07:...|        13.5|              14.0|
    |         12|     1492|2023-04-06 17:07:...|        12.5|              13.0|
    |         12|     1492|2023-04-06 17:07:...|        12.0|12.333333333333334|
    +-----------+---------+--------------------+------------+------------------+
@@ -60,7 +60,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
private val flintMetadataCacheWriter = FlintMetadataCacheWriterBuilder.build(flintSparkConf)

private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
AsyncQuerySchedulerBuilder.build(spark, flintSparkConf.flintOptions())
}

override protected val flintMetadataLogService: FlintMetadataLogService = {
@@ -183,7 +183,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
attachLatestLogEntry(indexName, metadata)
}
.toList
.flatMap(FlintSparkIndexFactory.create)
.flatMap(metadata => FlintSparkIndexFactory.create(spark, metadata))
} else {
Seq.empty
}
@@ -202,7 +202,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
if (flintClient.exists(indexName)) {
val metadata = flintIndexMetadataService.getIndexMetadata(indexName)
val metadataWithEntry = attachLatestLogEntry(indexName, metadata)
FlintSparkIndexFactory.create(metadataWithEntry)
FlintSparkIndexFactory.create(spark, metadataWithEntry)
} else {
Option.empty
}
@@ -327,7 +327,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
val index = describeIndex(indexName)

if (index.exists(_.options.autoRefresh())) {
val updatedIndex = FlintSparkIndexFactory.createWithDefaultOptions(index.get).get
val updatedIndex = FlintSparkIndexFactory.createWithDefaultOptions(spark, index.get).get
FlintSparkIndexRefresh
.create(updatedIndex.name(), updatedIndex)
.validate(spark)
@@ -92,7 +92,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get)
validateIndex(FlintSparkIndexFactory.create(flint.spark, updatedMetadata).get)
}

/**
@@ -25,6 +25,7 @@ import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
* Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
@@ -35,14 +36,16 @@ object FlintSparkIndexFactory extends Logging {
/**
* Creates Flint index from generic Flint metadata.
*
* @param spark
* Spark session
* @param metadata
* Flint metadata
* @return
* Flint index instance, or None if any error during creation
*/
def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
def create(spark: SparkSession, metadata: FlintMetadata): Option[FlintSparkIndex] = {
try {
Some(doCreate(metadata))
Some(doCreate(spark, metadata))
} catch {
case e: Exception =>
logWarning(s"Failed to create Flint index from metadata $metadata", e)
@@ -53,24 +56,26 @@
/**
* Creates Flint index with default options.
*
* @param spark
* Spark session
* @param index
* Flint index
* @param metadata
* Flint metadata
* @return
* Flint index with default options
*/
def createWithDefaultOptions(index: FlintSparkIndex): Option[FlintSparkIndex] = {
def createWithDefaultOptions(
spark: SparkSession,
index: FlintSparkIndex): Option[FlintSparkIndex] = {
val originalOptions = index.options
val updatedOptions =
FlintSparkIndexOptions.updateOptionsWithDefaults(index.name(), originalOptions)
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
this.create(updatedMetadata)
this.create(spark, updatedMetadata)
}

private def doCreate(metadata: FlintMetadata): FlintSparkIndex = {
private def doCreate(spark: SparkSession, metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry
@@ -118,6 +123,7 @@ object FlintSparkIndexFactory extends Logging {
FlintSparkMaterializedView(
metadata.name,
metadata.source,
getMvSourceTables(spark, metadata),
metadata.indexedColumns.map { colInfo =>
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
@@ -134,6 +140,15 @@
.toMap
}

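// Prefer the source tables recorded in metadata properties; fall back to
// parsing them from the MV query text when none were recorded.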
private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
val sourceTables = getArrayString(metadata.properties, "sourceTables")
if (sourceTables.isEmpty) {
FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
} else {
sourceTables
}
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
@@ -146,4 +161,12 @@
Some(value.asInstanceOf[String])
}
}

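// Read a string-array property; yields an empty array when the key is
// missing or holds an unexpected type.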
private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
map.get(key) match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
}
}
@@ -11,9 +11,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}
import org.apache.spark.sql.flint.{loadTable, parseTableName}

/**
* Flint Spark validation helper.
@@ -31,16 +30,10 @@ trait FlintSparkValidationHelper extends Logging {
* true if all non Hive, otherwise false
*/
def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = {
// Extract source table name (possibly more than one for MV query)
val tableNames = index match {
case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName)
case covering: FlintSparkCoveringIndex => Seq(covering.tableName)
case mv: FlintSparkMaterializedView =>
spark.sessionState.sqlParser
.parsePlan(mv.query)
.collect { case relation: UnresolvedRelation =>
qualifyTableName(spark, relation.tableName)
}
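// The MV now carries its source tables directly, so the query no longer
// needs to be re-parsed here.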
case mv: FlintSparkMaterializedView => mv.sourceTables.toSeq
}

// Validate if any source table is not supported (currently Hive only)