-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Translate PPL dedup
Command Part 2: allowedDuplication>1
#543
Changes from 3 commits
548a0a4
52da2f2
0f0bdf2
bfa6e01
772a400
ec0efe0
9ef0b3c
c2a6394
b187a12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,12 @@ | |
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; | ||
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; | ||
import org.apache.spark.sql.catalyst.expressions.Expression; | ||
import org.apache.spark.sql.catalyst.expressions.LessThanOrEqual; | ||
import org.apache.spark.sql.catalyst.expressions.NamedExpression; | ||
import org.apache.spark.sql.catalyst.expressions.Predicate; | ||
import org.apache.spark.sql.catalyst.expressions.SortOrder; | ||
import org.apache.spark.sql.catalyst.plans.logical.Aggregate; | ||
import org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns; | ||
import org.apache.spark.sql.catalyst.plans.logical.Deduplicate; | ||
import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; | ||
import org.apache.spark.sql.catalyst.plans.logical.Limit; | ||
|
@@ -64,6 +66,7 @@ | |
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator; | ||
import org.opensearch.sql.ppl.utils.ComparatorTransformer; | ||
import org.opensearch.sql.ppl.utils.SortUtils; | ||
import org.opensearch.sql.ppl.utils.WindowSpecTransformer; | ||
import scala.Option; | ||
import scala.Option$; | ||
import scala.collection.Seq; | ||
|
@@ -318,13 +321,14 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) { | |
// adding Aggregate operator could achieve better performance. | ||
if (allowedDuplication == 1) { | ||
if (keepEmpty) { | ||
// | dedup a, b keepempty=true | ||
// Union | ||
// :- Deduplicate ['a, 'b] | ||
// : +- Filter (isnotnull('a) AND isnotnull('b)) | ||
// : +- Project | ||
// : +- ... | ||
// : +- UnresolvedRelation | ||
// +- Filter (isnull('a) OR isnull('b)) | ||
// +- Project | ||
// +- ... | ||
// +- UnresolvedRelation | ||
|
||
context.apply(p -> { | ||
|
@@ -339,9 +343,10 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) { | |
}); | ||
return context.getPlan(); | ||
} else { | ||
// | dedup a, b keepempty=false | ||
// Deduplicate ['a, 'b] | ||
// +- Filter (isnotnull('a) AND isnotnull('b)) | ||
// +- Project | ||
// +- ... | ||
// +- UnresolvedRelation | ||
|
||
Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context); | ||
|
@@ -350,8 +355,86 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) { | |
return context.apply(p -> new Deduplicate(dedupFields, p)); | ||
} | ||
} else { | ||
// TODO | ||
throw new UnsupportedOperationException("Number of duplicate events greater than 1 is not supported"); | ||
if (keepEmpty) { | ||
// | dedup 2 a, b keepempty=true | ||
// Union | ||
//:- DataFrameDropColumns('_row_number_) | ||
//: +- Filter ('_row_number_ <= 2) | ||
//: +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST] | ||
//: +- Filter (isnotnull('a) AND isnotnull('b)) | ||
//: +- ... | ||
//: +- UnresolvedRelation | ||
//+- Filter (isnull('a) OR isnull('b)) | ||
// +- ... | ||
// +- UnresolvedRelation | ||
|
||
context.apply(p -> { | ||
// Build isnull Filter for right | ||
Expression isNullExpr = buildIsNullFilterExpression(node, context); | ||
LogicalPlan right = new org.apache.spark.sql.catalyst.plans.logical.Filter(isNullExpr, p); | ||
|
||
// Build isnotnull Filter | ||
Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context); | ||
LogicalPlan isNotNullFilter = new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p); | ||
|
||
// Build Window | ||
visitFieldList(node.getFields(), context); | ||
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(exp -> exp); | ||
visitFieldList(node.getFields(), context); | ||
Seq<SortOrder> orderSpec = context.retainAllNamedParseExpressions(exp -> SortUtils.sortOrder(exp, true)); | ||
NamedExpression rowNumber = WindowSpecTransformer.buildRowNumber(partitionSpec, orderSpec); | ||
LogicalPlan window = new org.apache.spark.sql.catalyst.plans.logical.Window( | ||
seq(rowNumber), | ||
partitionSpec, | ||
orderSpec, | ||
isNotNullFilter); | ||
|
||
// Build deduplication Filter ('_row_number_ <= n) | ||
Expression filterExpr = new LessThanOrEqual( | ||
rowNumber.toAttribute(), | ||
new org.apache.spark.sql.catalyst.expressions.Literal(allowedDuplication, DataTypes.IntegerType)); | ||
LogicalPlan deduplicationFilter = new org.apache.spark.sql.catalyst.plans.logical.Filter(filterExpr, window); | ||
|
||
// Build DataFrameDropColumns('_row_number_) for left | ||
LogicalPlan left = new DataFrameDropColumns(seq(rowNumber.toAttribute()), deduplicationFilter); | ||
|
||
// Build Union | ||
return new Union(seq(left, right), false, false); | ||
}); | ||
return context.getPlan(); | ||
} else { | ||
// | dedup 2 a, b keepempty=false | ||
// DataFrameDropColumns('_row_number_) | ||
// +- Filter ('_row_number_ <= n) | ||
// +- Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _row_number_], ['a, 'b], ['a ASC NULLS FIRST, 'b ASC NULLS FIRST] | ||
// +- Filter (isnotnull('a) AND isnotnull('b)) | ||
// +- ... | ||
// +- UnresolvedRelation | ||
|
||
// Build isnotnull Filter | ||
Expression isNotNullExpr = buildIsNotNullFilterExpression(node, context); | ||
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p)); | ||
|
||
// Build Window | ||
visitFieldList(node.getFields(), context); | ||
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(exp -> exp); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the order of the dedup fields kept? For instance, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the original order will be kept. I added several tests in IT to verify this case. |
||
visitFieldList(node.getFields(), context); | ||
Seq<SortOrder> orderSpec = context.retainAllNamedParseExpressions(exp -> SortUtils.sortOrder(exp, true)); | ||
NamedExpression rowNumber = WindowSpecTransformer.buildRowNumber(partitionSpec, orderSpec); | ||
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Window( | ||
seq(rowNumber), | ||
partitionSpec, | ||
orderSpec, p)); | ||
|
||
// Build deduplication Filter ('_row_number_ <= n) | ||
Expression filterExpr = new LessThanOrEqual( | ||
rowNumber.toAttribute(), | ||
new org.apache.spark.sql.catalyst.expressions.Literal(allowedDuplication, DataTypes.IntegerType)); | ||
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(filterExpr, p)); | ||
|
||
// Build DataFrameDropColumns('_row_number_) Spark 3.5.1+ required | ||
return context.apply(p -> new DataFrameDropColumns(seq(rowNumber.toAttribute()), p)); | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi
visitDedup(...)
is too large (>136 lines) and needs to be extracted to a dedicated DedupUtils
class or similar. We should try to reduce CatalystQueryPlanVisitor to be only the command-pattern consolidator and move the actual business code away into dedicated strategies (commands).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done