From 07529ea0d6d29cf55cdadca1928cb8a693fa4de5 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 26 Sep 2023 14:11:17 -0700 Subject: [PATCH] minor refactory & package movement Signed-off-by: YANGDB --- ...ntSparkPPLAggregationWithSpanITSuite.scala | 4 +++- .../FlintSparkPPLAggregationsITSuite.scala | 6 ++++-- .../FlintSparkPPLFiltersITSuite.scala | 6 ++++-- .../{ => ppl}/FlintSparkPPLITSuite.scala | 4 +++- .../FlintSparkPPLTimeWindowITSuite.scala | 4 +++- .../sql/ppl/CatalystQueryPlanVisitor.java | 19 ++++++------------- 6 files changed, 23 insertions(+), 20 deletions(-) rename integ-test/src/test/scala/org/opensearch/flint/spark/{ => ppl}/FlintSparkPPLAggregationWithSpanITSuite.scala (99%) rename integ-test/src/test/scala/org/opensearch/flint/spark/{ => ppl}/FlintSparkPPLAggregationsITSuite.scala (98%) rename integ-test/src/test/scala/org/opensearch/flint/spark/{ => ppl}/FlintSparkPPLFiltersITSuite.scala (99%) rename integ-test/src/test/scala/org/opensearch/flint/spark/{ => ppl}/FlintSparkPPLITSuite.scala (98%) rename integ-test/src/test/scala/org/opensearch/flint/spark/{ => ppl}/FlintSparkPPLTimeWindowITSuite.scala (99%) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala similarity index 99% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala index ca405b1dd..3e66c16ea 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationWithSpanITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationWithSpanITSuite.scala @@ -3,7 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.FlintPPLSuite import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala similarity index 98% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala index ab9cc07d5..9a3fa0c7b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLAggregationsITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLAggregationsITSuite.scala @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.FlintPPLSuite import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala similarity index 99% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala index 4d03d4f16..be4d693a1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLFiltersITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFiltersITSuite.scala @@ -3,11 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.FlintPPLSuite import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.StreamTest diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala similarity index 98% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala index 4079e512e..cbe8c1731 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLITSuite.scala @@ -3,7 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.FlintPPLSuite import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala similarity index 99% rename from integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala rename to integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala index d1c420091..79ef4e014 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkPPLTimeWindowITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala @@ -3,10 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.flint.spark +package org.opensearch.flint.spark.ppl import java.sql.Timestamp +import org.opensearch.flint.spark.FlintPPLSuite + import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 210bfeeae..b5f381dd0 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -124,27 +124,20 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex if (!groupExpList.isEmpty()) { //add group by fields to context - extractedGroupBy(node.getGroupExprList().size(), context); + context.getGroupingParseExpressions().addAll(groupExpList); } UnresolvedExpression span = node.getSpan(); if (!Objects.isNull(span)) { span.accept(this, context); - //add span's group by field to context - extractedGroupBy(1, context); + //add span's group alias field (most recent added expression) + context.getGroupingParseExpressions().add(context.getNamedParseExpressions().peek()); } // build the aggregation logical step extractedAggregation(context); return child; } - - private static void extractedGroupBy(int groupByElementsCount, CatalystPlanContext context) { - //copy the group by aliases from the namedExpressionList to the groupByExpressionList - for (int i = 1; i <= groupByElementsCount; i++) { - context.getGroupingParseExpressions().add(context.getNamedParseExpressions().get(context.getNamedParseExpressions().size() - i)); - } - } - + private static void extractedAggregation(CatalystPlanContext context) { Seq groupingExpression = context.retainAllGroupingNamedParseExpressions(p -> p); Seq aggregateExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); @@ -160,7 +153,7 @@ public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) { @Override public LogicalPlan visitProject(Project node, CatalystPlanContext context) { LogicalPlan child = node.getChild().get(0).accept(this, context); - visitExpressionList(node.getProjectList(), context); + List expressionList = visitExpressionList(node.getProjectList(), context); // Create a projection list from the existing expressions Seq projectList = seq(context.getNamedParseExpressions()); @@ -171,7 +164,7 @@ public LogicalPlan visitProject(Project node, CatalystPlanContext context) { } if (node.hasArgument()) { Argument argument = node.getArgExprList().get(0); - //todo exclude the argument from the projected aruments list + //todo exclude the argument from the projected arguments list } return child; }