From 0899d53202354efc7ba1b25afb5f7013c9dfe62a Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 18 Nov 2024 15:10:29 +0800 Subject: [PATCH 1/5] Support flatten with alias Signed-off-by: Heng Qian --- docs/ppl-lang/ppl-flatten-command.md | 19 +++++++- .../ppl/FlintSparkPPLFlattenITSuite.scala | 26 +++++++++++ .../src/main/antlr4/OpenSearchPPLParser.g4 | 7 ++- .../org/opensearch/sql/ast/tree/Flatten.java | 3 ++ .../sql/ppl/CatalystQueryPlanVisitor.java | 7 ++- .../opensearch/sql/ppl/parser/AstBuilder.java | 4 +- .../sql/ppl/parser/AstExpressionBuilder.java | 5 +++ ...lanFlattenCommandTranslatorTestSuite.scala | 45 ++++++++++++++++++- 8 files changed, 109 insertions(+), 7 deletions(-) diff --git a/docs/ppl-lang/ppl-flatten-command.md b/docs/ppl-lang/ppl-flatten-command.md index 4c1ae5d0d..5af13f335 100644 --- a/docs/ppl-lang/ppl-flatten-command.md +++ b/docs/ppl-lang/ppl-flatten-command.md @@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type: ### Syntax -`flatten ` +`flatten [As alias]` * field: to be flattened. The field must be of supported type. +* alias: to be used as alias for the flattened-output fields. Need to put the alias in brace if there is more than one field. ### Test table #### Schema @@ -87,4 +88,18 @@ PPL query: | 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378| | 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402| | 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| -| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | \ No newline at end of file +| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | + +### Example 4: flatten with alias +This example shows how to flatten with alias. +PPL query: + - `source=table | flatten coor as (altitude, latitude, longitude)` + +| \_time | bridges | city | country | altitude | latitude | longtitude | +|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------| +| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 | +| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 | +| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 | +| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 | +| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 | +| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL | diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala index e714a5f7e..c1a061b20 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala @@ -347,4 +347,30 @@ class FlintSparkPPLFlattenITSuite val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + + test("flatten struct nested table using alias") { + val frame = sql(s""" + | source = $structNestedTable + | | flatten struct_col + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + assert( + frame.columns.sameElements( + Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(30, 123, "value1", 23, "valueA"), + Row(40, 123, "value5", 33, "valueB"), + Row(30, 823, "value4", 83, "valueC"), + Row(40, 456, "value2", 46, "valueD"), + Row(50, 789, "value3", 89, "valueE")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + } + } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 357673e73..675893b7e 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -259,7 +259,7 @@ expandCommand ; flattenCommand - : FLATTEN fieldExpression + : FLATTEN fieldExpression (AS alias = identifierSeq)? ; trendlineCommand @@ -1032,6 +1032,11 @@ qualifiedName : ident (DOT ident)* # identsAsQualifiedName ; +identifierSeq + : qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq + | LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq + ; + tableQualifiedName : tableIdent (DOT ident)* # identsAsTableQualifiedName ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index 9c57d2adf..9c9179057 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -7,6 +7,7 @@ import org.opensearch.sql.ast.expression.Field; import java.util.List; +import org.opensearch.sql.ast.expression.UnresolvedExpression; @RequiredArgsConstructor public class Flatten extends UnresolvedPlan { @@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan { @Getter private final Field field; + @Getter + private final List alias; @Override public UnresolvedPlan attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index debd37376..53139948c 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -10,6 +10,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; +import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.Descending$; import org.apache.spark.sql.catalyst.expressions.Explode; import org.apache.spark.sql.catalyst.expressions.Expression; @@ -462,9 +463,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getField(), context); + List alias = flatten.getAlias().stream() + .map(aliasNode -> visitExpression(aliasNode, context)) + .collect(Collectors.toList()); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); FlattenGenerator flattenGenerator = new FlattenGenerator(field); - context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); + scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias); + context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p)); return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7d1cc072b..b1254bf8f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -18,6 +18,7 @@ import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.AttributeList; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.EqualTo; import org.opensearch.sql.ast.expression.Field; @@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo @Override public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) { Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression()); - return new Flatten(unresolvedExpression); + List alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList(); + return new Flatten(unresolvedExpression, alias); } /** AD command. */ diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index e683a1395..1fef4251e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident return visitIdentifiers(ctx.ident()); } + @Override + public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) { + return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList())); + } + @Override public UnresolvedExpression visitIdentsAsTableQualifiedName( OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala index 58a6c04b3..543e5c05d 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala @@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project} import org.apache.spark.sql.types.IntegerType class PPLLogicalPlanFlattenCommandTranslatorTestSuite @@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite comparePlans(expectedPlan, logPlan, checkAnalysis = false) } + test("test flatten with one alias") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as col1"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = + Generate(outerGenerator, seq(), true, None, Seq(UnresolvedAttribute("col1")), relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test flatten with alias list") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = Generate( + outerGenerator, + seq(), + true, + None, + Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2")), + relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + } From 9d9ad8045eb25e3c0debd49be2ca954e334e7190 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 18 Nov 2024 17:49:19 +0800 Subject: [PATCH 2/5] Address comments Signed-off-by: Heng Qian --- docs/ppl-lang/ppl-flatten-command.md | 8 ++-- .../ppl/FlintSparkPPLFlattenITSuite.scala | 41 ++++++++++++++++++- .../org/opensearch/sql/ast/tree/Flatten.java | 2 +- .../sql/ppl/CatalystQueryPlanVisitor.java | 5 +-- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/docs/ppl-lang/ppl-flatten-command.md b/docs/ppl-lang/ppl-flatten-command.md index 5af13f335..68b03e82e 100644 --- a/docs/ppl-lang/ppl-flatten-command.md +++ b/docs/ppl-lang/ppl-flatten-command.md @@ -7,10 +7,10 @@ Using `flatten` command to flatten a field of type: ### Syntax -`flatten [As alias]` +`flatten [As aliasSequence]` * field: to be flattened. The field must be of supported type. -* alias: to be used as alias for the flattened-output fields. Need to put the alias in brace if there is more than one field. +* aliasSequence: to be used as aliasSequence for the flattened-output fields. Better to put the aliasSequence in brace if there is more than one field. ### Test table #### Schema @@ -90,8 +90,8 @@ PPL query: | 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| | 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | -### Example 4: flatten with alias -This example shows how to flatten with alias. +### Example 4: flatten with aliasSequence +This example shows how to flatten with aliasSequence. PPL query: - `source=table | flatten coor as (altitude, latitude, longitude)` diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala index c1a061b20..e3c780e06 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala @@ -9,7 +9,7 @@ import java.nio.file.Files import org.opensearch.flint.spark.FlattenGenerator import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or} import org.apache.spark.sql.catalyst.plans.logical._ @@ -371,6 +371,45 @@ class FlintSparkPPLFlattenITSuite // Compare the results implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) assert(results.sorted.sameElements(expectedResults.sorted)) + + // duplicate alias names + val frame2 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + // alias names duplicate with existing fields + assert( + frame2.columns.sameElements( + Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2"))) + assert(frame2.collect().sorted.sameElements(expectedResults.sorted)) + + val frame3 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + + assert( + frame3.columns.sameElements(Array("int_col", "field2_2", "int_col", "field2_2", "int_col"))) + assert(frame3.collect().sorted.sameElements(expectedResults.sorted)) + + // Throw AnalysisException if The number of aliases supplied in the AS clause does not match the + // number of columns output + assertThrows[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + } } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index 9c9179057..d6885b299 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -17,7 +17,7 @@ public class Flatten extends UnresolvedPlan { @Getter private final Field field; @Getter - private final List alias; + private final List aliases; @Override public UnresolvedPlan attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 53139948c..3ea1acb75 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -10,7 +10,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; -import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.Descending$; import org.apache.spark.sql.catalyst.expressions.Explode; import org.apache.spark.sql.catalyst.expressions.Expression; @@ -85,12 +84,10 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty; @@ -463,7 +460,7 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getField(), context); - List alias = flatten.getAlias().stream() + List alias = flatten.getAliases().stream() .map(aliasNode -> visitExpression(aliasNode, context)) .collect(Collectors.toList()); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); From e494c5d228aebae3e3df0e75c1618e5c40881988 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 18 Nov 2024 17:53:06 +0800 Subject: [PATCH 3/5] Address comments Signed-off-by: Heng Qian --- .../src/main/java/org/opensearch/sql/ast/tree/Flatten.java | 2 +- .../java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index d6885b299..36c126591 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -17,7 +17,7 @@ public class Flatten extends UnresolvedPlan { @Getter private final Field field; @Getter - private final List aliases; + private final List aliasSequence; @Override public UnresolvedPlan attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 3ea1acb75..af94dd004 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -460,7 +460,7 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getField(), context); - List alias = flatten.getAliases().stream() + List alias = flatten.getAliasSequence().stream() .map(aliasNode -> visitExpression(aliasNode, context)) .collect(Collectors.toList()); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); From dd72b1d26a276da6b3aee1da203e4f07d2d59457 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 19 Nov 2024 10:22:29 +0800 Subject: [PATCH 4/5] Add flatten with alias example Signed-off-by: Heng Qian --- docs/ppl-lang/PPL-Example-Commands.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 7766c3b50..26ddd3613 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table` Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct` or `array>` - `source = table | flatten bridges` - `source = table | flatten coor` +- `source = table | flatten coor as (altitude, latitude, longitude)` - `source = table | flatten bridges | flatten coor` - `source = table | fields bridges | flatten bridges` - `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country` From b7cdf91d36d8ff655b04b0e891061ae29caf5511 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Wed, 20 Nov 2024 11:38:05 +0800 Subject: [PATCH 5/5] Add IT for ambiguous exception Signed-off-by: Heng Qian --- .../ppl/FlintSparkPPLFlattenITSuite.scala | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala index e3c780e06..7d1b6e437 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala @@ -361,16 +361,16 @@ class FlintSparkPPLFlattenITSuite frame.columns.sameElements( Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2"))) val results: Array[Row] = frame.collect() + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) val expectedResults: Array[Row] = Array( Row(30, 123, "value1", 23, "valueA"), Row(40, 123, "value5", 33, "valueB"), Row(30, 823, "value4", 83, "valueC"), Row(40, 456, "value2", 46, "valueD"), - Row(50, 789, "value3", 89, "valueE")) + Row(50, 789, "value3", 89, "valueE")).sorted // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assert(results.sorted.sameElements(expectedResults)) // duplicate alias names val frame2 = sql(s""" @@ -385,7 +385,7 @@ class FlintSparkPPLFlattenITSuite assert( frame2.columns.sameElements( Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2"))) - assert(frame2.collect().sorted.sameElements(expectedResults.sorted)) + assert(frame2.collect().sorted.sameElements(expectedResults)) val frame3 = sql(s""" | source = $structNestedTable @@ -397,11 +397,11 @@ class FlintSparkPPLFlattenITSuite assert( frame3.columns.sameElements(Array("int_col", "field2_2", "int_col", "field2_2", "int_col"))) - assert(frame3.collect().sorted.sameElements(expectedResults.sorted)) + assert(frame3.collect().sorted.sameElements(expectedResults)) // Throw AnalysisException if The number of aliases supplied in the AS clause does not match the // number of columns output - assertThrows[AnalysisException] { + val except = intercept[AnalysisException] { sql(s""" | source = $structNestedTable | | flatten struct_col as (field1) @@ -410,6 +410,22 @@ class FlintSparkPPLFlattenITSuite | | flatten field1 as int_col | """.stripMargin) } + assert(except.message.contains( + "The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF")) + + // Throw AnalysisException because of ambiguous + val except2 = intercept[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | | fields field2_2 + | """.stripMargin) + } + assert(except2.message.contains( + "[AMBIGUOUS_REFERENCE] Reference `field2_2` is ambiguous, could be: [`field2_2`, `field2_2`].")) } }