Support flatten with alias #927

Merged: 5 commits, Nov 20, 2024
1 change: 1 addition & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct<?,?>` or `array<struct<?,?>>`
- `source = table | flatten bridges`
- `source = table | flatten coor`
- `source = table | flatten coor as (altitude, latitude, longitude)`
- `source = table | flatten bridges | flatten coor`
- `source = table | fields bridges | flatten bridges`
- `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country`
19 changes: 17 additions & 2 deletions docs/ppl-lang/ppl-flatten-command.md
@@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type:


### Syntax
`flatten <field>`
`flatten <field> [As aliasSequence]`

* field: the field to be flattened. The field must be of a supported type.
* aliasSequence: the names to use for the flattened output fields. It is better to put the aliasSequence in parentheses when there is more than one field.

### Test table
#### Schema
@@ -87,4 +88,18 @@ PPL query:
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |

### Example 4: flatten with aliasSequence
This example shows how to flatten a field and rename the resulting columns with an alias sequence.
PPL query:
- `source=table | flatten coor as (altitude, latitude, longitude)`

| \_time              | bridges                                      | city    | country       | altitude | latitude | longitude  |
|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL |
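The aliasing behavior shown in Example 4 can be sketched in plain Python. This is an illustrative simulation only, not the Spark implementation; `flatten_row` and its dict-based rows are hypothetical helpers:

```python
# Illustrative sketch (not OpenSearch/Spark code): flatten a struct-valued
# field into top-level columns, optionally renaming them with an alias
# sequence, as in `flatten coor as (altitude, latitude, longitude)`.
def flatten_row(row, field, aliases=None):
    """Flatten row[field] (a dict or None) into the row, dropping the source field."""
    row = dict(row)              # work on a copy
    struct = row.pop(field)
    if struct is None:
        # A NULL struct yields NULL for every output column.
        return {**row, **{name: None for name in (aliases or [])}}
    names = list(struct)
    if aliases is not None:
        if len(aliases) != len(names):
            # Mirrors the "number of aliases ... does not match" error.
            raise ValueError("The number of aliases supplied in the AS clause "
                             "does not match the number of columns output")
        names = list(aliases)
    return {**row, **dict(zip(names, struct.values()))}

row = {"city": "London", "coor": {"alt": 35, "lat": 51.5074, "long": -0.1278}}
flat = flatten_row(row, "coor", aliases=["altitude", "latitude", "longitude"])
# flat == {"city": "London", "altitude": 35, "latitude": 51.5074, "longitude": -0.1278}
```

As in the example table, a NULL struct produces NULL in every aliased output column, and an alias list whose length does not match the struct's field count is rejected.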
@@ -9,7 +9,7 @@ import java.nio.file.Files
import org.opensearch.flint.spark.FlattenGenerator
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -347,4 +347,85 @@ class FlintSparkPPLFlattenITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("flatten struct nested table using alias") {
Member: Could you add a test for duplicated aliases:

    | flatten struct_col as (field1, field2_2)
    | flatten struct_col2 as (field1, field2_2)

And:

    | source = $structNestedTable
    | flatten struct_col
    | flatten field1 as int_col

Contributor Author: Case 1 will be similar to this test, except that it aliases field2 to field2_2:

    frame.columns.sameElements(Array("int_col", "field2", "subfield", "field2", "subfield"))

Member: I mean: will the duplicated field2_2 in the query fail?

    source = ... | flatten struct_col as (field1, field2_2) | flatten struct_col2 as (field1, field2_2)

val frame = sql(s"""
| source = $structNestedTable
| | flatten struct_col
| | flatten field1 as subfield_1
| | flatten struct_col2 as (field1, field2_2)
Member: Please add a test case: it should fail.

    source =... | flatten struct_col as (field1_1)
| | flatten field1 as subfield_2
| """.stripMargin)

assert(
frame.columns.sameElements(
Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2")))
val results: Array[Row] = frame.collect()
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
val expectedResults: Array[Row] =
Array(
Row(30, 123, "value1", 23, "valueA"),
Row(40, 123, "value5", 33, "valueB"),
Row(30, 823, "value4", 83, "valueC"),
Row(40, 456, "value2", 46, "valueD"),
Row(50, 789, "value3", 89, "valueE")).sorted
// Compare the results
assert(results.sorted.sameElements(expectedResults))
Member (@LantaoJin, Nov 20, 2024): Try using assertSameRows in the future, which won't consider any ordering.

Contributor Author: Sure. It seems that function does the sorting internally.

// duplicate alias names
val frame2 = sql(s"""
| source = $structNestedTable
| | flatten struct_col as (field1, field2_2)
| | flatten field1 as subfield_1
| | flatten struct_col2 as (field1, field2_2)
| | flatten field1 as subfield_2
Comment on lines +375 to +381

Member: The duplicated alias case I mentioned is:

    | flatten struct_col as (field1, field2_2)
    | flatten struct_col2 as (field1, field2_2)

Contributor Author: Any concern with your case? I think the purpose is to test duplicate alias names between two flatten commands. I only changed the field names in the original test cases to satisfy that purpose, so we can reuse the constructed expected results.
| """.stripMargin)

// alias names duplicate with existing fields
assert(
frame2.columns.sameElements(
Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2")))
assert(frame2.collect().sorted.sameElements(expectedResults))

val frame3 = sql(s"""
| source = $structNestedTable
| | flatten struct_col as (field1, field2_2)
| | flatten field1 as int_col
| | flatten struct_col2 as (field1, field2_2)
| | flatten field1 as int_col
| """.stripMargin)

assert(
frame3.columns.sameElements(Array("int_col", "field2_2", "int_col", "field2_2", "int_col")))
assert(frame3.collect().sorted.sameElements(expectedResults))

// Throw AnalysisException if the number of aliases supplied in the AS clause does not match
// the number of columns output
val except = intercept[AnalysisException] {
sql(s"""
| source = $structNestedTable
| | flatten struct_col as (field1)
| | flatten field1 as int_col
| | flatten struct_col2 as (field1, field2_2)
| | flatten field1 as int_col
| """.stripMargin)
}
assert(except.message.contains(
"The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF"))

// Throw AnalysisException because of ambiguous
val except2 = intercept[AnalysisException] {
sql(s"""
| source = $structNestedTable
| | flatten struct_col as (field1, field2_2)
| | flatten field1 as int_col
| | flatten struct_col2 as (field1, field2_2)
| | flatten field1 as int_col
| | fields field2_2
| """.stripMargin)
}
assert(except2.message.contains(
"[AMBIGUOUS_REFERENCE] Reference `field2_2` is ambiguous, could be: [`field2_2`, `field2_2`]."))
}

}
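The tests above show that duplicate output column names are accepted until a later command actually references one of them, at which point resolution fails. A minimal sketch of that resolution rule (hypothetical helper, not Spark's analyzer):

```python
# Illustrative sketch: duplicate column names are tolerated until a query
# references one, at which point name resolution becomes ambiguous
# (mirroring Spark's AMBIGUOUS_REFERENCE error).
def resolve(columns, name):
    matches = [i for i, col in enumerate(columns) if col == name]
    if len(matches) > 1:
        raise LookupError(f"[AMBIGUOUS_REFERENCE] Reference `{name}` is ambiguous")
    if not matches:
        raise LookupError(f"Column `{name}` not found")
    return matches[0]

# Column layout from frame2 in the test above: field2_2 appears twice.
cols = ["int_col", "field2_2", "subfield_1", "field2_2", "subfield_2"]
assert resolve(cols, "subfield_1") == 2   # a unique name resolves normally
```

This is why `fields field2_2` in the last test case throws, while the earlier queries that merely produce duplicate columns succeed.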
@@ -259,7 +259,7 @@ expandCommand
;

flattenCommand
: FLATTEN fieldExpression
: FLATTEN fieldExpression (AS alias = identifierSeq)?
;

trendlineCommand
@@ -1032,6 +1032,11 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

identifierSeq
: qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq
| LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq
Comment on lines +1036 to +1037

Member: This grammar allows four patterns:

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1)
  • flatten struct_col2 as field1, field2
  • flatten struct_col2 as (field1, field2)

Are the parentheses necessary, or should it be restricted to two?

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1, field2)

Contributor Author: It's not restricted to two; as you said, all four patterns are allowed. In most cases either form works, with or without parentheses, but parentheses are safer for avoiding ambiguity. For example, in SQL, given a query like select flatten(a) as b, c, the field c will be parsed as a child of select instead of as a flatten alias, so putting the alias inside parentheses avoids this. I'm not sure whether PPL has cases like that.

Member (@LantaoJin, Nov 18, 2024): Yes, that's my point. How about keeping only:

  • flatten struct_col2 as field1
  • flatten struct_col2 as (field1)
  • flatten struct_col2 as (field1, field2)

or:

  • flatten struct_col2 as (field1)
  • flatten struct_col2 as (field1, field2)

Member: @YANG-DB any thoughts?

Member: @LantaoJin I personally don't mind the extra verbosity, so I'm good with @qianheng-aws's suggestion.

;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
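The four alias forms debated above can be checked against a rough approximation of the `identifierSeq` rule. This regex sketch is hypothetical and far looser than the real ANTLR grammar (which builds on `qualifiedName`); it only illustrates which surface forms the rule admits:

```python
import re

# Rough, hypothetical approximation of identifierSeq: a comma-separated
# list of (possibly dotted) identifiers, optionally wrapped in parentheses.
IDENT = r"[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)*"
SEQ = rf"{IDENT}(?:\s*,\s*{IDENT})*"
IDENTIFIER_SEQ = re.compile(rf"(?:{SEQ}|\(\s*{SEQ}\s*\))")

def is_alias_sequence(text):
    return IDENTIFIER_SEQ.fullmatch(text.strip()) is not None

# All four forms from the review discussion are accepted:
for form in ["field1", "(field1)", "field1, field2", "(field1, field2)"]:
    assert is_alias_sequence(form)
assert not is_alias_sequence("(field1, field2")  # unbalanced parenthesis
```

As the thread notes, the parenthesized forms are the safer spelling when more than one alias is given.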
@@ -7,6 +7,7 @@
import org.opensearch.sql.ast.expression.Field;

import java.util.List;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@RequiredArgsConstructor
public class Flatten extends UnresolvedPlan {
@@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan {

@Getter
private final Field field;
@Getter
private final List<UnresolvedExpression> aliasSequence;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
@@ -84,12 +84,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.List.of;
import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty;
@@ -462,9 +460,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
Expression field = visitExpression(flatten.getField(), context);
List<Expression> alias = flatten.getAliasSequence().stream()
.map(aliasNode -> visitExpression(aliasNode, context))
.collect(Collectors.toList());
context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
FlattenGenerator flattenGenerator = new FlattenGenerator(field);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p));
scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p));
return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan));
}
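The planner change in visitFlatten boils down to: an empty alias list keeps the generator's default output names, a non-empty list overrides them, and the source field is dropped afterwards. A hypothetical simplification (plain Python, not the Catalyst types):

```python
# Sketch of the visitFlatten logic: pick output names, append them as
# generated columns, then drop the flattened source field
# (the `alias.isEmpty() ? seq() : seq(alias)` plus DataFrameDropColumns steps).
def plan_flatten(input_cols, field, default_outputs, aliases=()):
    outputs = list(aliases) if aliases else list(default_outputs)
    generated = list(input_cols) + outputs        # Generate appends new columns
    return [c for c in generated if c != field]   # then the source field is dropped

cols = plan_flatten(["city", "coor"], "coor", ["alt", "lat", "long"],
                    aliases=["altitude", "latitude", "longitude"])
# cols == ["city", "altitude", "latitude", "longitude"]
```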

@@ -18,6 +18,7 @@
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
@@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
@Override
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression());
return new Flatten(unresolvedExpression);
List<UnresolvedExpression> alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList();
return new Flatten(unresolvedExpression, alias);
}

/** AD command. */
Expand Up @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident
return visitIdentifiers(ctx.ident());
}

@Override
public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) {
return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList()));
}

@Override
public UnresolvedExpression visitIdentsAsTableQualifiedName(
OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) {
@@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanFlattenCommandTranslatorTestSuite
@@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with one alias") {
Member (@YANG-DB, Nov 18, 2024): @qianheng-aws please add additional unit tests for flatten-with-alias queries.

Contributor Author (@qianheng-aws, Nov 19, 2024): @YANG-DB we already have one, named "test flatten with alias list".
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=relation | flatten field_with_array as col1"),
context)

val relation = UnresolvedRelation(Seq("relation"))
val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array"))
val outerGenerator = GeneratorOuter(flattenGenerator)
val generate =
Generate(outerGenerator, seq(), true, None, Seq(UnresolvedAttribute("col1")), relation)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with alias list") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"),
context)

val relation = UnresolvedRelation(Seq("relation"))
val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array"))
val outerGenerator = GeneratorOuter(flattenGenerator)
val generate = Generate(
outerGenerator,
seq(),
true,
None,
Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2")),
relation)
val dropSourceColumn =
DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate)
val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn)
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

}