From 21c92a0d0637a9b00865a4707bef1fbc06e77ab8 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Tue, 6 Aug 2024 19:17:30 -0700
Subject: [PATCH] add PPL describe command

Signed-off-by: YANGDB
---
 .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 26 ++++++++++++++++++-
 .../sql/ast/tree/DescribeRelation.java        | 17 ++++++++++++
 .../sql/ppl/CatalystQueryPlanVisitor.java     |  7 +++++
 .../opensearch/sql/ppl/parser/AstBuilder.java |  3 ++-
 ...lPlanBasicQueriesTranslatorTestSuite.scala | 23 ++++++++++++++++
 5 files changed, 74 insertions(+), 2 deletions(-)
 create mode 100644 ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
index fc77b7156..e0358aa6b 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
@@ -6,9 +6,11 @@
 package org.opensearch.flint.spark.ppl
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.DescribeTableCommand
 import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkPPLBasicITSuite
@@ -36,7 +38,29 @@ class FlintSparkPPLBasicITSuite
     }
   }
 
-  test("create ppl simple query test") {
+  test("describe table query test") {
+    val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
+    Seq(testTable, testTableQuoted).foreach { table =>
+      val frame = sql(s"""
+           describe $table
+           """.stripMargin)
+
+      // Retrieve the results
+      val results: Array[Row] = frame.collect()
+      assert(results.length == 2)
+      // Retrieve the logical plan
+      val logicalPlan: LogicalPlan = frame.queryExecution.logical
+      // Define the expected logical plan
+      val expectedPlan: LogicalPlan =
+        Project(
+          Seq(UnresolvedStar(None)),
+          DescribeTableCommand(TableIdentifier("table"), null, isExtended = false, Seq.empty))
+      // Compare the two plans
+      assert(expectedPlan === logicalPlan)
+    }
+  }
+
+  test("create ppl simple query test") {
     val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`"
     Seq(testTable, testTableQuoted).foreach { table =>
       val frame = sql(s"""
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java
new file mode 100644
index 000000000..5fd237bcb
--- /dev/null
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/DescribeRelation.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.ast.tree;
+
+import org.opensearch.sql.ast.expression.UnresolvedExpression;
+
+/**
+ * Extend Relation to describe the table itself
+ */
+public class DescribeRelation extends Relation{
+    public DescribeRelation(UnresolvedExpression tableName) {
+        super(tableName);
+    }
+}
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
index fd8d81e5c..4fafb6304 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -5,6 +5,7 @@
 
 package org.opensearch.sql.ppl;
 
+import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
@@ -15,6 +16,7 @@
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate;
 import org.apache.spark.sql.catalyst.plans.logical.Limit;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.command.DescribeTableCommand;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -46,6 +48,7 @@
 import org.opensearch.sql.ast.tree.Aggregation;
 import org.opensearch.sql.ast.tree.Correlation;
 import org.opensearch.sql.ast.tree.Dedupe;
+import org.opensearch.sql.ast.tree.DescribeRelation;
 import org.opensearch.sql.ast.tree.Eval;
 import org.opensearch.sql.ast.tree.Filter;
 import org.opensearch.sql.ast.tree.Head;
@@ -107,6 +110,10 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {
 
     @Override
     public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
+        if (node instanceof DescribeRelation) {
+            return context.with(new DescribeTableCommand(new TableIdentifier(node.getTableQualifiedName().toString()), null, false, seq()));
+        }
+        //regular sql algebraic relations
         node.getTableName().forEach(t ->
                 // Resolving the qualifiedName which is composed of a datasource.schema.table
                 context.with(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false))
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
index 9973f4676..e94d4e0f4 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
@@ -28,6 +28,7 @@
 import org.opensearch.sql.ast.tree.Aggregation;
 import org.opensearch.sql.ast.tree.Correlation;
 import org.opensearch.sql.ast.tree.Dedupe;
+import org.opensearch.sql.ast.tree.DescribeRelation;
 import org.opensearch.sql.ast.tree.Eval;
 import org.opensearch.sql.ast.tree.Filter;
 import org.opensearch.sql.ast.tree.Head;
@@ -97,7 +98,7 @@ public UnresolvedPlan visitDescribeCommand(OpenSearchPPLParser.DescribeCommandContext ctx) {
     final Relation table = (Relation) visitTableSourceClause(ctx.tableSourceClause());
     QualifiedName tableQualifiedName = table.getTableQualifiedName();
     ArrayList<String> parts = new ArrayList<>(tableQualifiedName.getParts());
-    return new Relation(new QualifiedName(parts));
+    return new DescribeRelation(new QualifiedName(parts));
   }
 
   /** Where command. */
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
index 5b94ca092..928b81077 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
@@ -10,10 +10,12 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
 import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, Literal, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.DescribeTableCommand
 
 class PPLLogicalPlanBasicQueriesTranslatorTestSuite
     extends SparkFunSuite
@@ -24,6 +26,27 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
   private val planTransformer = new CatalystQueryPlanVisitor()
   private val pplParser = new PPLSyntaxParser()
 
+  test("test simple describe clause") {
+    // if successful build ppl logical plan and translate to catalyst logical plan
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(plan(pplParser, "describe table", false), context)
+
+    val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))
+    val expectedPlan = Project(projectList, DescribeTableCommand(TableIdentifier("table"), null, isExtended = false, Seq.empty))
+    comparePlans(expectedPlan, logPlan, false)
+  }
+
+
+  test("test FQN table describe clause") {
+    // if successful build ppl logical plan and translate to catalyst logical plan
+    val context = new CatalystPlanContext
+    val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.schema.table", false), context)
+
+    val projectList: Seq[NamedExpression] = Seq(UnresolvedStar(None))
+    val expectedPlan = Project(projectList, DescribeTableCommand(TableIdentifier("catalog.schema.table"), null, isExtended = false, Seq.empty))
+    comparePlans(expectedPlan, logPlan, false)
+  }
+
   test("test simple search with only one table and no explicit fields (defaults to all fields)") {
     // if successful build ppl logical plan and translate to catalyst logical plan
     val context = new CatalystPlanContext