Support flatten with alias #927

Merged (5 commits) on Nov 20, 2024
19 changes: 17 additions & 2 deletions docs/ppl-lang/ppl-flatten-command.md
@@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type:


### Syntax
-`flatten <field>`
+`flatten <field> [As alias]`
Member: `[as aliasSequence]`


* field: the field to be flattened. The field must be of a supported type.
* alias: the alias (or aliases) to use for the flattened output fields. Put the aliases in parentheses if there is more than one field.

### Test table
#### Schema
@@ -87,4 +88,18 @@ PPL query:
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |

### Example 4: flatten with alias
This example shows how to flatten a field and alias the resulting fields.
PPL query:
- `source=table | flatten coor as (altitude, latitude, longitude)`

| \_time | bridges | city | country | altitude | latitude | longitude |
|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL |
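The alias behavior in Example 4 can be modeled outside PPL. The following is a minimal Python sketch (the `flatten_with_alias` helper and row dicts are hypothetical, not part of the PPL engine): flattening a struct-valued field promotes its subfields to top-level columns, and the alias list renames them positionally.

```python
def flatten_with_alias(row, field, aliases=None):
    """Flatten a struct-valued `field` into top-level columns.

    If `aliases` is given, it renames the flattened subfields
    positionally, mirroring `flatten <field> as (a, b, c)`.
    """
    struct = row.pop(field)
    if struct is None:
        # Outer-generate semantics: keep the row, emit NULL subfields.
        row.update({name: None for name in (aliases or [])})
        return row
    keys = list(struct.keys())
    names = aliases if aliases else keys
    if len(names) != len(keys):
        raise ValueError("alias count must match subfield count")
    row.update(dict(zip(names, struct.values())))
    return row

row = {"city": "London", "coor": {"alt": 35, "lat": 51.5074, "long": -0.1278}}
flat = flatten_with_alias(row, "coor", aliases=["altitude", "latitude", "longitude"])
# flat now has top-level altitude/latitude/longitude columns and no "coor".
```

This is only a sketch of the column-renaming semantics; the actual implementation goes through Spark's `Generate` operator, as shown in the translator changes further down.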
@@ -347,4 +347,30 @@ class FlintSparkPPLFlattenITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("flatten struct nested table using alias") {
Member:
Could you add a test for duplicated aliases:

| flatten struct_col as (field1, field2_2)
| flatten struct_col2 as (field1, field2_2)

And

| source = $structNestedTable
| flatten struct_col
| flatten field1 as int_col

Contributor (author):
Case 1 will be similar to this test except that it aliased field2 to field2_2.

frame.columns.sameElements(Array("int_col", "field2", "subfield", "field2", "subfield")))

Member:

> Case 1 will be similar to this test except that it aliased field2 to field2_2.

I mean, will the duplicated field2_2 in the query fail?

source = ... | flatten struct_col as (field1, field2_2) | flatten struct_col2 as (field1, field2_2)

val frame = sql(s"""
| source = $structNestedTable
| | flatten struct_col
| | flatten field1 as subfield_1
| | flatten struct_col2 as (field1, field2_2)
Member:
Please add a test case: it should fail

source =... | flatten struct_col as (field1_1)

| | flatten field1 as subfield_2
| """.stripMargin)

assert(
frame.columns.sameElements(
Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(30, 123, "value1", 23, "valueA"),
Row(40, 123, "value5", 33, "valueB"),
Row(30, 823, "value4", 83, "valueC"),
Row(40, 456, "value2", 46, "valueD"),
Row(50, 789, "value3", 89, "valueE"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}
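The duplicate-alias concern raised in the thread above can be checked with a simple helper (a hypothetical Python sketch, not part of this PR): aliasing two struct columns to the same names would yield duplicated names in the output schema.

```python
from collections import Counter

def find_duplicate_columns(columns):
    """Return the column names that appear more than once, sorted."""
    return sorted(name for name, count in Counter(columns).items() if count > 1)

# Flattening struct_col and struct_col2 both as (field1, field2_2)
# would produce a schema with repeated names:
cols = ["int_col", "field1", "field2_2", "field1", "field2_2"]
duplicates = find_duplicate_columns(cols)
```

Whether such a query actually fails is decided later, by Spark's analyzer when the duplicated attributes are referenced, not by this kind of up-front check.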

}
@@ -259,7 +259,7 @@ expandCommand
;

flattenCommand
-    : FLATTEN fieldExpression
+    : FLATTEN fieldExpression (AS alias = identifierSeq)?
;

trendlineCommand
@@ -1032,6 +1032,11 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

identifierSeq
: qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq
| LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq
Member (on lines +1036 to +1037):
This grammar allows four patterns:

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1).
  • flatten struct_col2 as field1, field2
  • flatten struct_col2 as (field1, field2).

Are the parentheses necessary, or should this be restricted to two patterns?

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1, field2)

Contributor (author):
It's not restricted to two; as you said, all four patterns are allowed. In most cases it works either with or without `()`, but having `()` is safer to avoid ambiguity.

For example, in SQL, given a query like `select flatten(a) as b, c`, field `c` will be parsed as a child of `select` rather than as a flatten alias, so putting the aliases inside parentheses avoids this. I'm not sure whether PPL has a case like that.

Member @LantaoJin (Nov 18, 2024):
> For example, in SQL, when having sql like select flatten(a) as b, c, field c will be parsed as a child of select instead of flatten alias, so put alias inner brace will avoid this. I'm not sure if PPL will have case like that.

Yes, that's my point. How about keeping only:

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1)
  • flatten struct_col2 as (field1, field2)

or

  • flatten struct_col2 as (field1)
  • flatten struct_col2 as (field1, field2)

Member:
@YANG-DB any thoughts?

Member:
@LantaoJin I personally don't mind the extra verbosity, so I'm good with @qianheng-aws's suggestion...

;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
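As a rough illustration of the four patterns the `identifierSeq` rule accepts, here is a simplified regex model (a hypothetical sketch, not the actual ANTLR semantics; identifier syntax is reduced to word characters and dots):

```python
import re

# qualifiedName ~ ident (DOT ident)*, simplified to word chars and dots.
QNAME = r"[A-Za-z_]\w*(?:\.[A-Za-z_]\w*)*"
# One or more qualified names separated by commas.
SEQ = rf"{QNAME}(?:\s*,\s*{QNAME})*"
# identifierSeq: a bare sequence, or the same sequence in parentheses.
IDENTIFIER_SEQ = re.compile(rf"^(?:{SEQ}|\(\s*{SEQ}\s*\))$")

# All four alias forms discussed in the thread are accepted:
for text in ["field1", "(field1)", "field1, field2", "(field1, field2)"]:
    assert IDENTIFIER_SEQ.match(text), text
```

The model makes the trade-off in the discussion concrete: dropping the first alternative of the rule would be the way to require parentheses in all cases.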
@@ -7,6 +7,7 @@
import org.opensearch.sql.ast.expression.Field;

import java.util.List;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@RequiredArgsConstructor
public class Flatten extends UnresolvedPlan {
@@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan {

@Getter
private final Field field;
@Getter
private final List<UnresolvedExpression> alias;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
@@ -10,6 +10,7 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.Ascending$;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Explode;
import org.apache.spark.sql.catalyst.expressions.Expression;
@@ -462,9 +463,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
Expression field = visitExpression(flatten.getField(), context);
List<Expression> alias = flatten.getAlias().stream()
.map(aliasNode -> visitExpression(aliasNode, context))
.collect(Collectors.toList());
context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
FlattenGenerator flattenGenerator = new FlattenGenerator(field);
-        context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p));
+        scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias);
+        context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p));
return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan));
}
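The naming logic in `visitFlatten` can be paraphrased as follows (a hypothetical Python model; `default_names` stands in for the names Spark derives from the generator when no aliases are supplied, and the length check is added here only for clarity, since in the PR any mismatch is left to Spark's analyzer):

```python
def generate_output_names(default_names, aliases):
    """Pick output column names for the flatten generator.

    Mirrors `alias.isEmpty() ? seq() : seq(alias)`: an empty alias
    list means "let Spark derive names from the generator output".
    """
    if not aliases:
        return list(default_names)
    if len(aliases) != len(default_names):
        raise ValueError("alias count must match generated column count")
    return list(aliases)

# No aliases: generator defaults pass through.
names = generate_output_names(["alt", "lat", "long"], [])
# With aliases: the Generate node's output attributes take the alias names.
renamed = generate_output_names(["alt", "lat", "long"],
                                ["altitude", "latitude", "longitude"])
```

In the actual code the chosen names become the `generatorOutput` of the `Generate` logical plan node, and the source struct column is then dropped with `DataFrameDropColumns`.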

@@ -18,6 +18,7 @@
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
@@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandContext ctx) {
@Override
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression());
-    return new Flatten(unresolvedExpression);
+    List<UnresolvedExpression> alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList();
+    return new Flatten(unresolvedExpression, alias);
}

/** AD command. */
@@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.IdentsAsQualifiedNameContext ctx) {
return visitIdentifiers(ctx.ident());
}

@Override
public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) {
return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList()));
}

@Override
public UnresolvedExpression visitIdentsAsTableQualifiedName(
OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) {
@@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract}
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanFlattenCommandTranslatorTestSuite
@@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with one alias") {
Member @YANG-DB (Nov 18, 2024):
@qianheng-aws plz add additional flatten with aliases queries unit tests

Contributor (author) @qianheng-aws (Nov 19, 2024):
@YANG-DB we already have one, named "test flatten with alias list".

val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=relation | flatten field_with_array as col1"),
context)

val relation = UnresolvedRelation(Seq("relation"))
val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array"))
val outerGenerator = GeneratorOuter(flattenGenerator)
val generate =
Generate(outerGenerator, seq(), true, None, Seq(UnresolvedAttribute("col1")), relation)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with alias list") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"),
context)

val relation = UnresolvedRelation(Seq("relation"))
val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array"))
val outerGenerator = GeneratorOuter(flattenGenerator)
val generate = Generate(
outerGenerator,
seq(),
true,
None,
Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2")),
relation)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

}