From 35ea4d10c767b2974ba9082474b3d25251f418d1 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 31 Oct 2024 13:23:53 -0700 Subject: [PATCH 01/11] add expand command Signed-off-by: YANGDB --- .../src/main/antlr4/OpenSearchPPLLexer.g4 | 1 + .../src/main/antlr4/OpenSearchPPLParser.g4 | 5 +++ .../sql/ast/AbstractNodeVisitor.java | 4 ++ .../org/opensearch/sql/ast/tree/Expand.java | 44 +++++++++++++++++++ .../sql/ppl/CatalystQueryPlanVisitor.java | 6 +++ .../opensearch/sql/ppl/parser/AstBuilder.java | 5 +++ 6 files changed, 65 insertions(+) create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 index bf6989b7c..457e90cac 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4 @@ -37,6 +37,7 @@ KMEANS: 'KMEANS'; AD: 'AD'; ML: 'ML'; FILLNULL: 'FILLNULL'; +EXPAND: 'EXPAND'; //Native JOIN KEYWORDS JOIN: 'JOIN'; diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index aaf807a7b..a2cb4d096 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -53,6 +53,7 @@ commands | renameCommand | fillnullCommand | fieldsummaryCommand + | expandCommand ; commandName @@ -80,6 +81,7 @@ commandName | PATTERNS | LOOKUP | RENAME + | EXPAND | FILLNULL | FIELDSUMMARY ; @@ -246,6 +248,9 @@ fillnullCommand : expression ; +expandCommand + : EXPAND fieldExpression + ; kmeansCommand : KMEANS (kmeansParameter)* diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 03c40fcd2..fa6318255 100644 --- 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -107,6 +107,10 @@ public T visitFilter(Filter node, C context) { return visitChildren(node, context); } + public T visitExpand(Expand node, C context) { + return visitChildren(node, context); + } + public T visitLookup(Lookup node, C context) { return visitChildren(node, context); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java new file mode 100644 index 000000000..6fe7f77eb --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java @@ -0,0 +1,44 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +import java.util.List; + +/** Logical plan node of Expand */ +@ToString +@EqualsAndHashCode(callSuper = false) +@Getter +public class Expand extends UnresolvedPlan { + private UnresolvedExpression field; + private UnresolvedPlan child; + + public Expand(UnresolvedExpression field) { + this.field = field; + } + + @Override + public Expand attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitExpand(this, context); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 
5d2fe986b..fcd14beb2 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -180,6 +180,12 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { return context.getPlan(); } + @Override + public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, CatalystPlanContext context) { + node.getChild().get(0).accept(this, context); + return super.visitExpand(node, context); + } + @Override public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index c69e9541e..06bb90c88 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -131,6 +131,11 @@ public UnresolvedPlan visitWhereCommand(OpenSearchPPLParser.WhereCommandContext return new Filter(internalVisitExpression(ctx.logicalExpression())); } + @Override + public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContext ctx) { + return new Filter(internalVisitExpression(ctx.fieldExpression())); + } + @Override public UnresolvedPlan visitCorrelateCommand(OpenSearchPPLParser.CorrelateCommandContext ctx) { return new Correlation(ctx.correlationType().getText(), From a6e76ec2aef1de26c35899f5e0f66ca3dfe99c4e Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 31 Oct 2024 14:25:43 -0700 Subject: [PATCH 02/11] add expand command with visitor Signed-off-by: YANGDB --- .../ppl/FlintSparkPPLExpandITSuite.scala | 365 ++++++++++++++++++ .../org/opensearch/sql/ast/tree/Expand.java | 23 +- .../org/opensearch/sql/ast/tree/Flatten.java | 4 +- 
.../sql/ppl/CatalystQueryPlanVisitor.java | 65 +--- 4 files changed, 392 insertions(+), 65 deletions(-) create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala new file mode 100644 index 000000000..f0640f70c --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -0,0 +1,365 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.{QueryTest, Row} +import org.opensearch.flint.spark.FlattenGenerator +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq + +import java.nio.file.Files + +class FlintSparkPPLExpandITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + private val testTable = "flint_ppl_test" + private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test" + private val structTable = "spark_catalog.default.flint_ppl_struct_test" + private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test" + private val tempFile = Files.createTempFile("jsonTestData", ".json") + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test table + createNestedJsonContentTable(tempFile, testTable) + createStructNestedTable(structNestedTable) + createStructTable(structTable) + 
createMultiValueStructTable(multiValueTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + override def afterAll(): Unit = { + super.afterAll() + Files.deleteIfExists(tempFile) + } + + /** + * 'Project [*] + * +- 'Generate 'explode('multi_value), false, as, ['exploded_multi_value] + * +- 'UnresolvedRelation [spark_catalog, default, flint_ppl_multi_value_test], [], false + */ + test("expand for structs") { + val frame = sql( + s""" SELECT * FROM $multiValueTable + LATERAL VIEW explode(multi_value) AS exploded_multi_value + """.stripMargin) + + val results: Array[Row] = frame.collect() + val logical = frame.queryExecution.logical + print(logical) + } + + test("flatten for structs") { + val frame = sql(s""" + | source = $testTable + | | where country = 'England' or country = 'Poland' + | | fields coor + | | flatten coor + | """.stripMargin) + + assert(frame.columns.sameElements(Array("alt", "lat", "long"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array(Row(35, 51.5074, -0.1278), Row(null, null, null)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](1)) + assert(results.sorted.sameElements(expectedResults.sorted)) + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("flint_ppl_test")) + val filter = Filter( + Or( + EqualTo(UnresolvedAttribute("country"), Literal("England")), + EqualTo(UnresolvedAttribute("country"), Literal("Poland"))), + table) + val projectCoor = Project(Seq(UnresolvedAttribute("coor")), filter) + val flattenCoor = flattenPlanFor("coor", projectCoor) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenCoor) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + private def flattenPlanFor(flattenedColumn: String, parentPlan: 
LogicalPlan): LogicalPlan = { + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute(flattenedColumn)) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), parentPlan) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute(flattenedColumn)), generate) + dropSourceColumn + } + + test("flatten for arrays") { + val frame = sql(s""" + | source = $testTable + | | fields bridges + | | flatten bridges + | """.stripMargin) + + assert(frame.columns.sameElements(Array("length", "name"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(null, null), + Row(11L, "Bridge of Sighs"), + Row(48L, "Rialto Bridge"), + Row(160L, "Pont Alexandre III"), + Row(232L, "Pont Neuf"), + Row(801L, "Tower Bridge"), + Row(928L, "London Bridge"), + Row(343L, "Legion Bridge"), + Row(516L, "Charles Bridge"), + Row(333L, "Liberty Bridge"), + Row(375L, "Chain Bridge")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("flint_ppl_test")) + val projectCoor = Project(Seq(UnresolvedAttribute("bridges")), table) + val flattenBridges = flattenPlanFor("bridges", projectCoor) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenBridges) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("flatten for structs and arrays") { + val frame = sql(s""" + | source = $testTable | flatten bridges | flatten coor + | """.stripMargin) + + assert( + frame.columns.sameElements( + Array("_time", "city", "country", "length", "name", "alt", "lat", "long"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row("1990-09-13T12:00:00", "Warsaw", "Poland", null, null, null, null, null), 
+ Row( + "2024-09-13T12:00:00", + "Venice", + "Italy", + 11L, + "Bridge of Sighs", + 2, + 45.4408, + 12.3155), + Row("2024-09-13T12:00:00", "Venice", "Italy", 48L, "Rialto Bridge", 2, 45.4408, 12.3155), + Row( + "2024-09-13T12:00:00", + "Paris", + "France", + 160L, + "Pont Alexandre III", + 35, + 48.8566, + 2.3522), + Row("2024-09-13T12:00:00", "Paris", "France", 232L, "Pont Neuf", 35, 48.8566, 2.3522), + Row( + "2024-09-13T12:00:00", + "London", + "England", + 801L, + "Tower Bridge", + 35, + 51.5074, + -0.1278), + Row( + "2024-09-13T12:00:00", + "London", + "England", + 928L, + "London Bridge", + 35, + 51.5074, + -0.1278), + Row( + "2024-09-13T12:00:00", + "Prague", + "Czech Republic", + 343L, + "Legion Bridge", + 200, + 50.0755, + 14.4378), + Row( + "2024-09-13T12:00:00", + "Prague", + "Czech Republic", + 516L, + "Charles Bridge", + 200, + 50.0755, + 14.4378), + Row( + "2024-09-13T12:00:00", + "Budapest", + "Hungary", + 333L, + "Liberty Bridge", + 96, + 47.4979, + 19.0402), + Row( + "2024-09-13T12:00:00", + "Budapest", + "Hungary", + 375L, + "Chain Bridge", + 96, + 47.4979, + 19.0402)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](3)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("flint_ppl_test")) + val flattenBridges = flattenPlanFor("bridges", table) + val flattenCoor = flattenPlanFor("coor", flattenBridges) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenCoor) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("test flatten and stats") { + val frame = sql(s""" + | source = $testTable + | | fields country, bridges + | | flatten bridges + | | fields country, length + | | stats avg(length) as avg by country + | """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(null, "Poland"), + 
Row(196d, "France"), + Row(429.5, "Czech Republic"), + Row(864.5, "England"), + Row(29.5, "Italy"), + Row(354.0, "Hungary")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("flint_ppl_test")) + val projectCountryBridges = + Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("bridges")), table) + val flattenBridges = flattenPlanFor("bridges", projectCountryBridges) + val projectCountryLength = + Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("length")), flattenBridges) + val average = Alias( + UnresolvedFunction( + seq("AVG"), + seq(UnresolvedAttribute("length")), + isDistinct = false, + None, + ignoreNulls = false), + "avg")() + val country = Alias(UnresolvedAttribute("country"), "country")() + val grouping = Alias(UnresolvedAttribute("country"), "country")() + val aggregate = Aggregate(Seq(grouping), Seq(average, country), projectCountryLength) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("flatten struct table") { + val frame = sql(s""" + | source = $structTable + | | flatten struct_col + | | flatten field1 + | """.stripMargin) + + assert(frame.columns.sameElements(Array("int_col", "field2", "subfield"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array(Row(30, 123, "value1"), Row(40, 456, "value2"), Row(50, 789, "value3")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) + val flattenStructCol = 
flattenPlanFor("struct_col", table) + val flattenField1 = flattenPlanFor("field1", flattenStructCol) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("flatten struct nested table") { + val frame = sql(s""" + | source = $structNestedTable + | | flatten struct_col + | | flatten field1 + | | flatten struct_col2 + | | flatten field1 + | """.stripMargin) + + assert( + frame.columns.sameElements(Array("int_col", "field2", "subfield", "field2", "subfield"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(30, 123, "value1", 23, "valueA"), + Row(40, 123, "value5", 33, "valueB"), + Row(30, 823, "value4", 83, "valueC"), + Row(40, 456, "value2", 46, "valueD"), + Row(50, 789, "value3", 89, "valueE")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test")) + val flattenStructCol = flattenPlanFor("struct_col", table) + val flattenField1 = flattenPlanFor("field1", flattenStructCol) + val flattenStructCol2 = flattenPlanFor("struct_col2", flattenField1) + val flattenField1Again = flattenPlanFor("field1", flattenStructCol2) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1Again) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("flatten multi value nullable") { + val frame = sql(s""" + | source = $multiValueTable + | | flatten multi_value + | """.stripMargin) + + assert(frame.columns.sameElements(Array("int_col", "name", "value"))) + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = + Array( + Row(1, "1_one", 1), + Row(1, null, 11), + Row(1, "1_three", null), + Row(2, "2_Monday", 2), + 
Row(2, null, null), + Row(3, "3_third", 3), + Row(3, "3_4th", 4), + Row(4, null, null)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) + val flattenMultiValue = flattenPlanFor("multi_value", table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java index 6fe7f77eb..22eaaf6e6 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java @@ -5,27 +5,22 @@ package org.opensearch.sql.ast.tree; -import com.google.common.collect.ImmutableList; -import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.ToString; +import lombok.RequiredArgsConstructor; import org.opensearch.sql.ast.AbstractNodeVisitor; -import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.Field; import java.util.List; /** Logical plan node of Expand */ -@ToString -@EqualsAndHashCode(callSuper = false) -@Getter +@RequiredArgsConstructor public class Expand extends UnresolvedPlan { - private UnresolvedExpression field; private UnresolvedPlan child; - public Expand(UnresolvedExpression field) { - this.field = field; - } - + @Getter + private final Field field; + @Override public Expand attach(UnresolvedPlan child) { this.child = child; @@ -33,8 +28,8 @@ public Expand attach(UnresolvedPlan child) { } @Override - public List getChild() { - return 
ImmutableList.of(child); + public List getChild() { + return child == null ? List.of() : List.of(child); } @Override diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index e31fbb6e3..9c57d2adf 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -14,7 +14,7 @@ public class Flatten extends UnresolvedPlan { private UnresolvedPlan child; @Getter - private final Field fieldToBeFlattened; + private final Field field; @Override public UnresolvedPlan attach(UnresolvedPlan child) { @@ -26,7 +26,7 @@ public UnresolvedPlan attach(UnresolvedPlan child) { public List getChild() { return child == null ? List.of() : List.of(child); } - + @Override public T accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitFlatten(this, context); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6c572ff35..f2a0f2638 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -6,26 +6,15 @@ package org.opensearch.sql.ppl; import org.apache.spark.sql.catalyst.TableIdentifier; -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; import org.apache.spark.sql.catalyst.expressions.Ascending$; -import org.apache.spark.sql.catalyst.expressions.CaseWhen; import org.apache.spark.sql.catalyst.expressions.Descending$; -import 
org.apache.spark.sql.catalyst.expressions.Exists$; +import org.apache.spark.sql.catalyst.expressions.Explode; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.GeneratorOuter; -import org.apache.spark.sql.catalyst.expressions.In$; -import org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual; -import org.apache.spark.sql.catalyst.expressions.InSubquery$; -import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual; -import org.apache.spark.sql.catalyst.expressions.ListQuery$; -import org.apache.spark.sql.catalyst.expressions.MakeInterval$; import org.apache.spark.sql.catalyst.expressions.NamedExpression; -import org.apache.spark.sql.catalyst.expressions.Predicate; -import org.apache.spark.sql.catalyst.expressions.ScalarSubquery$; -import org.apache.spark.sql.catalyst.expressions.ScalaUDF; import org.apache.spark.sql.catalyst.expressions.SortDirection; import org.apache.spark.sql.catalyst.expressions.SortOrder; import org.apache.spark.sql.catalyst.plans.logical.*; @@ -36,35 +25,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.opensearch.flint.spark.FlattenGenerator; import org.opensearch.sql.ast.AbstractNodeVisitor; -import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; -import org.opensearch.sql.ast.expression.AllFields; -import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; -import org.opensearch.sql.ast.expression.Between; -import org.opensearch.sql.ast.expression.BinaryExpression; -import org.opensearch.sql.ast.expression.Case; -import org.opensearch.sql.ast.expression.Compare; import org.opensearch.sql.ast.expression.Field; -import org.opensearch.sql.ast.expression.FieldsMapping; import org.opensearch.sql.ast.expression.Function; import org.opensearch.sql.ast.expression.In; -import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; -import 
org.opensearch.sql.ast.expression.subquery.InSubquery; -import org.opensearch.sql.ast.expression.Interval; -import org.opensearch.sql.ast.expression.IsEmpty; import org.opensearch.sql.ast.expression.Let; import org.opensearch.sql.ast.expression.Literal; -import org.opensearch.sql.ast.expression.Not; -import org.opensearch.sql.ast.expression.Or; import org.opensearch.sql.ast.expression.ParseMethod; -import org.opensearch.sql.ast.expression.QualifiedName; -import org.opensearch.sql.ast.expression.subquery.ScalarSubquery; -import org.opensearch.sql.ast.expression.Span; import org.opensearch.sql.ast.expression.UnresolvedExpression; -import org.opensearch.sql.ast.expression.When; import org.opensearch.sql.ast.expression.WindowFunction; -import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; @@ -90,20 +60,14 @@ import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TopAggregation; -import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.common.antlr.SyntaxCheckException; -import org.opensearch.sql.expression.function.SerializableUdf; -import org.opensearch.sql.ppl.utils.AggregatorTransformer; -import org.opensearch.sql.ppl.utils.BuiltinFunctionTransformer; -import org.opensearch.sql.ppl.utils.ComparatorTransformer; import org.opensearch.sql.ppl.utils.FieldSummaryTransformer; import org.opensearch.sql.ppl.utils.ParseTransformer; import org.opensearch.sql.ppl.utils.SortUtils; import org.opensearch.sql.ppl.utils.WindowSpecTransformer; import scala.None$; import scala.Option; -import scala.Tuple2; import scala.collection.IterableLike; import scala.collection.Seq; @@ -111,13 +75,10 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.Stack; -import 
java.util.function.BiFunction; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.opensearch.sql.expression.function.BuiltinFunctionName.EQUAL; import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate; @@ -132,8 +93,6 @@ import static org.opensearch.sql.ppl.utils.LookupTransformer.buildOutputProjectList; import static org.opensearch.sql.ppl.utils.LookupTransformer.buildProjectListFromFields; import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier; -import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; -import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; import static scala.collection.JavaConverters.seqAsJavaList; /** @@ -184,12 +143,6 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { return context.getPlan(); } - @Override - public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, CatalystPlanContext context) { - node.getChild().get(0).accept(this, context); - return super.visitExpand(node, context); - } - @Override public LogicalPlan visitFilter(Filter node, CatalystPlanContext context) { node.getChild().get(0).accept(this, context); @@ -470,13 +423,27 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { // Create an UnresolvedStar for all-fields projection context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } - Expression field = visitExpression(flatten.getFieldToBeFlattened(), context); + Expression field = visitExpression(flatten.getField(), context); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); FlattenGenerator flattenGenerator = new FlattenGenerator(field); context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), 
true, (Option) None$.MODULE$, seq(), p)); return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); } + @Override + public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, CatalystPlanContext context) { + node.getChild().get(0).accept(this, context); + if (context.getNamedParseExpressions().isEmpty()) { + // Create an UnresolvedStar for all-fields projection + context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); + } + Expression field = visitExpression(node.getField(), context); + context.retainAllNamedParseExpressions(p -> (NamedExpression) p); + Explode explodeGenerator = new Explode(field); + context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); + return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); + } + private void visitFieldList(List fieldList, CatalystPlanContext context) { fieldList.forEach(field -> visitExpression(field, context)); } From f0335ccecdb4cf86a5d34e98dbf59af1482cabfe Mon Sep 17 00:00:00 2001 From: YANGDB Date: Fri, 1 Nov 2024 09:29:19 -0700 Subject: [PATCH 03/11] create unit / integration tests Signed-off-by: YANGDB --- docs/ppl-lang/ppl-expand-command.md | 89 ++++++++++ .../ppl/FlintSparkPPLExpandITSuite.scala | 83 ++++------ .../FlintSparkPPLFieldSummaryITSuite.scala | 12 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 2 +- ...PlanExpandCommandTranslatorTestSuite.scala | 155 ++++++++++++++++++ 5 files changed, 281 insertions(+), 60 deletions(-) create mode 100644 docs/ppl-lang/ppl-expand-command.md create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala diff --git a/docs/ppl-lang/ppl-expand-command.md b/docs/ppl-lang/ppl-expand-command.md new file mode 100644 index 000000000..67c871e75 --- /dev/null +++ b/docs/ppl-lang/ppl-expand-command.md @@ -0,0 +1,89 
@@ +## PPL `expand` command + +### Description +Use the `expand` command to flatten a field of type: +- `Array` +- `Map` + + +### Syntax +`expand <field>` + +* field: to be expanded (exploded). The field must be of supported type. + +### Test table +#### Schema +| col\_name | data\_type | +|-----------|----------------------------------------------| +| \_time | string | +| bridges | array\<struct\<length:bigint, name:string\>\> | +| city | string | +| country | string | +#### Data
| \_time | bridges | city | country | +|---------------------|----------------------------------------------|---------|----------------| +| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | +| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | +| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | +| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic | +| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | +| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | + + + +### Example 1: expand struct +This example shows how to expand an array of struct field.
+PPL query: + - `source=table | expand bridges` + +| \_time | bridges | city | country | alt | lat | long | +|---------------------|----------------------------------------------|---------|---------------|-----|--------|--------| +| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074| -0.1278| +| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566| 2.3522 | +| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408| 12.3155| +| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755| 14.4378| +| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979| 19.0402| +| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL| NULL | NULL | + + + +### Example 2: expand array + +The example shows how to expand an array of struct fields. + +PPL query: + - `source=table | expand bridges` + +| \_time | city | coor | country | length | name | +|---------------------|---------|------------------------|---------------|--------|-------------------| +| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 801 | Tower Bridge | +| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 928 | London Bridge | +| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 232 | Pont Neuf | +| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 160 | Pont Alexandre III| +| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 48 | Rialto Bridge | +| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 11 | Bridge of Sighs | +| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 516 | Charles Bridge | +| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 343 | Legion Bridge | +| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 375 | Chain Bridge
| +| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 333 | Liberty Bridge | +| 1990-09-13T12:00:00 | Warsaw | NULL | Poland | NULL | NULL | + + +### Example 3: flatten array and struct +This example shows how to flatten multiple fields. +PPL query: + - `source=table | flatten bridges | flatten coor` + +| \_time | city | country | length | name | alt | lat | long | +|---------------------|---------|---------------|--------|-------------------|------|--------|--------| +| 2024-09-13T12:00:00 | London | England | 801 | Tower Bridge | 35 | 51.5074| -0.1278| +| 2024-09-13T12:00:00 | London | England | 928 | London Bridge | 35 | 51.5074| -0.1278| +| 2024-09-13T12:00:00 | Paris | France | 232 | Pont Neuf | 35 | 48.8566| 2.3522 | +| 2024-09-13T12:00:00 | Paris | France | 160 | Pont Alexandre III| 35 | 48.8566| 2.3522 | +| 2024-09-13T12:00:00 | Venice | Italy | 48 | Rialto Bridge | 2 | 45.4408| 12.3155| +| 2024-09-13T12:00:00 | Venice | Italy | 11 | Bridge of Sighs | 2 | 45.4408| 12.3155| +| 2024-09-13T12:00:00 | Prague | Czech Republic| 516 | Charles Bridge | 200 | 50.0755| 14.4378| +| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378| +| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402| +| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| +| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | \ No newline at end of file diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala index f0640f70c..65b7b424e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -4,18 +4,16 @@ */ package 
org.opensearch.flint.spark.ppl +import java.nio.file.Files +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or} +import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, Explode, GeneratorOuter, Literal, Or} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} -import org.opensearch.flint.spark.FlattenGenerator -import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq - -import java.nio.file.Files class FlintSparkPPLExpandITSuite - extends QueryTest + extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite with StreamTest { @@ -50,37 +48,29 @@ class FlintSparkPPLExpandITSuite Files.deleteIfExists(tempFile) } - /** - * 'Project [*] - * +- 'Generate 'explode('multi_value), false, as, ['exploded_multi_value] - * +- 'UnresolvedRelation [spark_catalog, default, flint_ppl_multi_value_test], [], false - */ - test("expand for structs") { - val frame = sql( - s""" SELECT * FROM $multiValueTable - LATERAL VIEW explode(multi_value) AS exploded_multi_value - """.stripMargin) - - val results: Array[Row] = frame.collect() - val logical = frame.queryExecution.logical - print(logical) + private def generator(flattenedColumn: String, parentPlan: LogicalPlan): LogicalPlan = { + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute(flattenedColumn))) + val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), parentPlan) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute(flattenedColumn)), generate) + dropSourceColumn } - test("flatten for structs") { + test("expand for array of structs") { val frame = sql(s""" | source = $testTable | | where country = 
'England' or country = 'Poland' - | | fields coor - | | flatten coor + | | expand bridges + | | fields city, country, col | """.stripMargin) - assert(frame.columns.sameElements(Array("alt", "lat", "long"))) val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = - Array(Row(35, 51.5074, -0.1278), Row(null, null, null)) + Array(Row( "London", "England", Seq(801, "Tower Bridge")), + Row( "London", "England", Seq(928, "London Bridge")), + Row("Warsaw", "Poland", null)) // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](1)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assert(results.toSet == expectedResults.toSet) val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("flint_ppl_test")) val filter = Filter( @@ -88,21 +78,12 @@ class FlintSparkPPLExpandITSuite EqualTo(UnresolvedAttribute("country"), Literal("England")), EqualTo(UnresolvedAttribute("country"), Literal("Poland"))), table) - val projectCoor = Project(Seq(UnresolvedAttribute("coor")), filter) - val flattenCoor = flattenPlanFor("coor", projectCoor) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenCoor) + val project = Project(Seq(UnresolvedAttribute("city"), UnresolvedAttribute("country"), UnresolvedAttribute("col")), filter) + val generatorExp = generator("bridges", project) + val expectedPlan = Project(Seq(UnresolvedStar(None)), generatorExp) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - private def flattenPlanFor(flattenedColumn: String, parentPlan: LogicalPlan): LogicalPlan = { - val flattenGenerator = new FlattenGenerator(UnresolvedAttribute(flattenedColumn)) - val outerGenerator = GeneratorOuter(flattenGenerator) - val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), parentPlan) - val dropSourceColumn = - DataFrameDropColumns(Seq(UnresolvedAttribute(flattenedColumn)), generate) - dropSourceColumn - } - 
test("flatten for arrays") { val frame = sql(s""" | source = $testTable @@ -131,7 +112,7 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("flint_ppl_test")) val projectCoor = Project(Seq(UnresolvedAttribute("bridges")), table) - val flattenBridges = flattenPlanFor("bridges", projectCoor) + val flattenBridges = generator("bridges", projectCoor) val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenBridges) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } @@ -228,8 +209,8 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("flint_ppl_test")) - val flattenBridges = flattenPlanFor("bridges", table) - val flattenCoor = flattenPlanFor("coor", flattenBridges) + val flattenBridges = generator("bridges", table) + val flattenCoor = generator("coor", flattenBridges) val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenCoor) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } @@ -259,7 +240,7 @@ class FlintSparkPPLExpandITSuite val table = UnresolvedRelation(Seq("flint_ppl_test")) val projectCountryBridges = Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("bridges")), table) - val flattenBridges = flattenPlanFor("bridges", projectCountryBridges) + val flattenBridges = generator("bridges", projectCountryBridges) val projectCountryLength = Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("length")), flattenBridges) val average = Alias( @@ -294,8 +275,8 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) - val flattenStructCol = flattenPlanFor("struct_col", table) - val flattenField1 = flattenPlanFor("field1", flattenStructCol) + val flattenStructCol = generator("struct_col", table) + val flattenField1 = 
generator("field1", flattenStructCol) val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } @@ -326,10 +307,10 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test")) - val flattenStructCol = flattenPlanFor("struct_col", table) - val flattenField1 = flattenPlanFor("field1", flattenStructCol) - val flattenStructCol2 = flattenPlanFor("struct_col2", flattenField1) - val flattenField1Again = flattenPlanFor("field1", flattenStructCol2) + val flattenStructCol = generator("struct_col", table) + val flattenField1 = generator("field1", flattenStructCol) + val flattenStructCol2 = generator("struct_col2", flattenField1) + val flattenField1Again = generator("field1", flattenStructCol2) val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1Again) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } @@ -358,7 +339,7 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val flattenMultiValue = flattenPlanFor("multi_value", table) + val flattenMultiValue = generator("multi_value", table) val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala index 5a5990001..3f24eeda0 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala @@ -183,8 +183,7 @@ 
class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test( - "test fieldsummary with single field includefields(status_code) & nulls=true with a where filter ") { + test("test fieldsummary with single field includefields(status_code) & nulls=true with a where filter ") { val frame = sql(s""" | source = $testTable | where status_code != 200 | fieldsummary includefields= status_code nulls=true | """.stripMargin) @@ -269,8 +268,7 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test( - "test fieldsummary with single field includefields(status_code) & nulls=false with a where filter ") { + test("test fieldsummary with single field includefields(status_code) & nulls=false with a where filter ") { val frame = sql(s""" | source = $testTable | where status_code != 200 | fieldsummary includefields= status_code nulls=false | """.stripMargin) @@ -334,8 +332,7 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test( - "test fieldsummary with single field includefields(id, status_code, request_path) & nulls=true") { + test("test fieldsummary with single field includefields(id, status_code, request_path) & nulls=true") { val frame = sql(s""" | source = $testTable | fieldsummary includefields= id, status_code, request_path nulls=true | """.stripMargin) @@ -568,8 +565,7 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test( - "test fieldsummary with single field includefields(id, status_code, request_path) & nulls=false") { + test("test fieldsummary with single field includefields(id, status_code, request_path) & nulls=false") { val frame = sql(s""" | source = $testTable | fieldsummary includefields= id, status_code, request_path nulls=false | """.stripMargin) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java 
b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 044664132..f64645496 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -133,7 +133,7 @@ public UnresolvedPlan visitWhereCommand(OpenSearchPPLParser.WhereCommandContext @Override public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContext ctx) { - return new Filter(internalVisitExpression(ctx.fieldExpression())); + return new Expand((Field) internalVisitExpression(ctx.fieldExpression())); } @Override diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala new file mode 100644 index 000000000..076994936 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -0,0 +1,155 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Explode, GeneratorOuter, Literal, RegExpExtract} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project} +import org.apache.spark.sql.types.IntegerType +import org.opensearch.flint.spark.FlattenGenerator +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import 
org.scalatest.matchers.should.Matchers + +class PPLLogicalPlanExpandCommandTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test expand only field") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | expand field_with_array"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val generator = Explode(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(generator) + val generate = Generate(outerGenerator, seq(), true, None, seq(), relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and stats") { + val context = new CatalystPlanContext + val query = + "source = relation | fields state, company, employee | expand employee | fields state, company, salary | stats max(salary) as max by state, company" + val logPlan = + planTransformer.visit(plan(pplParser, query), context) + val table = UnresolvedRelation(Seq("relation")) + val projectStateCompanyEmployee = + Project( + Seq( + UnresolvedAttribute("state"), + UnresolvedAttribute("company"), + UnresolvedAttribute("employee")), + table) + val generate = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + true, + None, + seq(), + projectStateCompanyEmployee) + val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) + val projectStateCompanySalary = Project( + Seq( + UnresolvedAttribute("state"), + UnresolvedAttribute("company"), + UnresolvedAttribute("salary")), + dropSourceColumn) + val average = Alias( + UnresolvedFunction(seq("MAX"), 
seq(UnresolvedAttribute("salary")), false, None, false), + "max")() + val state = Alias(UnresolvedAttribute("state"), "state")() + val company = Alias(UnresolvedAttribute("company"), "company")() + val groupingState = Alias(UnresolvedAttribute("state"), "state")() + val groupingCompany = Alias(UnresolvedAttribute("company"), "company")() + val aggregate = Aggregate( + Seq(groupingState, groupingCompany), + Seq(average, state, company), + projectStateCompanySalary) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate) + + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and eval") { + val context = new CatalystPlanContext + val query = "source = relation | expand employee | eval bonus = salary * 3" + val logPlan = planTransformer.visit(plan(pplParser, query), context) + val table = UnresolvedRelation(Seq("relation")) + val generate = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + true, + None, + seq(), + table) + val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) + val bonusProject = Project( + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "*", + Seq(UnresolvedAttribute("salary"), Literal(3, IntegerType)), + isDistinct = false), + "bonus")()), + dropSourceColumn) + val expectedPlan = Project(Seq(UnresolvedStar(None)), bonusProject) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and parse and flatten") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=relation | expand employee | parse description '(?.+@.+)' | flatten roles"), + context) + val table = UnresolvedRelation(Seq("relation")) + val generateEmployee = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + true, + None, + seq(), + table) + val dropSourceColumnEmployee = + DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), 
generateEmployee) + val emailAlias = + Alias( + RegExpExtract(UnresolvedAttribute("description"), Literal("(?.+@.+)"), Literal(1)), + "email")() + val parseProject = Project( + Seq(UnresolvedAttribute("description"), emailAlias, UnresolvedStar(None)), + dropSourceColumnEmployee) + val generateRoles = Generate( + GeneratorOuter(new FlattenGenerator(UnresolvedAttribute("roles"))), + seq(), + true, + None, + seq(), + parseProject) + val dropSourceColumnRoles = + DataFrameDropColumns(Seq(UnresolvedAttribute("roles")), generateRoles) + val expectedPlan = Project(Seq(UnresolvedStar(None)), dropSourceColumnRoles) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + +} From d8a003f57803d819169a70e26f5fe4fcffbe0c66 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 4 Nov 2024 09:08:14 -0800 Subject: [PATCH 04/11] update expand tests Signed-off-by: YANGDB --- .../ppl/FlintSparkPPLExpandITSuite.scala | 18 ++++++++++++----- .../FlintSparkPPLFieldSummaryITSuite.scala | 12 +++++++---- .../sql/ppl/CatalystQueryPlanVisitor.java | 3 +-- ...PlanExpandCommandTranslatorTestSuite.scala | 20 +++++++++---------- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala index 65b7b424e..60cb8e7f9 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -5,7 +5,9 @@ package org.opensearch.flint.spark.ppl import java.nio.file.Files + import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq + import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, 
Explode, GeneratorOuter, Literal, Or} @@ -13,7 +15,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLExpandITSuite - extends QueryTest + extends QueryTest with LogicalPlanTestUtils with FlintPPLSuite with StreamTest { @@ -61,13 +63,14 @@ class FlintSparkPPLExpandITSuite | source = $testTable | | where country = 'England' or country = 'Poland' | | expand bridges - | | fields city, country, col + | | fields city, country, bridges | """.stripMargin) val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = - Array(Row( "London", "England", Seq(801, "Tower Bridge")), - Row( "London", "England", Seq(928, "London Bridge")), + Array( + Row("London", "England", Seq(801, "Tower Bridge")), + Row("London", "England", Seq(928, "London Bridge")), Row("Warsaw", "Poland", null)) // Compare the results assert(results.toSet == expectedResults.toSet) @@ -78,7 +81,12 @@ class FlintSparkPPLExpandITSuite EqualTo(UnresolvedAttribute("country"), Literal("England")), EqualTo(UnresolvedAttribute("country"), Literal("Poland"))), table) - val project = Project(Seq(UnresolvedAttribute("city"), UnresolvedAttribute("country"), UnresolvedAttribute("col")), filter) + val project = Project( + Seq( + UnresolvedAttribute("city"), + UnresolvedAttribute("country"), + UnresolvedAttribute("bridges")), + filter) val generatorExp = generator("bridges", project) val expectedPlan = Project(Seq(UnresolvedStar(None)), generatorExp) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala index 3f24eeda0..5a5990001 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala +++ 
b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFieldSummaryITSuite.scala @@ -183,7 +183,8 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test("test fieldsummary with single field includefields(status_code) & nulls=true with a where filter ") { + test( + "test fieldsummary with single field includefields(status_code) & nulls=true with a where filter ") { val frame = sql(s""" | source = $testTable | where status_code != 200 | fieldsummary includefields= status_code nulls=true | """.stripMargin) @@ -268,7 +269,8 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test("test fieldsummary with single field includefields(status_code) & nulls=false with a where filter ") { + test( + "test fieldsummary with single field includefields(status_code) & nulls=false with a where filter ") { val frame = sql(s""" | source = $testTable | where status_code != 200 | fieldsummary includefields= status_code nulls=false | """.stripMargin) @@ -332,7 +334,8 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test("test fieldsummary with single field includefields(id, status_code, request_path) & nulls=true") { + test( + "test fieldsummary with single field includefields(id, status_code, request_path) & nulls=true") { val frame = sql(s""" | source = $testTable | fieldsummary includefields= id, status_code, request_path nulls=true | """.stripMargin) @@ -565,7 +568,8 @@ class FlintSparkPPLFieldSummaryITSuite comparePlans(expectedPlan, logicalPlan, false) } - test("test fieldsummary with single field includefields(id, status_code, request_path) & nulls=false") { + test( + "test fieldsummary with single field includefields(id, status_code, request_path) & nulls=false") { val frame = sql(s""" | source = $testTable | fieldsummary includefields= id, status_code, request_path nulls=false | """.stripMargin) diff --git 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index db9bf8688..0532c77b2 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -478,8 +478,7 @@ public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, Catalyst Expression field = visitExpression(node.getField(), context); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); Explode explodeGenerator = new Explode(field); - context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); - return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); + return context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); } private void visitFieldList(List fieldList, CatalystPlanContext context) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 076994936..4653dc054 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -5,17 +5,18 @@ package org.opensearch.flint.spark.ppl +import org.opensearch.flint.spark.FlattenGenerator +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq +import 
org.scalatest.matchers.should.Matchers + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Explode, GeneratorOuter, Literal, RegExpExtract} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project} import org.apache.spark.sql.types.IntegerType -import org.opensearch.flint.spark.FlattenGenerator -import org.opensearch.flint.spark.ppl.PlaneUtils.plan -import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq -import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} -import org.scalatest.matchers.should.Matchers class PPLLogicalPlanExpandCommandTranslatorTestSuite extends SparkFunSuite @@ -29,9 +30,7 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite test("test expand only field") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit( - plan(pplParser, "source=relation | expand field_with_array"), - context) + planTransformer.visit(plan(pplParser, "source=relation | expand field_with_array"), context) val relation = UnresolvedRelation(Seq("relation")) val generator = Explode(UnresolvedAttribute("field_with_array")) @@ -64,13 +63,12 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite None, seq(), projectStateCompanyEmployee) - val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) val projectStateCompanySalary = Project( Seq( UnresolvedAttribute("state"), UnresolvedAttribute("company"), - UnresolvedAttribute("salary")), - dropSourceColumn) + UnresolvedAttribute("salary"), + UnresolvedAttribute("employee"))) val average = Alias( UnresolvedFunction(seq("MAX"), seq(UnresolvedAttribute("salary")), false, None, false), "max")() From 71b749769a02c579d53e31b91267acfb3a6ec1bc Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 
4 Nov 2024 16:26:26 -0800 Subject: [PATCH 05/11] add tests Signed-off-by: YANGDB --- .../ppl/FlintSparkPPLExpandITSuite.scala | 326 +++++++----------- .../src/main/antlr4/OpenSearchPPLParser.g4 | 2 +- .../org/opensearch/sql/ast/tree/Expand.java | 5 + .../sql/ppl/CatalystQueryPlanVisitor.java | 10 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 3 +- ...PlanExpandCommandTranslatorTestSuite.scala | 149 ++++++-- 6 files changed, 270 insertions(+), 225 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala index 60cb8e7f9..11be46756 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -6,6 +6,8 @@ package org.opensearch.flint.spark.ppl import java.nio.file.Files +import scala.collection.mutable + import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.apache.spark.sql.{QueryTest, Row} @@ -50,12 +52,39 @@ class FlintSparkPPLExpandITSuite Files.deleteIfExists(tempFile) } - private def generator(flattenedColumn: String, parentPlan: LogicalPlan): LogicalPlan = { - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute(flattenedColumn))) - val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), parentPlan) + test("flatten for structs") { + val frame = sql( + s""" source = $multiValueTable | expand multi_value AS exploded_multi_value | fields exploded_multi_value + """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array( + Row(Row("1_one", 1)), + Row(Row(null, 11)), + Row(Row("1_three", null)), + Row(Row("2_Monday", 2)), + Row(null), + Row(Row("3_third", 3)), + Row(Row("3_4th", 4)), + Row(null)) + // Compare the results + assert(results.toSet == 
expectedResults.toSet) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // expected plan + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("multi_value"))) + val generate = Generate( + outerGenerator, + seq(), + outer = true, + None, + seq(UnresolvedAttribute("exploded_multi_value")), + table) val dropSourceColumn = - DataFrameDropColumns(Seq(UnresolvedAttribute(flattenedColumn)), generate) - dropSourceColumn + DataFrameDropColumns(Seq(UnresolvedAttribute("multi_value")), generate) + val expectedPlan = Project(Seq(UnresolvedAttribute("exploded_multi_value")), dropSourceColumn) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } test("expand for array of structs") { @@ -63,15 +92,14 @@ class FlintSparkPPLExpandITSuite | source = $testTable | | where country = 'England' or country = 'Poland' | | expand bridges - | | fields city, country, bridges + | | fields bridges | """.stripMargin) val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array( - Row("London", "England", Seq(801, "Tower Bridge")), - Row("London", "England", Seq(928, "London Bridge")), - Row("Warsaw", "Poland", null)) + val expectedResults: Array[Row] = Array( + Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))), + Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))), + Row(null)) // Compare the results assert(results.toSet == expectedResults.toSet) val logicalPlan: LogicalPlan = frame.queryExecution.logical @@ -81,221 +109,119 @@ class FlintSparkPPLExpandITSuite EqualTo(UnresolvedAttribute("country"), Literal("England")), EqualTo(UnresolvedAttribute("country"), Literal("Poland"))), table) - val project = Project( - Seq( - UnresolvedAttribute("city"), - UnresolvedAttribute("country"), - UnresolvedAttribute("bridges")), - filter) - val generatorExp = 
generator("bridges", project) - val expectedPlan = Project(Seq(UnresolvedStar(None)), generatorExp) + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) + val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), filter) + val expectedPlan = Project(Seq(UnresolvedAttribute("bridges")), generate) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - test("flatten for arrays") { + test("expand for array of structs with alias") { val frame = sql(s""" | source = $testTable - | | fields bridges - | | flatten bridges + | | where country = 'England' + | | expand bridges as britishBridges + | | fields britishBridges | """.stripMargin) - assert(frame.columns.sameElements(Array("length", "name"))) val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array( - Row(null, null), - Row(11L, "Bridge of Sighs"), - Row(48L, "Rialto Bridge"), - Row(160L, "Pont Alexandre III"), - Row(232L, "Pont Neuf"), - Row(801L, "Tower Bridge"), - Row(928L, "London Bridge"), - Row(343L, "Legion Bridge"), - Row(516L, "Charles Bridge"), - Row(333L, "Liberty Bridge"), - Row(375L, "Chain Bridge")) + val expectedResults: Array[Row] = Array( + Row(Row(801, "Tower Bridge")), + Row(Row(928, "London Bridge")), + Row(Row(801, "Tower Bridge")), + Row(Row(928, "London Bridge"))) // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assert(results.toSet == expectedResults.toSet) val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("flint_ppl_test")) - val projectCoor = Project(Seq(UnresolvedAttribute("bridges")), table) - val flattenBridges = generator("bridges", projectCoor) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenBridges) + val filter = Filter(EqualTo(UnresolvedAttribute("country"), Literal("England")), table) + val outerGenerator = 
GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) + val generate = Generate( + outerGenerator, + seq(), + outer = true, + None, + seq(UnresolvedAttribute("britishBridges")), + filter) + val dropColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("bridges")), generate) + val expectedPlan = Project(Seq(UnresolvedAttribute("britishBridges")), dropColumn) + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - test("flatten for structs and arrays") { + ignore("expand struct table") { val frame = sql(s""" - | source = $testTable | flatten bridges | flatten coor - | """.stripMargin) + | source = $structTable + | | expand struct_col + | | expand field1 + | """.stripMargin) - assert( - frame.columns.sameElements( - Array("_time", "city", "country", "length", "name", "alt", "lat", "long"))) + assert(frame.columns.sameElements(Array("int_col", "field2", "subfield"))) val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = - Array( - Row("1990-09-13T12:00:00", "Warsaw", "Poland", null, null, null, null, null), - Row( - "2024-09-13T12:00:00", - "Venice", - "Italy", - 11L, - "Bridge of Sighs", - 2, - 45.4408, - 12.3155), - Row("2024-09-13T12:00:00", "Venice", "Italy", 48L, "Rialto Bridge", 2, 45.4408, 12.3155), - Row( - "2024-09-13T12:00:00", - "Paris", - "France", - 160L, - "Pont Alexandre III", - 35, - 48.8566, - 2.3522), - Row("2024-09-13T12:00:00", "Paris", "France", 232L, "Pont Neuf", 35, 48.8566, 2.3522), - Row( - "2024-09-13T12:00:00", - "London", - "England", - 801L, - "Tower Bridge", - 35, - 51.5074, - -0.1278), - Row( - "2024-09-13T12:00:00", - "London", - "England", - 928L, - "London Bridge", - 35, - 51.5074, - -0.1278), - Row( - "2024-09-13T12:00:00", - "Prague", - "Czech Republic", - 343L, - "Legion Bridge", - 200, - 50.0755, - 14.4378), - Row( - "2024-09-13T12:00:00", - "Prague", - "Czech Republic", - 516L, - "Charles Bridge", - 200, - 50.0755, - 14.4378), - Row( - "2024-09-13T12:00:00", - "Budapest", - "Hungary", - 333L, 
- "Liberty Bridge", - 96, - 47.4979, - 19.0402), - Row( - "2024-09-13T12:00:00", - "Budapest", - "Hungary", - 375L, - "Chain Bridge", - 96, - 47.4979, - 19.0402)) + Array(Row(30, 123, "value1"), Row(40, 456, "value2"), Row(50, 789, "value3")) // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](3)) + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) assert(results.sorted.sameElements(expectedResults.sorted)) val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("flint_ppl_test")) - val flattenBridges = generator("bridges", table) - val flattenCoor = generator("coor", flattenBridges) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenCoor) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) + val generate = Generate( + outerGenerator, + seq(), + outer = true, + None, + seq(UnresolvedAttribute("britishBridges")), + table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - test("test flatten and stats") { + ignore("expand multi value nullable") { val frame = sql(s""" - | source = $testTable - | | fields country, bridges - | | flatten bridges - | | fields country, length - | | stats avg(length) as avg by country - | """.stripMargin) + | source = $multiValueTable + | | expand multi_value as expand_field + | | fields expand_field + | """.stripMargin) + assert(frame.columns.sameElements(Array("expand_field"))) val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( - Row(null, "Poland"), - Row(196d, "France"), - Row(429.5, "Czech Republic"), - Row(864.5, "England"), - Row(29.5, "Italy"), - Row(354.0, "Hungary")) - // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, 
Double](_.getAs[Double](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) - val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("flint_ppl_test")) - val projectCountryBridges = - Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("bridges")), table) - val flattenBridges = generator("bridges", projectCountryBridges) - val projectCountryLength = - Project(Seq(UnresolvedAttribute("country"), UnresolvedAttribute("length")), flattenBridges) - val average = Alias( - UnresolvedFunction( - seq("AVG"), - seq(UnresolvedAttribute("length")), - isDistinct = false, - None, - ignoreNulls = false), - "avg")() - val country = Alias(UnresolvedAttribute("country"), "country")() - val grouping = Alias(UnresolvedAttribute("country"), "country")() - val aggregate = Aggregate(Seq(grouping), Seq(average, country), projectCountryLength) - val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) - } - - test("flatten struct table") { - val frame = sql(s""" - | source = $structTable - | | flatten struct_col - | | flatten field1 - | """.stripMargin) - - assert(frame.columns.sameElements(Array("int_col", "field2", "subfield"))) - val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array(Row(30, 123, "value1"), Row(40, 456, "value2"), Row(50, 789, "value3")) + Row(1, "1_one", 1), + Row(1, null, 11), + Row(1, "1_three", null), + Row(2, "2_Monday", 2), + Row(2, null, null), + Row(3, "3_third", 3), + Row(3, "3_4th", 4), + Row(4, null, null)) // Compare the results implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) assert(results.sorted.sameElements(expectedResults.sorted)) val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) - val flattenStructCol = generator("struct_col", table) - val flattenField1 
= generator("field1", flattenStructCol) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1) + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) + val generate = Generate( + outerGenerator, + seq(), + outer = true, + None, + seq(UnresolvedAttribute("britishBridges")), + table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - test("flatten struct nested table") { + ignore("expand struct nested table") { val frame = sql(s""" | source = $structNestedTable - | | flatten struct_col - | | flatten field1 - | | flatten struct_col2 - | | flatten field1 + | | expand struct_col + | | expand field1 + | | expand struct_col2 + | | expand field1 | """.stripMargin) assert( @@ -315,21 +241,22 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test")) - val flattenStructCol = generator("struct_col", table) - val flattenField1 = generator("field1", flattenStructCol) - val flattenStructCol2 = generator("struct_col2", flattenField1) - val flattenField1Again = generator("field1", flattenStructCol2) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenField1Again) +// val flattenStructCol = generator("struct_col", table) +// val flattenField1 = generator("field1", flattenStructCol) +// val flattenStructCol2 = generator("struct_col2", flattenField1) +// val flattenField1Again = generator("field1", flattenStructCol2) + val expectedPlan = Project(Seq(UnresolvedStar(None)), table) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - test("flatten multi value nullable") { + ignore("flatten multi value nullable") { val frame = sql(s""" | source = $multiValueTable - | | flatten multi_value + | | expand multi_value as 
expand_field + | | fields expand_field | """.stripMargin) - assert(frame.columns.sameElements(Array("int_col", "name", "value"))) + assert(frame.columns.sameElements(Array("expand_field"))) val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( @@ -347,8 +274,15 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val flattenMultiValue = generator("multi_value", table) - val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) + val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) + val generate = Generate( + outerGenerator, + seq(), + outer = true, + None, + seq(UnresolvedAttribute("britishBridges")), + table) + val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 1b606a273..ccb47bcf9 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -253,7 +253,7 @@ fillnullCommand ; expandCommand - : EXPAND fieldExpression + : EXPAND fieldExpression (AS alias = qualifiedName)? 
; flattenCommand diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java index 22eaaf6e6..0e164ccd7 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Expand.java @@ -10,8 +10,11 @@ import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.Node; import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.UnresolvedAttribute; +import org.opensearch.sql.ast.expression.UnresolvedExpression; import java.util.List; +import java.util.Optional; /** Logical plan node of Expand */ @RequiredArgsConstructor @@ -20,6 +23,8 @@ public class Expand extends UnresolvedPlan { @Getter private final Field field; + @Getter + private final Optional alias; @Override public Expand attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 0532c77b2..a5c68574f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -476,9 +476,17 @@ public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, Catalyst context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(node.getField(), context); + Optional alias = node.getAlias().map(aliasNode -> visitExpression(aliasNode, context)); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); Explode explodeGenerator = new Explode(field); - return context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); + 
scala.collection.mutable.Seq seq = alias.isEmpty() ? seq() : seq(alias.get()); + if(alias.isEmpty()) + return context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p)); + else { + //in case an alias does appear - remove the original field from the returning columns + context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p)); + return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); + } } private void visitFieldList(List fieldList, CatalystPlanContext context) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 90c1854ae..059c0ae6d 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -133,7 +133,8 @@ public UnresolvedPlan visitWhereCommand(OpenSearchPPLParser.WhereCommandContext @Override public UnresolvedPlan visitExpandCommand(OpenSearchPPLParser.ExpandCommandContext ctx) { - return new Expand((Field) internalVisitExpression(ctx.fieldExpression())); + return new Expand((Field) internalVisitExpression(ctx.fieldExpression()), + ctx.alias!=null ? 
Optional.of(internalVisitExpression(ctx.alias)) : Optional.empty()); } @Override diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 4653dc054..4e165cd06 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -36,6 +36,31 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val generator = Explode(UnresolvedAttribute("field_with_array")) val outerGenerator = GeneratorOuter(generator) val generate = Generate(outerGenerator, seq(), true, None, seq(), relation) + val expectedPlan = Project(seq(UnresolvedStar(None)), generate) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + /** + * 'Project [*] +- 'Generate 'explode('multi_value), false, as, ['exploded_multi_value] +- + * 'UnresolvedRelation [spark_catalog, default, flint_ppl_multi_value_test], [], false + */ + test("test expand only field with alias") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | expand field_with_array as array_list "), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val generator = Explode(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(generator) + val generate = Generate( + outerGenerator, + seq(), + true, + None, + seq(UnresolvedAttribute("array_list")), + relation) val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) @@ -45,30 +70,47 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite test("test expand and stats") { 
val context = new CatalystPlanContext val query = - "source = relation | fields state, company, employee | expand employee | fields state, company, salary | stats max(salary) as max by state, company" + "source = table | expand employee | stats max(salary) as max by state, company" val logPlan = planTransformer.visit(plan(pplParser, query), context) - val table = UnresolvedRelation(Seq("relation")) - val projectStateCompanyEmployee = - Project( - Seq( - UnresolvedAttribute("state"), - UnresolvedAttribute("company"), - UnresolvedAttribute("employee")), - table) + val table = UnresolvedRelation(Seq("table")) val generate = Generate( GeneratorOuter(Explode(UnresolvedAttribute("employee"))), seq(), true, None, seq(), - projectStateCompanyEmployee) - val projectStateCompanySalary = Project( - Seq( - UnresolvedAttribute("state"), - UnresolvedAttribute("company"), - UnresolvedAttribute("salary"), - UnresolvedAttribute("employee"))) + table) + val average = Alias( + UnresolvedFunction(seq("MAX"), seq(UnresolvedAttribute("salary")), false, None, false), + "max")() + val state = Alias(UnresolvedAttribute("state"), "state")() + val company = Alias(UnresolvedAttribute("company"), "company")() + val groupingState = Alias(UnresolvedAttribute("state"), "state")() + val groupingCompany = Alias(UnresolvedAttribute("company"), "company")() + val aggregate = + Aggregate(Seq(groupingState, groupingCompany), Seq(average, state, company), generate) + val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate) + + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and stats with alias") { + val context = new CatalystPlanContext + val query = + "source = table | expand employee as workers | stats max(salary) as max by state, company" + val logPlan = + planTransformer.visit(plan(pplParser, query), context) + val table = UnresolvedRelation(Seq("table")) + val generate = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + 
true, + None, + seq(UnresolvedAttribute("workers")), + table) + val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) + val dropColumn = Project(seq(UnresolvedStar(None)), dropSourceColumn) val average = Alias( UnresolvedFunction(seq("MAX"), seq(UnresolvedAttribute("salary")), false, None, false), "max")() @@ -79,7 +121,7 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val aggregate = Aggregate( Seq(groupingState, groupingCompany), Seq(average, state, company), - projectStateCompanySalary) + dropSourceColumn) val expectedPlan = Project(Seq(UnresolvedStar(None)), aggregate) comparePlans(expectedPlan, logPlan, checkAnalysis = false) @@ -87,9 +129,9 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite test("test expand and eval") { val context = new CatalystPlanContext - val query = "source = relation | expand employee | eval bonus = salary * 3" + val query = "source = table | expand employee | eval bonus = salary * 3" val logPlan = planTransformer.visit(plan(pplParser, query), context) - val table = UnresolvedRelation(Seq("relation")) + val table = UnresolvedRelation(Seq("table")) val generate = Generate( GeneratorOuter(Explode(UnresolvedAttribute("employee"))), seq(), @@ -97,7 +139,6 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite None, seq(), table) - val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) val bonusProject = Project( Seq( UnresolvedStar(None), @@ -107,18 +148,76 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite Seq(UnresolvedAttribute("salary"), Literal(3, IntegerType)), isDistinct = false), "bonus")()), - dropSourceColumn) + generate) val expectedPlan = Project(Seq(UnresolvedStar(None)), bonusProject) comparePlans(expectedPlan, logPlan, checkAnalysis = false) } - test("test expand and parse and flatten") { + test("test expand and eval with fields and alias") { + val context = new CatalystPlanContext + val query = + "source = table | expand employee 
as worker | eval bonus = salary * 3 | fields worker, bonus " + val logPlan = planTransformer.visit(plan(pplParser, query), context) + val table = UnresolvedRelation(Seq("table")) + val generate = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + true, + None, + seq(UnresolvedAttribute("worker")), + table) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) + val bonusProject = Project( + Seq( + UnresolvedStar(None), + Alias( + UnresolvedFunction( + "*", + Seq(UnresolvedAttribute("salary"), Literal(3, IntegerType)), + isDistinct = false), + "bonus")()), + dropSourceColumn) + val expectedPlan = + Project(Seq(UnresolvedAttribute("worker"), UnresolvedAttribute("bonus")), bonusProject) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and parse and fields") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=table | expand employee | parse description '(?.+@.+)' | fields employee, email"), + context) + val table = UnresolvedRelation(Seq("table")) + val generator = Generate( + GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + seq(), + true, + None, + seq(), + table) + val emailAlias = + Alias( + RegExpExtract(UnresolvedAttribute("description"), Literal("(?.+@.+)"), Literal(1)), + "email")() + val parseProject = Project( + Seq(UnresolvedAttribute("description"), emailAlias, UnresolvedStar(None)), + generator) + val expectedPlan = + Project(Seq(UnresolvedAttribute("employee"), UnresolvedAttribute("email")), parseProject) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test expand and parse and flatten ") { val context = new CatalystPlanContext val logPlan = planTransformer.visit( plan( pplParser, - "source=relation | expand employee | parse description '(?.+@.+)' | flatten roles"), + "source=relation | expand employee | parse description '(?.+@.+)' | flatten roles 
"), context) val table = UnresolvedRelation(Seq("relation")) val generateEmployee = Generate( @@ -128,15 +227,13 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite None, seq(), table) - val dropSourceColumnEmployee = - DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generateEmployee) val emailAlias = Alias( RegExpExtract(UnresolvedAttribute("description"), Literal("(?.+@.+)"), Literal(1)), "email")() val parseProject = Project( Seq(UnresolvedAttribute("description"), emailAlias, UnresolvedStar(None)), - dropSourceColumnEmployee) + generateEmployee) val generateRoles = Generate( GeneratorOuter(new FlattenGenerator(UnresolvedAttribute("roles"))), seq(), From ef1d89034f867a16ad49b50007abd4ded2bba599 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 4 Nov 2024 16:41:35 -0800 Subject: [PATCH 06/11] update doc Signed-off-by: YANGDB --- docs/ppl-lang/ppl-expand-command.md | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/docs/ppl-lang/ppl-expand-command.md b/docs/ppl-lang/ppl-expand-command.md index 67c871e75..0a52b4eb9 100644 --- a/docs/ppl-lang/ppl-expand-command.md +++ b/docs/ppl-lang/ppl-expand-command.md @@ -7,11 +7,13 @@ Using `expand` command to flatten a field of type: ### Syntax -`flatten ` +`expand [As alias]` * field: to be expanded (exploded). The field must be of supported type. 
+* alias: Optional to be expanded as the name to be used instead of the original field name ### Test table + #### Schema | col\_name | data\_type | |-----------|----------------------------------------------| @@ -19,6 +21,7 @@ Using `expand` command to flatten a field of type: | bridges | array\\> | | city | string | | country | string | + #### Data | \_time | bridges | city | country | |---------------------|----------------------------------------------|---------|----------------| @@ -34,7 +37,7 @@ Using `expand` command to flatten a field of type: ### Example 1: expand struct This example shows how to expand an array of struct field. PPL query: - - `source=table | flatten bridges` + - `source=table | expand bridges as britishBridge | fields britishBridge` | \_time | bridges | city | country | alt | lat | long | |---------------------|----------------------------------------------|---------|---------------|-----|--------|--------| @@ -47,12 +50,12 @@ PPL query: -### Example 2: flatten array +### Example 2: expand array -The example shows how to flatten an array of struct fields. +The example shows how to expand an array of struct fields. PPL query: - - `source=table | flatten bridges` + - `source=table | expand bridges` | \_time | city | coor | country | length | name | |---------------------|---------|------------------------|---------------|--------|-------------------| @@ -69,10 +72,10 @@ PPL query: | 1990-09-13T12:00:00 | Warsaw | NULL | Poland | NULL | NULL | -### Example 3: flatten array and struct -This example shows how to flatten multiple fields. +### Example 3: expand array and struct +This example shows how to expand multiple fields. 
PPL query: - - `source=table | flatten bridges | flatten coor` + - `source=table | expand bridges | expand coor` | \_time | city | country | length | name | alt | lat | long | |---------------------|---------|---------------|--------|-------------------|------|--------|--------| From 3db02ce226039b7b95f0f518ad68cb47b0f3af61 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 4 Nov 2024 16:46:31 -0800 Subject: [PATCH 07/11] update docs with examples Signed-off-by: YANGDB --- docs/ppl-lang/PPL-Example-Commands.md | 35 ++++++++++++------- docs/ppl-lang/README.md | 2 ++ ...PlanExpandCommandTranslatorTestSuite.scala | 6 +--- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index e780f688d..11709b32b 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -437,8 +437,28 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in _- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_ ---- -#### Experimental Commands: + +#### **fillnull** +[See additional command details](ppl-fillnull-command.md) +```sql + - `source=accounts | fillnull fields status_code=101` + - `source=accounts | fillnull fields request_path='/not_found', timestamp='*'` + - `source=accounts | fillnull using field1=101` + - `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5` + - `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'` +``` + +#### **expand** +[See additional command details](ppl-expand-command.md) +```sql + - `source= table | expand field_with_array as array_list` + - `source = table | expand employee | stats max(salary) as max by state, company` + - `source = table | expand employee as worker | stats max(salary) as max by state, company` + - `source = table | expand employee as worker | eval bonus 
= salary * 3 | fields worker, bonus` + - `source = table | expand employee | parse description '(?.+@.+)' | fields employee, email` +``` + +#### Correlation Commands: [See additional command details](ppl-correlation-command.md) ```sql @@ -450,14 +470,3 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols` > ppl-correlation-command is an experimental command - it may be removed in future versions --- -### Planned Commands: - -#### **fillnull** -[See additional command details](ppl-fillnull-command.md) -```sql - - `source=accounts | fillnull fields status_code=101` - - `source=accounts | fillnull fields request_path='/not_found', timestamp='*'` - - `source=accounts | fillnull using field1=101` - - `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5` - - `source=accounts | fillnull using field1=concat(field2, field3), field4=2*pi()*field5, field6 = 'N/A'` -``` diff --git a/docs/ppl-lang/README.md b/docs/ppl-lang/README.md index 6ba49b031..fd87582ac 100644 --- a/docs/ppl-lang/README.md +++ b/docs/ppl-lang/README.md @@ -71,6 +71,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md). 
- [`correlation commands`](ppl-correlation-command.md) - [`trendline commands`](ppl-trendline-command.md) + + - [`expand commands`](ppl-expand-command.md) * **Functions** diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 4e165cd06..491d5cef3 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -39,11 +39,7 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val expectedPlan = Project(seq(UnresolvedStar(None)), generate) comparePlans(expectedPlan, logPlan, checkAnalysis = false) } - - /** - * 'Project [*] +- 'Generate 'explode('multi_value), false, as, ['exploded_multi_value] +- - * 'UnresolvedRelation [spark_catalog, default, flint_ppl_multi_value_test], [], false - */ + test("test expand only field with alias") { val context = new CatalystPlanContext val logPlan = From 76baeeed9cd926840f4807e22e404ee58241e8cb Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 4 Nov 2024 16:48:32 -0800 Subject: [PATCH 08/11] update scala style Signed-off-by: YANGDB --- .../ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 491d5cef3..92d0d521e 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ 
b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -39,7 +39,7 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val expectedPlan = Project(seq(UnresolvedStar(None)), generate) comparePlans(expectedPlan, logPlan, checkAnalysis = false) } - + test("test expand only field with alias") { val context = new CatalystPlanContext val logPlan = From a91a9867afffe8aaf65ecf4b89208d00ab21e652 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 5 Nov 2024 14:51:50 -0800 Subject: [PATCH 09/11] update with additional test case remove outer generator Signed-off-by: YANGDB --- docs/ppl-lang/PPL-Example-Commands.md | 9 +- docs/ppl-lang/ppl-expand-command.md | 9 -- .../ppl/FlintSparkPPLExpandITSuite.scala | 135 ++++++++++-------- .../sql/ppl/CatalystQueryPlanVisitor.java | 6 +- ...PlanExpandCommandTranslatorTestSuite.scala | 81 ++++++----- 5 files changed, 126 insertions(+), 114 deletions(-) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 11709b32b..5d4f68cb6 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -451,11 +451,12 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols` #### **expand** [See additional command details](ppl-expand-command.md) ```sql - - `source= table | expand field_with_array as array_list` + - `source = table | expand field_with_array as array_list` - `source = table | expand employee | stats max(salary) as max by state, company` - - `source = table | expand employee as worker | stats max(salary) as max by state, company` - - `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus` - - `source = table | expand employee | parse description '(?.+@.+)' | fields employee, email` + - `source = table | expand employee as worker | stats max(salary) as max by state, company` + - `source = table | expand employee as worker | eval 
bonus = salary * 3 | fields worker, bonus` + - `source = table | expand employee | parse description '(?.+@.+)' | fields employee, email` + - `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid` ``` #### Correlation Commands: diff --git a/docs/ppl-lang/ppl-expand-command.md b/docs/ppl-lang/ppl-expand-command.md index 0a52b4eb9..1e9fc319f 100644 --- a/docs/ppl-lang/ppl-expand-command.md +++ b/docs/ppl-lang/ppl-expand-command.md @@ -39,15 +39,6 @@ This example shows how to expand an array of struct field. PPL query: - `source=table | expand bridges as britishBridge | fields britishBridge` -| \_time | bridges | city | country | alt | lat | long | -|---------------------|----------------------------------------------|---------|---------------|-----|--------|--------| -| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074| -0.1278| -| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566| 2.3522 | -| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408| 12.3155| -| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755| 14.4378| -| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979| 19.0402| -| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL| NULL | NULL | - ### Example 2: expand array diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala index 11be46756..a2b780c59 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -23,6 +23,7 @@ class FlintSparkPPLExpandITSuite 
with StreamTest { private val testTable = "flint_ppl_test" + private val occupationTable = "spark_catalog.default.flint_ppl_flat_table_test" private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test" private val structTable = "spark_catalog.default.flint_ppl_struct_test" private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test" @@ -33,6 +34,7 @@ class FlintSparkPPLExpandITSuite // Create test table createNestedJsonContentTable(tempFile, testTable) + createOccupationTable(occupationTable) createStructNestedTable(structNestedTable) createStructTable(structTable) createMultiValueStructTable(multiValueTable) @@ -52,7 +54,61 @@ class FlintSparkPPLExpandITSuite Files.deleteIfExists(tempFile) } - test("flatten for structs") { + test("expand for eval field of an array") { + val frame = sql( + s""" source = $occupationTable | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid + """.stripMargin) + + val results: Array[Row] = frame.collect() + val expectedResults: Array[Row] = Array( + Row("Jake", "Engineer", 1), + Row("Jake", "Engineer", 2), + Row("Jake", "Engineer", 3), + Row("Hello", "Artist", 1), + Row("Hello", "Artist", 2), + Row("Hello", "Artist", 3), + Row("John", "Doctor", 1), + Row("John", "Doctor", 2), + Row("John", "Doctor", 3), + Row("David", "Doctor", 1), + Row("David", "Doctor", 2), + Row("David", "Doctor", 3), + Row("David", "Unemployed", 1), + Row("David", "Unemployed", 2), + Row("David", "Unemployed", 3), + Row("Jane", "Scientist", 1), + Row("Jane", "Scientist", 2), + Row("Jane", "Scientist", 3)) + + // Compare the results + assert(results.toSet == expectedResults.toSet) + + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // expected plan + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_flat_table_test")) + val jsonFunc = + UnresolvedFunction("array", Seq(Literal(1), Literal(2), Literal(3)), isDistinct = false) + val aliasA = 
Alias(jsonFunc, "array")() + val project = Project(seq(UnresolvedStar(None), aliasA), table) + val generate = Generate( + Explode(UnresolvedAttribute("array")), + seq(), + false, + None, + seq(UnresolvedAttribute("uid")), + project) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("array")), generate) + val expectedPlan = Project( + seq( + UnresolvedAttribute("name"), + UnresolvedAttribute("occupation"), + UnresolvedAttribute("uid")), + dropSourceColumn) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("expand for structs") { val frame = sql( s""" source = $multiValueTable | expand multi_value AS exploded_multi_value | fields exploded_multi_value """.stripMargin) @@ -73,11 +129,10 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // expected plan val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("multi_value"))) val generate = Generate( - outerGenerator, + Explode(UnresolvedAttribute("multi_value")), seq(), - outer = true, + outer = false, None, seq(UnresolvedAttribute("exploded_multi_value")), table) @@ -98,8 +153,10 @@ class FlintSparkPPLExpandITSuite val results: Array[Row] = frame.collect() val expectedResults: Array[Row] = Array( Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))), - Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))), - Row(null)) + Row(mutable.WrappedArray.make(Array(Row(801, "Tower Bridge"), Row(928, "London Bridge")))) + // Row(null)) -> in case of outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) it will include the `null` row + ) + // Compare the results assert(results.toSet == expectedResults.toSet) val logicalPlan: LogicalPlan = frame.queryExecution.logical @@ -109,8 +166,8 @@ class FlintSparkPPLExpandITSuite 
EqualTo(UnresolvedAttribute("country"), Literal("England")), EqualTo(UnresolvedAttribute("country"), Literal("Poland"))), table) - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) - val generate = Generate(outerGenerator, seq(), outer = true, None, seq(), filter) + val generate = + Generate(Explode(UnresolvedAttribute("bridges")), seq(), outer = false, None, seq(), filter) val expectedPlan = Project(Seq(UnresolvedAttribute("bridges")), generate) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } @@ -134,11 +191,10 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("flint_ppl_test")) val filter = Filter(EqualTo(UnresolvedAttribute("country"), Literal("England")), table) - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) val generate = Generate( - outerGenerator, + Explode(UnresolvedAttribute("bridges")), seq(), - outer = true, + outer = false, None, seq(UnresolvedAttribute("britishBridges")), filter) @@ -166,48 +222,10 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) - val generate = Generate( - outerGenerator, - seq(), - outer = true, - None, - seq(UnresolvedAttribute("britishBridges")), - table) - val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) - } - - ignore("expand multi value nullable") { - val frame = sql(s""" - | source = $multiValueTable - | | expand multi_value as expand_field - | | fields expand_field - | """.stripMargin) - - assert(frame.columns.sameElements(Array("expand_field"))) - val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array( - Row(1, "1_one", 1), - Row(1, null, 11), - 
Row(1, "1_three", null), - Row(2, "2_Monday", 2), - Row(2, null, null), - Row(3, "3_third", 3), - Row(3, "3_4th", 4), - Row(4, null, null)) - // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) - - val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val outerGenerator = GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) val generate = Generate( - outerGenerator, + Explode(UnresolvedAttribute("bridges")), seq(), - outer = true, + outer = false, None, seq(UnresolvedAttribute("britishBridges")), table) @@ -241,15 +259,15 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test")) -// val flattenStructCol = generator("struct_col", table) -// val flattenField1 = generator("field1", flattenStructCol) -// val flattenStructCol2 = generator("struct_col2", flattenField1) -// val flattenField1Again = generator("field1", flattenStructCol2) +// val expandStructCol = generator("struct_col", table) +// val expandField1 = generator("field1", expandStructCol) +// val expandStructCol2 = generator("struct_col2", expandField1) +// val expandField1Again = generator("field1", expandStructCol2) val expectedPlan = Project(Seq(UnresolvedStar(None)), table) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - ignore("flatten multi value nullable") { + ignore("expand multi value nullable") { val frame = sql(s""" | source = $multiValueTable | | expand multi_value as expand_field @@ -274,11 +292,10 @@ class FlintSparkPPLExpandITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val outerGenerator = 
GeneratorOuter(Explode(UnresolvedAttribute("bridges"))) val generate = Generate( - outerGenerator, + Explode(UnresolvedAttribute("bridges")), seq(), - outer = true, + outer = false, None, seq(UnresolvedAttribute("britishBridges")), table) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index a5c68574f..e4df7b16d 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -479,12 +479,12 @@ public LogicalPlan visitExpand(org.opensearch.sql.ast.tree.Expand node, Catalyst Optional alias = node.getAlias().map(aliasNode -> visitExpression(aliasNode, context)); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); Explode explodeGenerator = new Explode(field); - scala.collection.mutable.Seq seq = alias.isEmpty() ? seq() : seq(alias.get()); + scala.collection.mutable.Seq outputs = alias.isEmpty() ? 
seq() : seq(alias.get()); if(alias.isEmpty()) - return context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p)); + return context.apply(p -> new Generate(explodeGenerator, seq(), false, (Option) None$.MODULE$, outputs, p)); else { //in case an alias does appear - remove the original field from the returning columns - context.apply(p -> new Generate(new GeneratorOuter(explodeGenerator), seq(), true, (Option) None$.MODULE$, seq, p)); + context.apply(p -> new Generate(explodeGenerator, seq(), false, (Option) None$.MODULE$, outputs, p)); return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 92d0d521e..5f15c93e4 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -34,12 +34,38 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val relation = UnresolvedRelation(Seq("relation")) val generator = Explode(UnresolvedAttribute("field_with_array")) - val outerGenerator = GeneratorOuter(generator) - val generate = Generate(outerGenerator, seq(), true, None, seq(), relation) + val generate = Generate(generator, seq(), false, None, seq(), relation) val expectedPlan = Project(seq(UnresolvedStar(None)), generate) comparePlans(expectedPlan, logPlan, checkAnalysis = false) } + test("test expand on array field which is eval array=json_array") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source = table | eval array=json_array(1, 2, 3) | expand array 
as uid | fields uid"), + context) + + val relation = UnresolvedRelation(Seq("table")) + val jsonFunc = + UnresolvedFunction("array", Seq(Literal(1), Literal(2), Literal(3)), isDistinct = false) + val aliasA = Alias(jsonFunc, "array")() + val project = Project(seq(UnresolvedStar(None), aliasA), relation) + val generate = Generate( + Explode(UnresolvedAttribute("array")), + seq(), + false, + None, + seq(UnresolvedAttribute("uid")), + project) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("array")), generate) + val expectedPlan = Project(seq(UnresolvedAttribute("uid")), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + test("test expand only field with alias") { val context = new CatalystPlanContext val logPlan = @@ -48,12 +74,10 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite context) val relation = UnresolvedRelation(Seq("relation")) - val generator = Explode(UnresolvedAttribute("field_with_array")) - val outerGenerator = GeneratorOuter(generator) val generate = Generate( - outerGenerator, + Explode(UnresolvedAttribute("field_with_array")), seq(), - true, + false, None, seq(UnresolvedAttribute("array_list")), relation) @@ -70,13 +94,8 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val logPlan = planTransformer.visit(plan(pplParser, query), context) val table = UnresolvedRelation(Seq("table")) - val generate = Generate( - GeneratorOuter(Explode(UnresolvedAttribute("employee"))), - seq(), - true, - None, - seq(), - table) + val generate = + Generate(Explode(UnresolvedAttribute("employee")), seq(), false, None, seq(), table) val average = Alias( UnresolvedFunction(seq("MAX"), seq(UnresolvedAttribute("salary")), false, None, false), "max")() @@ -99,14 +118,13 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite planTransformer.visit(plan(pplParser, query), context) val table = UnresolvedRelation(Seq("table")) val generate = Generate( - 
GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + Explode(UnresolvedAttribute("employee")), seq(), - true, + false, None, seq(UnresolvedAttribute("workers")), table) val dropSourceColumn = DataFrameDropColumns(Seq(UnresolvedAttribute("employee")), generate) - val dropColumn = Project(seq(UnresolvedStar(None)), dropSourceColumn) val average = Alias( UnresolvedFunction(seq("MAX"), seq(UnresolvedAttribute("salary")), false, None, false), "max")() @@ -128,13 +146,8 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val query = "source = table | expand employee | eval bonus = salary * 3" val logPlan = planTransformer.visit(plan(pplParser, query), context) val table = UnresolvedRelation(Seq("table")) - val generate = Generate( - GeneratorOuter(Explode(UnresolvedAttribute("employee"))), - seq(), - true, - None, - seq(), - table) + val generate = + Generate(Explode(UnresolvedAttribute("employee")), seq(), false, None, seq(), table) val bonusProject = Project( Seq( UnresolvedStar(None), @@ -156,9 +169,9 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite val logPlan = planTransformer.visit(plan(pplParser, query), context) val table = UnresolvedRelation(Seq("table")) val generate = Generate( - GeneratorOuter(Explode(UnresolvedAttribute("employee"))), + Explode(UnresolvedAttribute("employee")), seq(), - true, + false, None, seq(UnresolvedAttribute("worker")), table) @@ -188,13 +201,8 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite "source=table | expand employee | parse description '(?.+@.+)' | fields employee, email"), context) val table = UnresolvedRelation(Seq("table")) - val generator = Generate( - GeneratorOuter(Explode(UnresolvedAttribute("employee"))), - seq(), - true, - None, - seq(), - table) + val generator = + Generate(Explode(UnresolvedAttribute("employee")), seq(), false, None, seq(), table) val emailAlias = Alias( RegExpExtract(UnresolvedAttribute("description"), Literal("(?.+@.+)"), Literal(1)), @@ -216,13 +224,8 @@ class 
PPLLogicalPlanExpandCommandTranslatorTestSuite "source=relation | expand employee | parse description '(?.+@.+)' | flatten roles "), context) val table = UnresolvedRelation(Seq("relation")) - val generateEmployee = Generate( - GeneratorOuter(Explode(UnresolvedAttribute("employee"))), - seq(), - true, - None, - seq(), - table) + val generateEmployee = + Generate(Explode(UnresolvedAttribute("employee")), seq(), false, None, seq(), table) val emailAlias = Alias( RegExpExtract(UnresolvedAttribute("description"), Literal("(?.+@.+)"), Literal(1)), From 82f7135d738650a25319b4952103f531531211a0 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 5 Nov 2024 18:39:20 -0800 Subject: [PATCH 10/11] update with additional test case remove outer generator Signed-off-by: YANGDB --- docs/ppl-lang/PPL-Example-Commands.md | 1 + .../flint/spark/FlintSparkSuite.scala | 22 ++++ .../ppl/FlintSparkPPLExpandITSuite.scala | 124 ++++++------------ ...PlanExpandCommandTranslatorTestSuite.scala | 32 +++++ 4 files changed, 92 insertions(+), 87 deletions(-) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 5d4f68cb6..4a70ff610 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -457,6 +457,7 @@ _- **Limitation: another command usage of (relation) subquery is in `appendcols` - `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus` - `source = table | expand employee | parse description '(?.+@.+)' | fields employee, email` - `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid` + - `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB` ``` #### Correlation Commands: diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index c53eee548..68d370791 100644 --- 
a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -559,6 +559,28 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit |""".stripMargin) } + protected def createMultiColumnArrayTable(testTable: String): Unit = { + // CSV doesn't support struct field + sql(s""" + | CREATE TABLE $testTable + | ( + | int_col INT, + | multi_valueA Array>, + | multi_valueB Array> + | ) + | USING JSON + |""".stripMargin) + + sql(s""" + | INSERT INTO $testTable + | VALUES + | ( 1, array(STRUCT("1_one", 1), STRUCT(null, 11), STRUCT("1_three", null)), array(STRUCT("2_Monday", 2), null) ), + | ( 2, array(STRUCT("2_Monday", 2), null) , array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) ), + | ( 3, array(STRUCT("3_third", 3), STRUCT("3_4th", 4)) , array(STRUCT("1_one", 1))), + | ( 4, null, array(STRUCT("1_one", 1))) + |""".stripMargin) + } + protected def createTableIssue112(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable ( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala index a2b780c59..f0404bf7b 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLExpandITSuite.scala @@ -27,6 +27,7 @@ class FlintSparkPPLExpandITSuite private val structNestedTable = "spark_catalog.default.flint_ppl_struct_nested_test" private val structTable = "spark_catalog.default.flint_ppl_struct_test" private val multiValueTable = "spark_catalog.default.flint_ppl_multi_value_test" + private val multiArraysTable = "spark_catalog.default.flint_ppl_multi_array_test" private val tempFile = Files.createTempFile("jsonTestData", ".json") override def beforeAll(): Unit 
= { @@ -38,6 +39,7 @@ class FlintSparkPPLExpandITSuite createStructNestedTable(structNestedTable) createStructTable(structTable) createMultiValueStructTable(multiValueTable) + createMultiColumnArrayTable(multiArraysTable) } protected override def afterEach(): Unit = { @@ -205,101 +207,49 @@ class FlintSparkPPLExpandITSuite comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } - ignore("expand struct table") { + test("expand multi columns array table") { val frame = sql(s""" - | source = $structTable - | | expand struct_col - | | expand field1 - | """.stripMargin) - - assert(frame.columns.sameElements(Array("int_col", "field2", "subfield"))) - val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array(Row(30, 123, "value1"), Row(40, 456, "value2"), Row(50, 789, "value3")) - // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) - - val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_test")) - val generate = Generate( - Explode(UnresolvedAttribute("bridges")), - seq(), - outer = false, - None, - seq(UnresolvedAttribute("britishBridges")), - table) - val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) - } - - ignore("expand struct nested table") { - val frame = sql(s""" - | source = $structNestedTable - | | expand struct_col - | | expand field1 - | | expand struct_col2 - | | expand field1 - | """.stripMargin) - - assert( - frame.columns.sameElements(Array("int_col", "field2", "subfield", "field2", "subfield"))) - val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array( - Row(30, 123, "value1", 23, "valueA"), - Row(40, 123, "value5", 33, "valueB"), - Row(30, 823, "value4", 83, "valueC"), - Row(40, 456, "value2", 46, "valueD"), 
- Row(50, 789, "value3", 89, "valueE")) - // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) - - val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = - UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_struct_nested_test")) -// val expandStructCol = generator("struct_col", table) -// val expandField1 = generator("field1", expandStructCol) -// val expandStructCol2 = generator("struct_col2", expandField1) -// val expandField1Again = generator("field1", expandStructCol2) - val expectedPlan = Project(Seq(UnresolvedStar(None)), table) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) - } - - ignore("expand multi value nullable") { - val frame = sql(s""" - | source = $multiValueTable - | | expand multi_value as expand_field - | | fields expand_field + | source = $multiArraysTable + | | expand multi_valueA as multiA + | | expand multi_valueB as multiB | """.stripMargin) - assert(frame.columns.sameElements(Array("expand_field"))) val results: Array[Row] = frame.collect() - val expectedResults: Array[Row] = - Array( - Row(1, "1_one", 1), - Row(1, null, 11), - Row(1, "1_three", null), - Row(2, "2_Monday", 2), - Row(2, null, null), - Row(3, "3_third", 3), - Row(3, "3_4th", 4), - Row(4, null, null)) + val expectedResults: Array[Row] = Array( + Row(1, Row("1_one", 1), Row("2_Monday", 2)), + Row(1, Row("1_one", 1), null), + Row(1, Row(null, 11), Row("2_Monday", 2)), + Row(1, Row(null, 11), null), + Row(1, Row("1_three", null), Row("2_Monday", 2)), + Row(1, Row("1_three", null), null), + Row(2, Row("2_Monday", 2), Row("3_third", 3)), + Row(2, Row("2_Monday", 2), Row("3_4th", 4)), + Row(2, null, Row("3_third", 3)), + Row(2, null, Row("3_4th", 4)), + Row(3, Row("3_third", 3), Row("1_one", 1)), + Row(3, Row("3_4th", 4), Row("1_one", 1))) // Compare the results - implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, 
Int](_.getAs[Int](0)) - assert(results.sorted.sameElements(expectedResults.sorted)) + assert(results.toSet == expectedResults.toSet) val logicalPlan: LogicalPlan = frame.queryExecution.logical - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_value_test")) - val generate = Generate( - Explode(UnresolvedAttribute("bridges")), + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_multi_array_test")) + val generatorA = Explode(UnresolvedAttribute("multi_valueA")) + val generateA = + Generate(generatorA, seq(), false, None, seq(UnresolvedAttribute("multiA")), table) + val dropSourceColumnA = + DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueA")), generateA) + val generatorB = Explode(UnresolvedAttribute("multi_valueB")) + val generateB = Generate( + generatorB, seq(), - outer = false, + false, None, - seq(UnresolvedAttribute("britishBridges")), - table) - val expectedPlan = Project(Seq(UnresolvedStar(None)), generate) - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + seq(UnresolvedAttribute("multiB")), + dropSourceColumnA) + val dropSourceColumnB = + DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueB")), generateB) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumnB) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala index 5f15c93e4..2acaac529 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanExpandCommandTranslatorTestSuite.scala @@ -39,6 +39,38 @@ class PPLLogicalPlanExpandCommandTranslatorTestSuite 
comparePlans(expectedPlan, logPlan, checkAnalysis = false) } + test("expand multi columns array table") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan( + pplParser, + s""" + | source = table + | | expand multi_valueA as multiA + | | expand multi_valueB as multiB + | """.stripMargin), + context) + + val relation = UnresolvedRelation(Seq("table")) + val generatorA = Explode(UnresolvedAttribute("multi_valueA")) + val generateA = + Generate(generatorA, seq(), false, None, seq(UnresolvedAttribute("multiA")), relation) + val dropSourceColumnA = + DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueA")), generateA) + val generatorB = Explode(UnresolvedAttribute("multi_valueB")) + val generateB = Generate( + generatorB, + seq(), + false, + None, + seq(UnresolvedAttribute("multiB")), + dropSourceColumnA) + val dropSourceColumnB = + DataFrameDropColumns(Seq(UnresolvedAttribute("multi_valueB")), generateB) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumnB) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + test("test expand on array field which is eval array=json_array") { val context = new CatalystPlanContext val logPlan = From cdadb1810a18359778c0c5bf795d7e1c9c4e74f7 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 6 Nov 2024 11:37:27 -0800 Subject: [PATCH 11/11] update documentation Signed-off-by: YANGDB --- docs/ppl-lang/ppl-expand-command.md | 100 +++++++++------------------- 1 file changed, 31 insertions(+), 69 deletions(-) diff --git a/docs/ppl-lang/ppl-expand-command.md b/docs/ppl-lang/ppl-expand-command.md index 1e9fc319f..144c0aafa 100644 --- a/docs/ppl-lang/ppl-expand-command.md +++ b/docs/ppl-lang/ppl-expand-command.md @@ -12,72 +12,34 @@ Using `expand` command to flatten a field of type: * field: to be expanded (exploded). The field must be of supported type. 
* alias: Optional to be expanded as the name to be used instead of the original field name -### Test table - -#### Schema -| col\_name | data\_type | -|-----------|----------------------------------------------| -| \_time | string | -| bridges | array\\> | -| city | string | -| country | string | - -#### Data -| \_time | bridges | city | country | -|---------------------|----------------------------------------------|---------|----------------| -| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | -| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | -| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | -| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic | -| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | -| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | - - - -### Example 1: expand struct -This example shows how to expand an array of struct field. -PPL query: - - `source=table | expand bridges as britishBridge | fields britishBridge` - - - -### Example 2: expand array - -The example shows how to expand an array of struct fields. 
- -PPL query: - - `source=table | expand bridges` - -| \_time | city | coor | country | length | name | -|---------------------|---------|------------------------|---------------|--------|-------------------| -| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 801 | Tower Bridge | -| 2024-09-13T12:00:00 | London | {35, 51.5074, -0.1278} | England | 928 | London Bridge | -| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 232 | Pont Neuf | -| 2024-09-13T12:00:00 | Paris | {35, 48.8566, 2.3522} | France | 160 | Pont Alexandre III| -| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 48 | Rialto Bridge | -| 2024-09-13T12:00:00 | Venice | {2, 45.4408, 12.3155} | Italy | 11 | Bridge of Sighs | -| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 516 | Charles Bridge | -| 2024-09-13T12:00:00 | Prague | {200, 50.0755, 14.4378}| Czech Republic| 343 | Legion Bridge | -| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 375 | Chain Bridge | -| 2024-09-13T12:00:00 | Budapest| {96, 47.4979, 19.0402} | Hungary | 333 | Liberty Bridge | -| 1990-09-13T12:00:00 | Warsaw | NULL | Poland | NULL | NULL | - - -### Example 3: expand array and struct -This example shows how to expand multiple fields. 
-PPL query: - - `source=table | expand bridges | expand coor` - -| \_time | city | country | length | name | alt | lat | long | -|---------------------|---------|---------------|--------|-------------------|------|--------|--------| -| 2024-09-13T12:00:00 | London | England | 801 | Tower Bridge | 35 | 51.5074| -0.1278| -| 2024-09-13T12:00:00 | London | England | 928 | London Bridge | 35 | 51.5074| -0.1278| -| 2024-09-13T12:00:00 | Paris | France | 232 | Pont Neuf | 35 | 48.8566| 2.3522 | -| 2024-09-13T12:00:00 | Paris | France | 160 | Pont Alexandre III| 35 | 48.8566| 2.3522 | -| 2024-09-13T12:00:00 | Venice | Italy | 48 | Rialto Bridge | 2 | 45.4408| 12.3155| -| 2024-09-13T12:00:00 | Venice | Italy | 11 | Bridge of Sighs | 2 | 45.4408| 12.3155| -| 2024-09-13T12:00:00 | Prague | Czech Republic| 516 | Charles Bridge | 200 | 50.0755| 14.4378| -| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378| -| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402| -| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| -| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | \ No newline at end of file +### Usage Guidelines +The expand command produces a row for each element in the specified array or map field, where: +- Array elements become individual rows. +- Map key-value pairs are broken into separate rows, with each key-value represented as a row. + +- When an alias is provided, the exploded values are represented under the alias instead of the original field name. +- This can be used in combination with other commands, such as stats, eval, and parse to manipulate or extract data post-expansion. 
+
+### Examples:
+- `source = table | expand employee | stats max(salary) as max by state, company`
+- `source = table | expand employee as worker | stats max(salary) as max by state, company`
+- `source = table | expand employee as worker | eval bonus = salary * 3 | fields worker, bonus`
+- `source = table | expand employee | parse description '(?<email>.+@.+)' | fields employee, email`
+- `source = table | eval array=json_array(1, 2, 3) | expand array as uid | fields name, occupation, uid`
+- `source = table | expand multi_valueA as multiA | expand multi_valueB as multiB`
+
+- The expand command can be used in combination with other commands such as `eval`, `stats`, and more.
+- Using multiple expand commands will create a Cartesian product of all the internal elements within each composite array or map.
+
+### Effective SQL push-down query
+The expand command is translated into an equivalent SQL operation using LATERAL VIEW explode, allowing for efficient exploding of arrays or maps at the SQL query level.
+
+```sql
+SELECT customer, exploded_productId
+FROM table
+LATERAL VIEW explode(productId) AS exploded_productId
+```
+Where the `explode` function offers the following functionality:
+- it is a column operation that returns a new column
+- it creates a new row for every element in the exploded column
+- internal `null`s are ignored as part of the exploded field (no row is created for a `null` element)