From e5b345de31053688591bde1c1947f5c6bf9961fa Mon Sep 17 00:00:00 2001 From: YANGDB Date: Wed, 7 Aug 2024 20:25:22 -0700 Subject: [PATCH] Add PPL describe command update correlation comment as experimental Signed-off-by: YANGDB --- docs/PPL-Correlation-command.md | 5 +++ .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 45 ++++++++++++++++++- ppl-spark-integration/README.md | 34 ++++---------- .../src/main/antlr4/OpenSearchPPLParser.g4 | 1 + .../sql/ast/tree/DescribeRelation.java | 17 +++++++ .../sql/ppl/CatalystQueryPlanVisitor.java | 25 +++++++++++ .../opensearch/sql/ppl/parser/AstBuilder.java | 3 +- .../sql/ppl/parser/AstStatementBuilder.java | 15 ++++--- ...lPlanBasicQueriesTranslatorTestSuite.scala | 41 +++++++++++++++++ 9 files changed, 152 insertions(+), 34 deletions(-) create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java diff --git a/docs/PPL-Correlation-command.md b/docs/PPL-Correlation-command.md index f7ef3e266..2e8507a14 100644 --- a/docs/PPL-Correlation-command.md +++ b/docs/PPL-Correlation-command.md @@ -1,5 +1,8 @@ ## PPL Correlation Command +> This is an experimental command - it may be removed in future versions + + ## Overview In the past year OpenSearch Observability & security teams have been busy with many aspects of improving data monitoring and visibility. @@ -262,6 +265,8 @@ The new correlation command is actually a ‘hidden’ join command therefore th Catalyst engine will optimize this query according to the most efficient join ordering. +> This is an experimental command - it may be removed in future versions + * * * ## Appendix diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index fc77b7156..8dfa9f5ab 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -6,9 +6,11 @@ package org.opensearch.flint.spark.ppl import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.DescribeTableCommand import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLBasicITSuite @@ -36,6 +38,45 @@ class FlintSparkPPLBasicITSuite } } + test("describe table query test") { + val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" + Seq(testTable, testTableQuoted).foreach { table => + val frame = sql(s""" + describe flint_ppl_test + """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("name", "string", null), + Row("age", "int", null), + Row("state", "string", null), + Row("country", "string", null), + Row("year", "int", null), + Row("month", "int", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("year", "int", null), + Row("month", "int", null)) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + // Retrieve the logical plan + val logicalPlan: LogicalPlan = + frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan + // Define the expected logical plan + val expectedPlan: LogicalPlan = + DescribeTableCommand( + TableIdentifier("flint_ppl_test"), + Map.empty[String, String], + isExtended = false, + output = DescribeRelation.getOutputAttrs) + // Compare the two plans + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + } + test("create ppl simple query test") { val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" Seq(testTable, testTableQuoted).foreach { table => @@ -208,7 +249,7 @@ class FlintSparkPPLBasicITSuite val sortedPlan: LogicalPlan = Sort(Seq(SortOrder(UnresolvedAttribute("age"), Ascending)), global = true, limitPlan) - val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan); + val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan) // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 1538f43be..2a17fe19f 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -221,8 +221,8 @@ Next tasks ahead will resolve this: This section describes the next steps planned for enabling additional commands and gamer translation. -#### Supported -The next samples of PPL queries are currently supported: +#### Example PPL Queries +See the next samples of PPL queries : **Fields** - `source = table` @@ -272,31 +272,15 @@ Limitation: Overriding existing field is unsupported, following queries throw ex - `source = table | stats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date` - `source = table | stats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId` -> For additional details, review [FlintSparkPPLTimeWindowITSuite.scala](../integ-test/src/test/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLTimeWindowITSuite.scala) - -#### Supported Commands: - - `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst) - - `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst) - - `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst) - - `eval` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/eval.rst) - - `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst) - - `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) (supports AVG, COUNT, DISTINCT_COUNT, MAX, MIN and SUM aggregation functions) - - `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst) - - `correlation` - [See details](../docs/PPL-Correlation-command.md) - -> For additional details, review [Integration Tests](../integ-test/src/test/scala/org/opensearch/flint/spark/) - --- -#### Planned Support +For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst) - - support the `explain` command to return the explained PPL query logical plan and expected execution plan +For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2) +For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408) + +#### Experimental Commands: + - `correlation` - [See details](../docs/PPL-Correlation-command.md) - - attend [sort](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst) partially supported, missing capability to sort by alias field (span like or aggregation) - - attend `alias` - partially supported, missing capability to sort by / group-by alias field name +> This is an experimental command - it may be removed in future versions - - add [conditions](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/condition.rst) support - - add [top](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/top.rst) support - - add [cast](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/conversion.rst) support - - add [math](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/functions/math.rst) support - - add [deduplicate](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/dedup.rst) support \ No newline at end of file diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index 2d0986890..765e54d93 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -27,6 +27,7 @@ queryStatement // commands pplCommands : searchCommand + | describeCommand ; commands diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java new file mode 100644 index 000000000..5fd237bcb --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +/** + * Extend Relation to describe the table itself + */ +public class DescribeRelation extends Relation{ + public DescribeRelation(UnresolvedExpression tableName) { + super(tableName); + } +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index fd8d81e5c..5910efd39 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -5,6 +5,7 @@ package org.opensearch.sql.ppl; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; @@ -13,8 +14,10 @@ import org.apache.spark.sql.catalyst.expressions.Predicate; import org.apache.spark.sql.catalyst.expressions.SortOrder; import org.apache.spark.sql.catalyst.plans.logical.Aggregate; +import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; import org.apache.spark.sql.catalyst.plans.logical.Limit; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.command.DescribeTableCommand; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -46,6 +49,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Correlation; import org.opensearch.sql.ast.tree.Dedupe; +import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; @@ -59,6 +63,7 @@ import org.opensearch.sql.ppl.utils.ComparatorTransformer; import org.opensearch.sql.ppl.utils.SortUtils; import scala.Option; +import scala.Option$; import scala.collection.Seq; import java.util.ArrayList; @@ -107,6 +112,26 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { + if (node instanceof DescribeRelation) { + TableIdentifier identifier; + if (node.getTableQualifiedName().getParts().size() == 1) { + identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); + } else if (node.getTableQualifiedName().getParts().size() == 2) { + identifier = new TableIdentifier( + node.getTableQualifiedName().getParts().get(1), + Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); + } else { + throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() + + " Syntax: [ database_name. ] table_name"); + } + return context.with( + new DescribeTableCommand( + identifier, + scala.collection.immutable.Map$.MODULE$.empty(), + false, + DescribeRelation$.MODULE$.getOutputAttrs())); + } + //regular sql algebraic relations node.getTableName().forEach(t -> // Resolving the qualifiedName which is composed of a datasource.schema.table context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 9973f4676..e94d4e0f4 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -28,6 +28,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Correlation; import org.opensearch.sql.ast.tree.Dedupe; +import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Head; @@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandCo final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause()); QualifiedName tableQualifiedName = table.getTableQualifiedName(); ArrayList parts = new ArrayList<>(tableQualifiedName.getParts()); - return new Relation(new QualifiedName(parts)); + return new DescribeRelation(new QualifiedName(parts)); } /** Where command. */ diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java index 23ca992d9..7792dbecd 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java @@ -15,6 +15,7 @@ import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.DescribeRelation; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.UnresolvedPlan; @@ -78,11 +79,13 @@ public Object build() { } } - private UnresolvedPlan addSelectAll(UnresolvedPlan plan) { - if ((plan instanceof Project) && !((Project) plan).isExcluded()) { - return plan; - } else { - return new Project(ImmutableList.of(AllFields.of())).attach(plan); + private UnresolvedPlan addSelectAll(UnresolvedPlan plan) { + if ((plan instanceof Project) && !((Project) plan).isExcluded()) { + return plan; + } else if (plan instanceof DescribeRelation) { + return plan; + } else { + return new Project(ImmutableList.of(AllFields.of())).attach(plan); + } } - } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index 5b94ca092..23d2425da 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} import org.scalatest.matchers.should.Matchers import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.DescribeTableCommand class PPLLogicalPlanBasicQueriesTranslatorTestSuite extends SparkFunSuite @@ -24,6 +26,45 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite private val planTransformer = new CatalystQueryPlanVisitor() private val pplParser = new PPLSyntaxParser() + test("test error describe clause") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + // Intercept the exception and check the message + val thrown = intercept[IllegalArgumentException] { + planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context) + } + + // Verify the exception message + assert( + thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name") + } + + test("test simple describe clause") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = planTransformer.visit(plan(pplParser, "describe t", false), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("t"), + Map.empty[String, String], + isExtended = false, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + + test("test FQN table describe table clause") { + // if successful build ppl logical plan and translate to catalyst logical plan + val context = new CatalystPlanContext + val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("t", Option("catalog")), + Map.empty[String, String].empty, + isExtended = false, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + test("test simple search with only one table and no explicit fields (defaults to all fields)") { // if successful build ppl logical plan and translate to catalyst logical plan val context = new CatalystPlanContext