From f0e7639d795a90179707a13490bce7bc5dfe0a77 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 21 Sep 2023 20:19:59 -0700 Subject: [PATCH] continue update according to PR comments Signed-off-by: YANGDB --- README.md | 2 ++ .../sql/ppl/CatalystQueryPlanVisitor.java | 13 ++++----- .../sql/ppl/utils/DataTypeTransformer.java | 13 ++++++++- .../opensearch/sql/ppl/utils/SortUtils.java | 20 ++++++++----- .../sql/ppl/utils/WindowSpecTransformer.java | 29 +++++++++++++++++++ ...ggregationQueriesTranslatorTestSuite.scala | 26 ----------------- 6 files changed, 62 insertions(+), 41 deletions(-) create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/WindowSpecTransformer.java diff --git a/README.md b/README.md index 030f34b5e..b7ae73683 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,12 @@ OpenSearch Flint is ... It consists of two modules: - `flint-core`: a module that contains Flint specification and client. - `flint-spark-integration`: a module that provides Spark integration for Flint and derived dataset based on it. +- `ppl-spark-integration`: a module that provides PPL query execution on top of Spark. See the [PPL repository](https://github.com/opensearch-project/piped-processing-language). ## Documentation Please refer to the [Flint Index Reference Manual](./docs/index.md) for more information. +For the PPL language, see the [PPL Reference Manual](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst) for more information. 
## Prerequisites diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 32075bd9a..71e41cc16 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -11,10 +11,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; -import org.apache.spark.sql.catalyst.expressions.Divide; import org.apache.spark.sql.catalyst.expressions.Expression; -import org.apache.spark.sql.catalyst.expressions.Floor; -import org.apache.spark.sql.catalyst.expressions.Multiply; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.Predicate; import org.apache.spark.sql.catalyst.plans.logical.Aggregate; @@ -69,9 +66,10 @@ import static java.lang.String.format; import static java.util.Collections.singletonList; import static java.util.List.of; +import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate; +import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; import static scala.collection.JavaConverters.asScalaBuffer; -import static scala.collection.JavaConverters.asScalaBufferConverter; /** * Utility class to traverse PPL logical plan and translate it into catalyst logical plan @@ -349,7 +347,8 @@ public String visitSpan(Span node, CatalystPlanContext context) { Expression valueExpression = context.getNamedParseExpressions().pop(); Expression fieldExpression = context.getNamedParseExpressions().pop(); - context.getNamedParseExpressions().push(new Multiply(new Floor(new Divide(fieldExpression, valueExpression)), 
valueExpression)); + + context.getNamedParseExpressions().push(window(fieldExpression,valueExpression,node.getUnit())); return format("span (%s,%s,%s)", field, value, unit); } @@ -406,9 +405,9 @@ public String visitAlias(Alias node, CatalystPlanContext context) { org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply((Expression) expression, node.getAlias()!=null ? node.getAlias() : expr, NamedExpression.newExprId(), - asScalaBufferConverter(new java.util.ArrayList()).asScala().seq(), + seq(new java.util.ArrayList()), Option.empty(), - asScalaBufferConverter(new java.util.ArrayList()).asScala().seq())); + seq(new java.util.ArrayList()))); return format("%s", expr); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/DataTypeTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/DataTypeTransformer.java index ee6ec0bb1..f0369ae69 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/DataTypeTransformer.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/DataTypeTransformer.java @@ -8,16 +8,27 @@ import org.apache.spark.sql.types.ByteType$; import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DateType$; import org.apache.spark.sql.types.IntegerType$; import org.apache.spark.sql.types.StringType$; import org.apache.spark.unsafe.types.UTF8String; +import scala.collection.mutable.Seq; + +import java.util.List; + +import static scala.collection.JavaConverters.asScalaBufferConverter; /** * translate the PPL ast expressions data-types into catalyst data-types */ public interface DataTypeTransformer { + static Seq seq(T element) { + return seq(List.of(element)); + } + static Seq seq(List list) { + return asScalaBufferConverter(list).asScala().seq(); + } + static DataType translate(org.opensearch.sql.ast.expression.DataType source) { switch (source.getCoreType()) { case TIME: diff --git 
a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/SortUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/SortUtils.java index f3f2311b6..17ed3fa94 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/SortUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/SortUtils.java @@ -10,13 +10,14 @@ import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.SortOrder; +import org.jetbrains.annotations.NotNull; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.tree.Sort; import java.util.ArrayList; import java.util.Optional; -import static scala.collection.JavaConverters.asScalaBufferConverter; +import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; /** * Utility interface for sorting operations. @@ -37,13 +38,18 @@ static SortOrder getSortDirection(Sort node, NamedExpression expression) { .findAny(); if(field.isPresent()) { - return new SortOrder( - (Expression) expression, - (Boolean)field.get().getFieldArgs().get(0).getValue().getValue() ? Ascending$.MODULE$ : Descending$.MODULE$, - (Boolean)field.get().getFieldArgs().get(0).getValue().getValue() ? Ascending$.MODULE$.defaultNullOrdering() : Descending$.MODULE$.defaultNullOrdering(), - asScalaBufferConverter(new ArrayList()).asScala().seq() - ); + return sortOrder((Expression) expression, (Boolean) field.get().getFieldArgs().get(0).getValue().getValue()); } return null; } + + @NotNull + static SortOrder sortOrder(Expression expression, boolean ascending) { + return new SortOrder( + expression, + ascending ? Ascending$.MODULE$ : Descending$.MODULE$, + ascending ? 
Ascending$.MODULE$.defaultNullOrdering() : Descending$.MODULE$.defaultNullOrdering(), + seq(new ArrayList()) + ); + } } \ No newline at end of file diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/WindowSpecTransformer.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/WindowSpecTransformer.java new file mode 100644 index 000000000..5fb1c1942 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/WindowSpecTransformer.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.utils; + +import org.apache.spark.sql.catalyst.expressions.Divide; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.Floor; +import org.apache.spark.sql.catalyst.expressions.Multiply; +import org.opensearch.sql.ast.expression.SpanUnit; + +public interface WindowSpecTransformer { + + /** + * Creates static window buckets based on the given value + * + * @param fieldExpression + * @param valueExpression + * @param unit + * @return + */ + static Expression window(Expression fieldExpression, Expression valueExpression, SpanUnit unit) { + // todo check whether WindowSpec can provide the same functionality as below + // todo for time unit - use TimeWindowSpec if possible + return new Multiply(new Floor(new Divide(fieldExpression, valueExpression)), valueExpression); + } +} diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAggregationQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAggregationQueriesTranslatorTestSuite.scala index 82aa33f7a..e45f30c6c 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAggregationQueriesTranslatorTestSuite.scala +++ 
b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanAggregationQueriesTranslatorTestSuite.scala @@ -153,32 +153,6 @@ class PPLLogicalPlanAggregationQueriesTranslatorTestSuite "source=[table] | where country = 'USA' | stats avg(price) by product | sort product | fields + *") assertEquals(compareByString(sortedPlan), compareByString(context.getPlan)) } - - ignore("test average price group by product over a time window") { - // if successful build ppl logical plan and translate to catalyst logical plan - val context = new CatalystPlanContext - val logPlan = planTrnasformer.visit( - plan(pplParser, "source = table | stats avg(price) by span( request_time , 15m) ", false), - context) - // SQL: SELECT product, AVG(price) AS avg_price FROM table GROUP BY product - val star = Seq(UnresolvedStar(None)) - val productField = UnresolvedAttribute("product") - val priceField = UnresolvedAttribute("price") - val tableRelation = UnresolvedRelation(Seq("table")) - - val groupByAttributes = Seq(Alias(productField, "product")()) - val aggregateExpressions = - Alias(UnresolvedFunction(Seq("AVG"), Seq(priceField), isDistinct = false), "avg(price)")() - val productAlias = Alias(productField, "product")() - - val aggregatePlan = - Aggregate(groupByAttributes, Seq(aggregateExpressions, productAlias), tableRelation) - val expectedPlan = Project(star, aggregatePlan) - - assertEquals(logPlan, "source=[table] | stats avg(price) by product | fields + *") - assertEquals(compareByString(expectedPlan), compareByString(context.getPlan)) - } - test("create ppl simple avg age by span of interval of 10 years query test ") { val context = new CatalystPlanContext val logPlan = planTrnasformer.visit(