From 0128e2bc8610379d68db70d6d201d6ede1896295 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 14 Aug 2024 16:03:57 -0700 Subject: [PATCH] Adding support for Rare & Top PPL top [N] [by-clause] N: number of results to return. Default: 10 field-list: mandatory. comma-delimited list of field names. by-clause: optional. one or more fields to group the results by. ------------------------------------------------------------------------------------------- rare [by-clause] field-list: mandatory. comma-delimited list of field names. by-clause: optional. one or more fields to group the results by. ------------------------------------------------------------------------------------------- commands: - https://github.com/opensearch-project/opensearch-spark/issues/461 - https://github.com/opensearch-project/opensearch-spark/issues/536 Signed-off-by: YANGDB --- .../sql/ppl/CatalystQueryPlanVisitor.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6ac9d3a34..7eba00c94 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -184,22 +184,7 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex //add group by fields to context context.getGroupingParseExpressions().addAll(groupExpList); } - - // set sort direction according to command type - List sortDirections = new ArrayList<>(); - if (node instanceof RareAggregation) { - sortDirections.add(Ascending$.MODULE$); - } else if(node instanceof TopAggregation) { - sortDirections.add(Descending$.MODULE$); - } - if (!sortExpList.isEmpty()) { - visitExpressionList(node.getSortExprList(), context); - Seq sortElements = context.retainAllNamedParseExpressions(exp -> - new SortOrder((NamedExpression) exp, sortDirections.get(0) , sortDirections.get(0).defaultNullOrdering(), seq(new ArrayList()))); - context.apply(p -> (LogicalPlan) new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, p)); - } - UnresolvedExpression span = node.getSpan(); if (!Objects.isNull(span)) { span.accept(this, context); @@ -207,8 +192,24 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex context.getGroupingParseExpressions().add(context.getNamedParseExpressions().peek()); } // build the aggregation logical step - return extractedAggregation(context); -} +// context.apply(p -> extractedAggregation(context)); TODO remove + LogicalPlan logicalPlan = extractedAggregation(context); + + // set sort direction according to command type (`rare` is Asc, `top` is Desc, default to Asc) + List sortDirections = new ArrayList<>(); + sortDirections.add(node instanceof RareAggregation ? Ascending$.MODULE$ : node instanceof TopAggregation ? Descending$.MODULE$ : Ascending$.MODULE$); + + if (!sortExpList.isEmpty()) { + visitExpressionList(node.getSortExprList(), context); + Seq sortElements = context.retainAllNamedParseExpressions(exp -> + new SortOrder(exp, + sortDirections.get(0), + sortDirections.get(0).defaultNullOrdering(), + seq(new ArrayList()))); + context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, logicalPlan)); + } + return logicalPlan; + } private static LogicalPlan extractedAggregation(CatalystPlanContext context) { Seq groupingExpression = context.retainAllGroupingNamedParseExpressions(p -> p);