diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 7766c3b50..26ddd3613 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table` Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct` or `array<struct>` - `source = table | flatten bridges` - `source = table | flatten coor` +- `source = table | flatten coor as (altitude, latitude, longitude)` - `source = table | flatten bridges | flatten coor` - `source = table | fields bridges | flatten bridges` - `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country` diff --git a/docs/ppl-lang/ppl-flatten-command.md b/docs/ppl-lang/ppl-flatten-command.md index 4c1ae5d0d..68b03e82e 100644 --- a/docs/ppl-lang/ppl-flatten-command.md +++ b/docs/ppl-lang/ppl-flatten-command.md @@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type: ### Syntax -`flatten <field>` +`flatten <field> [As aliasSequence]` * field: to be flattened. The field must be of supported type. +* aliasSequence: aliases to be used for the flattened-output fields. Enclose the aliasSequence in parentheses if there is more than one field. ### Test table #### Schema @@ -87,4 +88,18 @@ PPL query: | 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378| | 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402| | 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| -| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | \ No newline at end of file +| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | + +### Example 4: flatten with aliasSequence +This example shows how to flatten with aliasSequence.
+PPL query: + - `source=table | flatten coor as (altitude, latitude, longitude)` + +| \_time | bridges | city | country | altitude | latitude | longitude | +|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------| +| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 | +| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 | +| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 | +| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 | +| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 | +| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL | diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala index e714a5f7e..7d1b6e437 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala @@ -9,7 +9,7 @@ import java.nio.file.Files import org.opensearch.flint.spark.FlattenGenerator import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or} import org.apache.spark.sql.catalyst.plans.logical._ @@ -347,4 +347,85 @@ class FlintSparkPPLFlattenITSuite val 
expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + + test("flatten struct nested table using alias") { + val frame = sql(s""" + | source = $structNestedTable + | | flatten struct_col + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + assert( + frame.columns.sameElements( + Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2"))) + val results: Array[Row] = frame.collect() + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + val expectedResults: Array[Row] = + Array( + Row(30, 123, "value1", 23, "valueA"), + Row(40, 123, "value5", 33, "valueB"), + Row(30, 823, "value4", 83, "valueC"), + Row(40, 456, "value2", 46, "valueD"), + Row(50, 789, "value3", 89, "valueE")).sorted + // Compare the results + assert(results.sorted.sameElements(expectedResults)) + + // duplicate alias names + val frame2 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + // alias names duplicate with existing fields + assert( + frame2.columns.sameElements( + Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2"))) + assert(frame2.collect().sorted.sameElements(expectedResults)) + + val frame3 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + + assert( + frame3.columns.sameElements(Array("int_col", "field2_2", "int_col", "field2_2", "int_col"))) + assert(frame3.collect().sorted.sameElements(expectedResults)) + + // Throw AnalysisException if The number of aliases supplied in the AS clause does not match the + // number 
of columns output + val except = intercept[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + } + assert(except.message.contains( + "The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF")) + + // Throw AnalysisException because of ambiguous + val except2 = intercept[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | | fields field2_2 + | """.stripMargin) + } + assert(except2.message.contains( + "[AMBIGUOUS_REFERENCE] Reference `field2_2` is ambiguous, could be: [`field2_2`, `field2_2`].")) + } + } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 357673e73..675893b7e 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -259,7 +259,7 @@ expandCommand ; flattenCommand - : FLATTEN fieldExpression + : FLATTEN fieldExpression (AS alias = identifierSeq)? 
; trendlineCommand @@ -1032,6 +1032,11 @@ qualifiedName : ident (DOT ident)* # identsAsQualifiedName ; +identifierSeq + : qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq + | LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq + ; + tableQualifiedName : tableIdent (DOT ident)* # identsAsTableQualifiedName ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index 9c57d2adf..36c126591 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -7,6 +7,7 @@ import org.opensearch.sql.ast.expression.Field; import java.util.List; +import org.opensearch.sql.ast.expression.UnresolvedExpression; @RequiredArgsConstructor public class Flatten extends UnresolvedPlan { @@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan { @Getter private final Field field; + @Getter + private final List aliasSequence; @Override public UnresolvedPlan attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index debd37376..af94dd004 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -84,12 +84,10 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static 
org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty; @@ -462,9 +460,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getField(), context); + List alias = flatten.getAliasSequence().stream() + .map(aliasNode -> visitExpression(aliasNode, context)) + .collect(Collectors.toList()); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); FlattenGenerator flattenGenerator = new FlattenGenerator(field); - context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); + scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias); + context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p)); return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7d1cc072b..b1254bf8f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -18,6 +18,7 @@ import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.AttributeList; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.EqualTo; import org.opensearch.sql.ast.expression.Field; @@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo 
@Override public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) { Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression()); - return new Flatten(unresolvedExpression); + List alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList(); + return new Flatten(unresolvedExpression, alias); } /** AD command. */ diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index e683a1395..1fef4251e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident return visitIdentifiers(ctx.ident()); } + @Override + public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) { + return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList())); + } + @Override public UnresolvedExpression visitIdentsAsTableQualifiedName( OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala index 58a6c04b3..543e5c05d 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala @@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers import 
org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project} import org.apache.spark.sql.types.IntegerType class PPLLogicalPlanFlattenCommandTranslatorTestSuite @@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite comparePlans(expectedPlan, logPlan, checkAnalysis = false) } + test("test flatten with one alias") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as col1"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = + Generate(outerGenerator, seq(), true, None, Seq(UnresolvedAttribute("col1")), relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test flatten with alias list") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new 
FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = Generate( + outerGenerator, + seq(), + true, + None, + Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2")), + relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + }