Skip to content

Commit

Permalink
Support flatten with alias
Browse files Browse the repository at this point in the history
Signed-off-by: Heng Qian <[email protected]>
  • Loading branch information
qianheng-aws committed Nov 18, 2024
1 parent ec337b4 commit 0899d53
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 7 deletions.
19 changes: 17 additions & 2 deletions docs/ppl-lang/ppl-flatten-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type:


### Syntax
`flatten <field> [as alias]`

* field: to be flattened. The field must be of supported type.
* alias: to be used as the alias (or aliases) for the flattened-output fields. The aliases must be enclosed in parentheses if there is more than one field.

### Test table
#### Schema
Expand Down Expand Up @@ -87,4 +88,18 @@ PPL query:
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw  | Poland        | NULL | NULL           | NULL | NULL   | NULL   |

### Example 4: flatten with alias
This example shows how to flatten with alias.
PPL query:
- `source=table | flatten coor as (altitude, latitude, longitude)`

| \_time              | bridges                                      | city    | country       | altitude | latitude | longitude  |
|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL |
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,30 @@ class FlintSparkPPLFlattenITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

// End-to-end check that `flatten ... as <alias>` renames the flattened output
// fields, including repeated flattens where an alias list `(field1, field2_2)`
// deliberately reuses the name `field1` so it can be flattened again below.
test("flatten struct nested table using alias") {
val frame = sql(s"""
| source = $structNestedTable
| | flatten struct_col
| | flatten field1 as subfield_1
| | flatten struct_col2 as (field1, field2_2)
| | flatten field1 as subfield_2
| """.stripMargin)

// The aliased names should replace the original flattened field names in the schema.
assert(
frame.columns.sameElements(
Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(30, 123, "value1", 23, "valueA"),
Row(40, 123, "value5", 33, "valueB"),
Row(30, 823, "value4", 83, "valueC"),
Row(40, 456, "value2", 46, "valueD"),
Row(50, 789, "value3", 89, "valueE"))
// Compare the results
// Sort both sides by the first (int) column so the comparison is order-insensitive.
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

}
7 changes: 6 additions & 1 deletion ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ expandCommand
;

// FLATTEN explodes a struct/array field into top-level columns; the optional
// AS clause renames the resulting output fields (see identifierSeq for the
// single-name vs. parenthesized-list forms).
flattenCommand
    : FLATTEN fieldExpression (AS alias = identifierSeq)?
    ;

trendlineCommand
Expand Down Expand Up @@ -1032,6 +1032,11 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

// A comma-separated list of qualified names, either bare or wrapped in
// parentheses (LT_PRTHS/RT_PRTHS). Both alternatives share one label, so the
// visitor handles them uniformly; the docs require parentheses when more than
// one name is given, though the bare form also parses a single name.
identifierSeq
: qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq
| LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq
;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.sql.ast.expression.Field;

import java.util.List;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@RequiredArgsConstructor
public class Flatten extends UnresolvedPlan {
Expand All @@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan {

@Getter
private final Field field;
@Getter
private final List<UnresolvedExpression> alias;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.Ascending$;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Explode;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand Down Expand Up @@ -462,9 +463,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
Expression field = visitExpression(flatten.getField(), context);
List<Expression> alias = flatten.getAlias().stream()
.map(aliasNode -> visitExpression(aliasNode, context))
.collect(Collectors.toList());
context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
FlattenGenerator flattenGenerator = new FlattenGenerator(field);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p));
scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p));
return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
Expand Down Expand Up @@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
/**
 * Builds the {@link Flatten} AST node for a `flatten` command.
 * The field to flatten is always present; the alias list is empty when the
 * optional AS clause was not written, otherwise the parsed identifierSeq
 * arrives as an AttributeList whose entries rename the flattened output fields.
 */
@Override
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression());
// ctx.alias is the labeled `alias = identifierSeq` subrule from the grammar; null means no AS clause.
List<UnresolvedExpression> alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList();
return new Flatten(unresolvedExpression, alias);
}

/** AD command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident
return visitIdentifiers(ctx.ident());
}

/**
 * Visits an identifier sequence (e.g. the alias list of `flatten ... as (a, b)`)
 * by visiting each qualified name and collecting the results into a single
 * {@link AttributeList} expression.
 */
@Override
public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) {
return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList()));
}

@Override
public UnresolvedExpression visitIdentsAsTableQualifiedName(
OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanFlattenCommandTranslatorTestSuite
Expand Down Expand Up @@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with one alias") {
  // Translate the PPL query, then build the expected Catalyst plan bottom-up
  // and compare the two without analysis.
  val ctx = new CatalystPlanContext
  val translated =
    planTransformer.visit(
      plan(pplParser, "source=relation | flatten field_with_array as col1"),
      ctx)

  val source = UnresolvedRelation(Seq("relation"))
  // A single alias becomes the one generator-output attribute of Generate.
  val generator = GeneratorOuter(new FlattenGenerator(UnresolvedAttribute("field_with_array")))
  val generate =
    Generate(generator, seq(), true, None, Seq(UnresolvedAttribute("col1")), source)
  // The original (pre-flatten) column is dropped from the output.
  val withoutSource =
    DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
  val expected = Project(seq(UnresolvedStar(None)), withoutSource)
  comparePlans(expected, translated, checkAnalysis = false)
}

test("test flatten with alias list") {
  // Translate the PPL query with a parenthesized alias list, then assemble
  // the expected Catalyst plan and compare structurally.
  val ctx = new CatalystPlanContext
  val translated =
    planTransformer.visit(
      plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"),
      ctx)

  val source = UnresolvedRelation(Seq("relation"))
  val generator = GeneratorOuter(new FlattenGenerator(UnresolvedAttribute("field_with_array")))
  // Each alias in the list maps to one generator-output attribute, in order.
  val aliasAttributes = Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2"))
  val generate = Generate(generator, seq(), true, None, aliasAttributes, source)
  // The original (pre-flatten) column is dropped from the output.
  val withoutSource =
    DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
  val expected = Project(seq(UnresolvedStar(None)), withoutSource)
  comparePlans(expected, translated, checkAnalysis = false)
}

}

0 comments on commit 0899d53

Please sign in to comment.