Skip to content

Commit

Permalink
update with additional test case
Browse files Browse the repository at this point in the history
remove outer generator

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 5, 2024
1 parent ff39d31 commit a91a986
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 114 deletions.
9 changes: 5 additions & 4 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,12 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols`
#### **expand**
[See additional command details](ppl-expand-command.md)
```sql
- `source= table | expand field_with_array as array_list`
- `source = table | expand field_with_array as array_list`
- `source = table | expand employee | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | expand employee as worker | stats max(salary) as max by state, company`
- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
```

#### Correlation Commands:
Expand Down
9 changes: 0 additions & 9 deletions docs/ppl-lang/ppl-expand-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ This example shows how to expand an array of struct field.
PPL query:
- `source=table | expand bridges as britishBridge | fields britishBridge`

| \_time | bridges | city | country | alt | lat | long |
|---------------------|----------------------------------------------|---------|---------------|-----|--------|--------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074| -0.1278|
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566| 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408| 12.3155|
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL| NULL | NULL |



### Example 2: expand array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class FlintSparkPPLExpandITSuite
with StreamTest {

private val testTable = "flint_ppl_test"
private val occupationTable = "spark_catalog.default.flint_ppl_flat_table_test"
private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test"
private val structTable = "spark_catalog.default.flint_ppl_struct_test"
private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test"
Expand All @@ -33,6 +34,7 @@ class FlintSparkPPLExpandITSuite

// Create test table
createNestedJsonContentTable(tempFile, testTable)
createOccupationTable(occupationTable)
createStructNestedTable(structNestedTable)
createStructTable(structTable)
createMultiValueStructTable(multiValueTable)
Expand All @@ -52,7 +54,61 @@ class FlintSparkPPLExpandITSuite
Files.deleteIfExists(tempFile)
}

test("flatten for structs") {
test("expand for eval field of an array") {
val frame = sql(
s""" source = $occupationTable | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid
""".stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row("Jake", "Engineer", 1),
Row("Jake", "Engineer", 2),
Row("Jake", "Engineer", 3),
Row("Hello", "Artist", 1),
Row("Hello", "Artist", 2),
Row("Hello", "Artist", 3),
Row("John", "Doctor", 1),
Row("John", "Doctor", 2),
Row("John", "Doctor", 3),
Row("David", "Doctor", 1),
Row("David", "Doctor", 2),
Row("David", "Doctor", 3),
Row("David", "Unemployed", 1),
Row("David", "Unemployed", 2),
Row("David", "Unemployed", 3),
Row("Jane", "Scientist", 1),
Row("Jane", "Scientist", 2),
Row("Jane", "Scientist", 3))

// Compare the results
assert(results.toSet == expectedResults.toSet)

val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_flat_table_test"))
val jsonFunc =
UnresolvedFunction("array", Seq(Literal(1), Literal(2), Literal(3)), isDistinct = false)
val aliasA = Alias(jsonFunc, "array")()
val project = Project(seq(UnresolvedStar(None), aliasA), table)
val generate = Generate(
Explode(UnresolvedAttribute("array")),
seq(),
false,
None,
seq(UnresolvedAttribute("uid")),
project)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("array")), generate)
val expectedPlan = Project(
seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("occupation"),
UnresolvedAttribute("uid")),
dropSourceColumn)
comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("expand for structs") {
val frame = sql(
s""" source = $multiValueTable | expand multi_value AS exploded_multi_value | fields exploded_multi_value
""".stripMargin)
Expand All @@ -73,11 +129,10 @@ class FlintSparkPPLExpandITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// expected plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test"))
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("multi_value")))
val generate = Generate(
outerGenerator,
Explode(UnresolvedAttribute("multi_value")),
seq(),
outer = true,
outer = false,
None,
seq(UnresolvedAttribute("exploded_multi_value")),
table)
Expand All @@ -98,8 +153,10 @@ class FlintSparkPPLExpandITSuite
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))),
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))),
Row(null))
Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge"))))
// Row(null)) -> in case of outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) it will include the `null` row
)

// Compare the results
assert(results.toSet == expectedResults.toSet)
val logicalPlan: LogicalPlan = frame.queryExecution.logical
Expand All @@ -109,8 +166,8 @@ class FlintSparkPPLExpandITSuite
EqualTo(UnresolvedAttribute("country"), Literal("England")),
EqualTo(UnresolvedAttribute("country"), Literal("Poland"))),
table)
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges")))
val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), filter)
val generate =
Generate(Explode(UnresolvedAttribute("bridges")), seq(), outer = false, None, seq(), filter)
val expectedPlan = Project(Seq(UnresolvedAttribute("bridges")), generate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}
Expand All @@ -134,11 +191,10 @@ class FlintSparkPPLExpandITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("flint_ppl_test"))
val filter = Filter(EqualTo(UnresolvedAttribute("country"), Literal("England")), table)
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges")))
val generate = Generate(
outerGenerator,
Explode(UnresolvedAttribute("bridges")),
seq(),
outer = true,
outer = false,
None,
seq(UnresolvedAttribute("britishBridges")),
filter)
Expand Down Expand Up @@ -166,48 +222,10 @@ class FlintSparkPPLExpandITSuite

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test"))
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges")))
val generate = Generate(
outerGenerator,
seq(),
outer = true,
None,
seq(UnresolvedAttribute("britishBridges")),
table)
val expectedPlan = Project(Seq(UnresolvedStar(None)), generate)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

ignore("expand multi value nullable") {
val frame = sql(s"""
| source = $multiValueTable
| | expand multi_value as expand_field
| | fields expand_field
| """.stripMargin)

assert(frame.columns.sameElements(Array("expand_field")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "1_one", 1),
Row(1, null, 11),
Row(1, "1_three", null),
Row(2, "2_Monday", 2),
Row(2, null, null),
Row(3, "3_third", 3),
Row(3, "3_4th", 4),
Row(4, null, null))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test"))
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges")))
val generate = Generate(
outerGenerator,
Explode(UnresolvedAttribute("bridges")),
seq(),
outer = true,
outer = false,
None,
seq(UnresolvedAttribute("britishBridges")),
table)
Expand Down Expand Up @@ -241,15 +259,15 @@ class FlintSparkPPLExpandITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table =
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test"))
// val flattenStructCol = generator("struct_col", table)
// val flattenField1 = generator("field1", flattenStructCol)
// val flattenStructCol2 = generator("struct_col2", flattenField1)
// val flattenField1Again = generator("field1", flattenStructCol2)
// val expandStructCol = generator("struct_col", table)
// val expandField1 = generator("field1", expandStructCol)
// val expandStructCol2 = generator("struct_col2", expandField1)
// val expandField1Again = generator("field1", expandStructCol2)
val expectedPlan = Project(Seq(UnresolvedStar(None)), table)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

ignore("flatten multi value nullable") {
ignore("expand multi value nullable") {
val frame = sql(s"""
| source = $multiValueTable
| | expand multi_value as expand_field
Expand All @@ -274,11 +292,10 @@ class FlintSparkPPLExpandITSuite

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test"))
val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges")))
val generate = Generate(
outerGenerator,
Explode(UnresolvedAttribute("bridges")),
seq(),
outer = true,
outer = false,
None,
seq(UnresolvedAttribute("britishBridges")),
table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,12 @@ public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, Catalyst
Optional<Expression> alias = node.getAlias().map(aliasNode -> visitExpression(aliasNode, context));
context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
Explode explodeGenerator = new Explode(field);
scala.collection.mutable.Seq seq = alias.isEmpty() ? seq() : seq(alias.get());
scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias.get());
if(alias.isEmpty())
return context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p));
return context.apply(p -> new Generate(explodeGenerator, seq(), false, (Option) None$.MODULE$, outputs, p));
else {
//in case an alias does appear - remove the original field from the returning columns
context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p));
context.apply(p -> new Generate(explodeGenerator, seq(), false, (Option) None$.MODULE$, outputs, p));
return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan));
}
}
Expand Down
Loading

0 comments on commit a91a986

Please sign in to comment.