Skip to content

Commit

Permalink
Expand ppl command (opensearch-project#868)
Browse files Browse the repository at this point in the history
* add expand command

Signed-off-by: YANGDB <[email protected]>

* add expand command with visitor

Signed-off-by: YANGDB <[email protected]>

* create unit / integration tests

Signed-off-by: YANGDB <[email protected]>

* update expand tests

Signed-off-by: YANGDB <[email protected]>

* add tests

Signed-off-by: YANGDB <[email protected]>

* update doc

Signed-off-by: YANGDB <[email protected]>

* update docs with examples

Signed-off-by: YANGDB <[email protected]>

* update scala style

Signed-off-by: YANGDB <[email protected]>

* update with additional test case
remove outer generator

Signed-off-by: YANGDB <[email protected]>

* update with additional test case
remove outer generator

Signed-off-by: YANGDB <[email protected]>

* update documentation

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored Nov 7, 2024
1 parent 48be5cc commit 4303057
Show file tree
Hide file tree
Showing 13 changed files with 716 additions and 16 deletions.
37 changes: 24 additions & 13 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,30 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in

_- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_

---
#### Experimental Commands:

#### **fillnull**
[See additional command details](ppl-fillnull-command.md)
```sql
- `source=accounts | fillnull fields status_code=101`
- `source=accounts | fillnull fields request_path='/not_found', timestamp='*'`
- `source=accounts | fillnull using field1=101`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'`
```

#### **expand**
[See additional command details](ppl-expand-command.md)
```sql
- `source = table | expand field_with_array as array_list`
- `source = table | expand employee | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
- `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB`
```

#### Correlation Commands:
[See additional command details](ppl-correlation-command.md)

```sql
Expand All @@ -454,14 +476,3 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
> ppl-correlation-command is an experimental command - it may be removed in future versions
---
### Planned Commands:

#### **fillnull**
[See additional command details](ppl-fillnull-command.md)
```sql
- `source=accounts | fillnull fields status_code=101`
- `source=accounts | fillnull fields request_path='/not_found', timestamp='*'`
- `source=accounts | fillnull using field1=101`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5`
- `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'`
```
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`correlation commands`](ppl-correlation-command.md)

- [`trendline commands`](ppl-trendline-command.md)

- [`expand commands`](ppl-expand-command.md)

* **Functions**

Expand Down
45 changes: 45 additions & 0 deletions docs/ppl-lang/ppl-expand-command.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## PPL `expand` command

### Description
Using `expand` command to flatten a field of type:
- `Array<Any>`
- `Map<Any>`


### Syntax
`expand <field> [As alias]`

* field: to be expanded (exploded). The field must be of supported type.
* alias: Optional to be expanded as the name to be used instead of the original field name

### Usage Guidelines
The expand command produces a row for each element in the specified array or map field, where:
- Array elements become individual rows.
- Map key-value pairs are broken into separate rows, with each key-value represented as a row.

- When an alias is provided, the exploded values are represented under the alias instead of the original field name.
- This can be used in combination with other commands, such as stats, eval, and parse to manipulate or extract data post-expansion.

### Examples:
- `source = table | expand employee | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
- `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB`

- Expand command can be used in combination with other commands such as `eval`, `stats` and more
- Using multiple expand commands will create a cartesian product of all the internal elements within each composite array or map

### Effective SQL push-down query
The expand command is translated into an equivalent SQL operation using LATERAL VIEW explode, allowing for efficient exploding of arrays or maps at the SQL query level.

```sql
SELECT customer exploded_productId
FROM table
LATERAL VIEW explode(productId) AS exploded_productId
```
Where the `explode` command offers the following functionality:
- it is a column operation that returns a new column
- it creates a new row for every element in the exploded column
- internal `null`s are ignored as part of the exploded field (no row is created/exploded for null)
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,28 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
|""".stripMargin)
}

protected def createMultiColumnArrayTable(testTable: String): Unit = {
// CSV doesn't support struct field
sql(s"""
| CREATE TABLE $testTable
| (
| int_col INT,
| multi_valueA Array<STRUCT<name: STRING, value: INT>>,
| multi_valueB Array<STRUCT<name: STRING, value: INT>>
| )
| USING JSON
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES
| ( 1, array(STRUCT("1_one", 1), STRUCT(null, 11), STRUCT("1_three", null)), array(STRUCT("2_Monday", 2), null) ),
| ( 2, array(STRUCT("2_Monday", 2), null) , array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) ),
| ( 3, array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) , array(STRUCT("1_one", 1))),
| ( 4, null, array(STRUCT("1_one", 1)))
|""".stripMargin)
}

protected def createTableIssue112(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark.ppl

import java.nio.file.Files

import scala.collection.mutable

import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Explode, GeneratorOuter, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLExpandITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

private val testTable = "flint_ppl_test"
private val occupationTable = "spark_catalog.default.flint_ppl_flat_table_test"
private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test"
private val structTable = "spark_catalog.default.flint_ppl_struct_test"
private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test"
private val multiArraysTable = "spark_catalog.default.flint_ppl_multi_array_test"
private val tempFile = Files.createTempFile("jsonTestData", ".json")

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createNestedJsonContentTable(tempFile, testTable)
createOccupationTable(occupationTable)
createStructNestedTable(structNestedTable)
createStructTable(structTable)
createMultiValueStructTable(multiValueTable)
createMultiColumnArrayTable(multiArraysTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

override def afterAll(): Unit = {
super.afterAll()
Files.deleteIfExists(tempFile)
}

test("expand for eval field of an array") {
val frame = sql(
s""" source = $occupationTable | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid
""".stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row("Jake", "Engineer", 1),
Row("Jake", "Engineer", 2),
Row("Jake", "Engineer", 3),
Row("Hello", "Artist", 1),
Row("Hello", "Artist", 2),
Row("Hello", "Artist", 3),
Row("John", "Doctor", 1),
Row("John", "Doctor", 2),
Row("John", "Doctor", 3),
Row("David", "Doctor", 1),
Row("David", "Doctor", 2),
Row("David", "Doctor", 3),
Row("David", "Unemployed", 1),
Row("David", "Unemployed", 2),
Row("David", "Unemployed", 3),
Row("Jane", "Scientist", 1),
Row("Jane", "Scientist", 2),
Row("Jane", "Scientist", 3))

// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_flat_table_test"))
val jsonFunc =
UnresolvedFunction("array", Seq(Literal(1), Literal(2), Literal(3)), isDistinct = false)
val aliasA = Alias(jsonFunc, "array")()
val project = Project(seq(UnresolvedStar(None), aliasA), table)
val generate = Generate(
Explode(UnresolvedAttribute("array")),
seq(),
false,
None,
seq(UnresolvedAttribute("uid")),
project)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("array")), generate)
val expectedPlan = Project(
seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("occupation"),
UnresolvedAttribute("uid")),
dropSourceColumn)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("expand for structs") {
val frame = sql(
s""" source = $multiValueTable | expand multi_value AS exploded_multi_value | fields exploded_multi_value
""".stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(Row("1_one", 1)),
Row(Row(null, 11)),
Row(Row("1_three", null)),
Row(Row("2_Monday", 2)),
Row(null),
Row(Row("3_third", 3)),
Row(Row("3_4th", 4)),
Row(null))
// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test"))
val generate = Generate(
Explode(UnresolvedAttribute("multi_value")),
seq(),
outer = false,
None,
seq(UnresolvedAttribute("exploded_multi_value")),
table)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_value")), generate)
val expectedPlan = Project(Seq(UnresolvedAttribute("exploded_multi_value")), dropSourceColumn)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand for array of structs") {
val frame = sql(s"""
| source = $testTable
| | where country = 'England' or country = 'Poland'
| | expand bridges
| | fields bridges
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))),
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge"))))
// Row(null)) -> in case of outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) it will include the `null` row
)

// Compare the results
assert(results.toSet == expectedResults.toSet)
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("flint_ppl_test"))
val filter = Filter(
Or(
EqualTo(UnresolvedAttribute("country"), Literal("England")),
EqualTo(UnresolvedAttribute("country"), Literal("Poland"))),
table)
val generate =
Generate(Explode(UnresolvedAttribute("bridges")), seq(), outer = false, None, seq(), filter)
val expectedPlan = Project(Seq(UnresolvedAttribute("bridges")), generate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand for array of structs with alias") {
val frame = sql(s"""
| source = $testTable
| | where country = 'England'
| | expand bridges as britishBridges
| | fields britishBridges
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(Row(801, "Tower Bridge")),
Row(Row(928, "London Bridge")),
Row(Row(801, "Tower Bridge")),
Row(Row(928, "London Bridge")))
// Compare the results
assert(results.toSet == expectedResults.toSet)
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("flint_ppl_test"))
val filter = Filter(EqualTo(UnresolvedAttribute("country"), Literal("England")), table)
val generate = Generate(
Explode(UnresolvedAttribute("bridges")),
seq(),
outer = false,
None,
seq(UnresolvedAttribute("britishBridges")),
filter)
val dropColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("bridges")), generate)
val expectedPlan = Project(Seq(UnresolvedAttribute("britishBridges")), dropColumn)

comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("expand multi columns array table") {
val frame = sql(s"""
| source = $multiArraysTable
| | expand multi_valueA as multiA
| | expand multi_valueB as multiB
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1, Row("1_one", 1), Row("2_Monday", 2)),
Row(1, Row("1_one", 1), null),
Row(1, Row(null, 11), Row("2_Monday", 2)),
Row(1, Row(null, 11), null),
Row(1, Row("1_three", null), Row("2_Monday", 2)),
Row(1, Row("1_three", null), null),
Row(2, Row("2_Monday", 2), Row("3_third", 3)),
Row(2, Row("2_Monday", 2), Row("3_4th", 4)),
Row(2, null, Row("3_third", 3)),
Row(2, null, Row("3_4th", 4)),
Row(3, Row("3_third", 3), Row("1_one", 1)),
Row(3, Row("3_4th", 4), Row("1_one", 1)))
// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_array_test"))
val generatorA = Explode(UnresolvedAttribute("multi_valueA"))
val generateA =
Generate(generatorA, seq(), false, None, seq(UnresolvedAttribute("multiA")), table)
val dropSourceColumnA =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueA")), generateA)
val generatorB = Explode(UnresolvedAttribute("multi_valueB"))
val generateB = Generate(
generatorB,
seq(),
false,
None,
seq(UnresolvedAttribute("multiB")),
dropSourceColumnA)
val dropSourceColumnB =
DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueB")), generateB)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumnB)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ KMEANS: 'KMEANS';
AD: 'AD';
ML: 'ML';
FILLNULL: 'FILLNULL';
EXPAND: 'EXPAND';
FLATTEN: 'FLATTEN';
TRENDLINE: 'TRENDLINE';

Expand Down
Loading

0 comments on commit 4303057

Please sign in to comment.