From 1664dc9727e7d6af7a70267158825a2974cc6ce4 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 14 Aug 2024 12:12:29 -0700 Subject: [PATCH] Adding support for Rare & Top PPL top [N] [by-clause] N: number of results to return. Default: 10 field-list: mandatory. comma-delimited list of field names. by-clause: optional. one or more fields to group the results by. ------------------------------------------------------------------------------------------- rare [by-clause] field-list: mandatory. comma-delimited list of field names. by-clause: optional. one or more fields to group the results by. ------------------------------------------------------------------------------------------- commands: - https://github.com/opensearch-project/opensearch-spark/issues/461 - https://github.com/opensearch-project/opensearch-spark/issues/536 Signed-off-by: YANGDB --- .../ppl/FlintSparkPPLTopAndRareITSuite.scala | 187 ++++++++++++++++++ .../src/main/antlr4/OpenSearchPPLParser.g4 | 2 + .../opensearch/sql/ppl/parser/AstBuilder.java | 48 +++-- ...TopAndRareQueriesTranslatorTestSuite.scala | 177 +++++++++++++++++ 4 files changed, 402 insertions(+), 12 deletions(-) create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTopAndRareITSuite.scala create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanTopAndRareQueriesTranslatorTestSuite.scala diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTopAndRareITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTopAndRareITSuite.scala new file mode 100644 index 000000000..2418ecc14 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTopAndRareITSuite.scala @@ -0,0 +1,187 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import 
org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.{QueryTest, Row} + +class FlintSparkPPLTopAndRareITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.flint_ppl_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test table + createPartitionedMultiRowAddressTable(testTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("create ppl rare address field query test") { + val frame = sql(s""" + | source = $testTable| rare address + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + assert(results.length == 2) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val limitPlan: LogicalPlan = + Limit(Literal(2), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan) + + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("create ppl simple query with head (limit) and sorted test") { + val frame = sql(s""" + | source = $testTable| sort name | head 2 + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + assert(results.length == 2) + + // Retrieve the logical plan + val logicalPlan: 
LogicalPlan = frame.queryExecution.logical + + val sortedPlan: LogicalPlan = + Sort( + Seq(SortOrder(UnresolvedAttribute("name"), Ascending)), + global = true, + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + + // Define the expected logical plan + val expectedPlan: LogicalPlan = + Project(Seq(UnresolvedStar(None)), Limit(Literal(2), sortedPlan)) + + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("create ppl simple query two with fields result test") { + val frame = sql(s""" + | source = $testTable| fields name, age + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = + Array(Row("Jake", 70), Row("Hello", 30), Row("John", 25), Row("Jane", 20)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val expectedPlan: LogicalPlan = Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + // Compare the two plans + assert(expectedPlan === logicalPlan) + } + + test("create ppl simple sorted query two with fields result test sorted") { + val frame = sql(s""" + | source = $testTable| sort age | fields name, age + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = + Array(Row("Jane", 20), Row("John", 25), Row("Hello", 30), Row("Jake", 70)) + assert(results === expectedResults) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val sortedPlan: LogicalPlan = + Sort( + 
Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), + global = true, + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + + // Define the expected logical plan + val expectedPlan: LogicalPlan = + Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), sortedPlan) + + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("create ppl simple query two with fields and head (limit) test") { + val frame = sql(s""" + | source = $testTable| fields name, age | head 1 + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + assert(results.length == 1) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val project = Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + // Define the expected logical plan + val limitPlan: LogicalPlan = Limit(Literal(1), project) + val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), limitPlan) + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("create ppl simple query two with fields and head (limit) with sorting test") { + Seq(("name, age", "age"), ("`name`, `age`", "`age`")).foreach { + case (selectFields, sortField) => + val frame = sql(s""" + | source = $testTable| fields $selectFields | head 1 | sort $sortField + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + assert(results.length == 1) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val project = Project( + Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) + // Define the expected logical plan + val limitPlan: LogicalPlan = Limit(Literal(1), project) + val sortedPlan: 
LogicalPlan = + Sort(Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, limitPlan) + + val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan) + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + } +} diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 19d480327..f750e99ff 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -38,6 +38,8 @@ commands | dedupCommand | sortCommand | headCommand + | topCommand + | rareCommand | evalCommand ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index e94d4e0f4..2c0cf163c 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -12,6 +12,7 @@ import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.flint.spark.ppl.OpenSearchPPLParser; import org.opensearch.flint.spark.ppl.OpenSearchPPLParserBaseVisitor; +import org.opensearch.sql.ast.expression.AggregateFunction; import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.Field; @@ -236,12 +237,6 @@ private List getFieldList(OpenSearchPPLParser.FieldListContext ctx) { .collect(Collectors.toList()); } - /** Rare command. 
*/ - @Override - public UnresolvedPlan visitRareCommand(OpenSearchPPLParser.RareCommandContext ctx) { - throw new RuntimeException("Rare Command is not supported "); - } - @Override public UnresolvedPlan visitGrokCommand(OpenSearchPPLParser.GrokCommandContext ctx) { UnresolvedExpression sourceField = internalVisitExpression(ctx.source_field); @@ -278,13 +273,68 @@ public UnresolvedPlan visitPatternsCommand(OpenSearchPPLParser.PatternsCommandCo /** Top command. */ @Override public UnresolvedPlan visitTopCommand(OpenSearchPPLParser.TopCommandContext ctx) { + ImmutableList.Builder aggListBuilder = new ImmutableList.Builder<>(); + ctx.fieldList().fieldExpression().forEach(field -> { + UnresolvedExpression aggExpression = new AggregateFunction("count",internalVisitExpression(field)); + String name = field.qualifiedName().getText(); + Alias alias = new Alias(name, aggExpression); + aggListBuilder.add(alias); + }); + List topGroupList = + Optional.ofNullable(ctx.byClause()) + .map(OpenSearchPPLParser.ByClauseContext::fieldList) + .map( + expr -> + expr.fieldExpression().stream() + .map( + groupCtx -> + (UnresolvedExpression) + new Alias( + getTextInQuery(groupCtx), + internalVisitExpression(groupCtx))) + .collect(Collectors.toList())) + .orElse(emptyList()); + return new Aggregation( + aggListBuilder.build(), + emptyList(), + topGroupList, + null, + ArgumentFactory.getArgumentList(ctx)); + } + + /** Rare command. */ + @Override + public UnresolvedPlan visitRareCommand(OpenSearchPPLParser.RareCommandContext ctx) { + ImmutableList.Builder aggListBuilder = new ImmutableList.Builder<>(); + ctx.fieldList().fieldExpression().forEach(field -> { + UnresolvedExpression aggExpression = new AggregateFunction("count",internalVisitExpression(field)); + String name = field.qualifiedName().getText(); + Alias alias = new Alias(name, aggExpression); + aggListBuilder.add(alias); + }); List groupList = - ctx.byClause() == null ? emptyList() : getGroupByList(ctx.byClause()); - return new RareTopN( - RareTopN.CommandType.TOP, - ArgumentFactory.getArgumentList(ctx), - getFieldList(ctx.fieldList()), - groupList); + Optional.ofNullable(ctx.byClause()) + .map(OpenSearchPPLParser.ByClauseContext::fieldList) + .map( + expr -> + expr.fieldExpression().stream() + .map( + groupCtx -> + (UnresolvedExpression) + new Alias( + getTextInQuery(groupCtx), + internalVisitExpression(groupCtx))) + .collect(Collectors.toList())) + .orElse(emptyList()); + + Aggregation aggregation = + new Aggregation( + aggListBuilder.build(), + emptyList(), + groupList, + null, + ArgumentFactory.getArgumentList(ctx)); + return aggregation; } /** From clause. 
*/ diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanTopAndRareQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanTopAndRareQueriesTranslatorTestSuite.scala new file mode 100644 index 000000000..f33cc3000 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanTopAndRareQueriesTranslatorTestSuite.scala @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Literal, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.scalatest.matchers.should.Matchers + +class PPLLogicalPlanTopAndRareQueriesTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test simple rare command with a single field") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = planTransformer.visit(plan(pplParser, "source=accounts | rare gender", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("accounts"))) + comparePlans(expectedPlan, 
logPlan, false) + } + + test("test simple search with escaped table name") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = planTransformer.visit(plan(pplParser, "source=`table`", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table"))) + comparePlans(expectedPlan, logPlan, false) + } + + test("test simple search with schema.table and no explicit fields (defaults to all fields)") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = planTransformer.visit(plan(pplParser, "source=schema.table", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None)) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) + comparePlans(expectedPlan, logPlan, false) + + } + + test("test simple search with schema.table and one field projected") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=schema.table | fields A", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A")) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) + comparePlans(expectedPlan, logPlan, false) + } + + test("test simple search with only one table with one field projected") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=table | fields A", false), context) + + val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A")) + val expectedPlan = Project(projectList, UnresolvedRelation(Seq("table"))) + comparePlans(expectedPlan, logPlan, false) + } + + test("test simple search with only one table with two fields projected") { + val context = new CatalystPlanContext + val logPlan = 
planTransformer.visit(plan(pplParser, "source=t | fields A, B", false), context) + + val table = UnresolvedRelation(Seq("t")) + val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + val expectedPlan = Project(projectList, table) + comparePlans(expectedPlan, logPlan, false) + } + + test("test simple search with one table with two fields projected sorted by one field") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | sort A | fields A, B", false), context) + + val table = UnresolvedRelation(Seq("t")) + val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + // Sort by A ascending + val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Ascending)) + val sorted = Sort(sortOrder, true, table) + val expectedPlan = Project(projectList, sorted) + + comparePlans(expectedPlan, logPlan, false) + } + + test( + "test simple search with only one table with two fields with head (limit ) command projected") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source=t | fields A, B | head 5", false), context) + + val table = UnresolvedRelation(Seq("t")) + val projectList = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + val planWithLimit = + GlobalLimit(Literal(5), LocalLimit(Literal(5), Project(projectList, table))) + val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) + comparePlans(expectedPlan, logPlan, false) + } + + test( + "test simple search with only one table with two fields with head (limit ) command projected sorted by one descending field") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "source=t | sort - A | fields A, B | head 5", false), + context) + + val table = UnresolvedRelation(Seq("t")) + val sortOrder = Seq(SortOrder(UnresolvedAttribute("A"), Descending)) + val sorted = Sort(sortOrder, true, table) + val projectList = 
Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + val projectAB = Project(projectList, sorted) + + val planWithLimit = GlobalLimit(Literal(5), LocalLimit(Literal(5), projectAB)) + val expectedPlan = Project(Seq(UnresolvedStar(None)), planWithLimit) + comparePlans(expectedPlan, logPlan, false) + } + + test( + "Search multiple tables - translated into union call - fields expected to exist in both tables ") { + val context = new CatalystPlanContext + val logPlan = planTransformer.visit( + plan(pplParser, "search source = table1, table2 | fields A, B", false), + context) + + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + + val allFields1 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + val allFields2 = Seq(UnresolvedAttribute("A"), UnresolvedAttribute("B")) + + val projectedTable1 = Project(allFields1, table1) + val projectedTable2 = Project(allFields2, table2) + + val expectedPlan = + Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true) + + comparePlans(expectedPlan, logPlan, false) + } + + test("Search multiple tables - translated into union call with fields") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "source = table1, table2 ", false), context) + + val table1 = UnresolvedRelation(Seq("table1")) + val table2 = UnresolvedRelation(Seq("table2")) + + val allFields1 = UnresolvedStar(None) + val allFields2 = UnresolvedStar(None) + + val projectedTable1 = Project(Seq(allFields1), table1) + val projectedTable2 = Project(Seq(allFields2), table2) + + val expectedPlan = + Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true) + + comparePlans(expectedPlan, logPlan, false) + } +}