From 6d625677f6698c6e091c21aebe6fa449e4156338 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Fri, 11 Oct 2024 14:16:06 +0800 Subject: [PATCH 1/3] Support table identifier contains dot with backticks Signed-off-by: Lantao Jin --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 70 ++++++++++++++++++- .../sql/ppl/CatalystQueryPlanVisitor.java | 10 +-- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 4c38e1471..087a2080a 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark.ppl -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, EqualTo, IsNotNull, Literal, Not, SortOrder} @@ -22,12 +22,20 @@ class FlintSparkPPLBasicITSuite /** Test table and index name */ private val testTable = "spark_catalog.default.flint_ppl_test" + private val t1 = "`spark_catalog`.`default`.`flint_ppl_test1`" + private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" + private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" + private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" override def beforeAll(): Unit = { super.beforeAll() // Create test table createPartitionedStateCountryTable(testTable) + createPartitionedStateCountryTable(t1) + createPartitionedStateCountryTable(t2) + createPartitionedStateCountryTable(t3) + createPartitionedStateCountryTable(t4) } protected override def afterEach(): Unit = { @@ -516,4 +524,64 @@ class FlintSparkPPLBasicITSuite // Compare the two plans comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + + test("test backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql(s""" + | source = $table| head 2 + | """.stripMargin) + assert(frame.collect().length == 2) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql(s""" + | source = $table| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + } + + test("test describe backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql(s""" + | describe $table + | """.stripMargin) + assert(frame.collect().length > 0) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test4.log`" + val t6 = "spark_catalog.default.`flint_ppl_test5.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql(s""" + | describe $table + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + } + + test("test explain backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql(s""" + | explain extended | source = $table + | """.stripMargin) + assert(frame.collect().length > 0) + } + // test read table which is unable to create + val table = "`spark_catalog`.default.`flint/ppl/test4.log`" + val frame = sql(s""" + | explain extended | source = $table + | """.stripMargin) + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint/ppl/test4.log")) + val expectedPlan: LogicalPlan = + ExplainCommand( + Project(Seq(UnresolvedStar(None)), relation), + ExplainMode.fromString("extended")) + // Compare the two plans + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 26ad4198a..28a9c5f32 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -151,7 +151,9 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { if (node instanceof DescribeRelation) { TableIdentifier identifier; - if (node.getTableQualifiedName().getParts().size() == 1) { + if (node.getTableQualifiedName().getParts().isEmpty()) { + throw new IllegalArgumentException("Empty table name is invalid"); + } else if (node.getTableQualifiedName().getParts().size() == 1) { identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); } else if (node.getTableQualifiedName().getParts().size() == 2) { identifier = new TableIdentifier( @@ -160,8 +162,8 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { } else if (node.getTableQualifiedName().getParts().size() == 3) { identifier = new TableIdentifier( node.getTableQualifiedName().getParts().get(2), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(1))); + Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(1)), + Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); } else { throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() + " Syntax: [ database_name. ] table_name"); @@ -176,7 +178,7 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { //regular sql algebraic relations node.getTableName().forEach(t -> // Resolving the qualifiedName which is composed of a datasource.schema.table - context.withRelation(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) + context.withRelation(new UnresolvedRelation(seq(node.getTableQualifiedName().getParts()), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } From ee80aafdd8a2e0780c855c4d63dfb69bbe6e454b Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Fri, 11 Oct 2024 15:21:22 +0800 Subject: [PATCH 2/3] also fix the bug of describe Signed-off-by: Lantao Jin --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 23 ++++++++++++---- .../org/opensearch/sql/ast/tree/Relation.java | 4 +++ .../sql/ppl/CatalystQueryPlanVisitor.java | 27 ++++--------------- .../sql/ppl/utils/RelationUtils.java | 26 +++++++++++++++++- ...lPlanBasicQueriesTranslatorTestSuite.scala | 2 +- 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 087a2080a..cbc4308b0 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -541,6 +541,11 @@ class FlintSparkPPLBasicITSuite | """.stripMargin)) assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql(s""" + | source = $t7| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) } test("test describe backtick table names and name contains '.'") { @@ -551,14 +556,19 @@ class FlintSparkPPLBasicITSuite assert(frame.collect().length > 0) } // test read table which is unable to create - val t5 = "`spark_catalog`.default.`flint/ppl/test4.log`" - val t6 = "spark_catalog.default.`flint_ppl_test5.log`" + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" Seq(t5, t6).foreach { table => val ex = intercept[AnalysisException](sql(s""" | describe $table | """.stripMargin)) assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql(s""" + | describe $t7 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) } test("test explain backtick table names and name contains '.'") { @@ -573,15 +583,18 @@ class FlintSparkPPLBasicITSuite val frame = sql(s""" | explain extended | source = $table | """.stripMargin) - // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - // Define the expected logical plan val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint/ppl/test4.log")) val expectedPlan: LogicalPlan = ExplainCommand( Project(Seq(UnresolvedStar(None)), relation), ExplainMode.fromString("extended")) - // Compare the two plans comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql(s""" + | explain extended | source = $t7 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java index cb9bbd64d..e1732f75f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -49,6 +49,10 @@ public List getTableName() { return tableName.stream().map(Object::toString).collect(Collectors.toList()); } + public List getQualifiedNames() { + return tableName.stream().map(t -> (QualifiedName) t).collect(Collectors.toList()); + } + /** * Return alias. * diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 28a9c5f32..38dc4092e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -86,7 +86,6 @@ import org.opensearch.sql.ppl.utils.ParseStrategy; import org.opensearch.sql.ppl.utils.SortUtils; import scala.Option; -import scala.Option$; import scala.Tuple2; import scala.collection.IterableLike; import scala.collection.Seq; @@ -111,6 +110,7 @@ import static org.opensearch.sql.ppl.utils.LookupTransformer.buildLookupRelationProjectList; import static org.opensearch.sql.ppl.utils.LookupTransformer.buildOutputProjectList; import static org.opensearch.sql.ppl.utils.LookupTransformer.buildProjectListFromFields; +import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier; import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; @@ -150,24 +150,7 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { if (node instanceof DescribeRelation) { - TableIdentifier identifier; - if (node.getTableQualifiedName().getParts().isEmpty()) { - throw new IllegalArgumentException("Empty table name is invalid"); - } else if (node.getTableQualifiedName().getParts().size() == 1) { - identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); - } else if (node.getTableQualifiedName().getParts().size() == 2) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(1), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); - } else if (node.getTableQualifiedName().getParts().size() == 3) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(2), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(1)), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); - } else { - throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() - + " Syntax: [ database_name. ] table_name"); - } + TableIdentifier identifier = getTableIdentifier(node.getTableQualifiedName()); return context.with( new DescribeTableCommand( identifier, @@ -176,9 +159,9 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { DescribeRelation$.MODULE$.getOutputAttrs())); } //regular sql algebraic relations - node.getTableName().forEach(t -> + node.getQualifiedNames().forEach(q -> // Resolving the qualifiedName which is composed of a datasource.schema.table - context.withRelation(new UnresolvedRelation(seq(node.getTableQualifiedName().getParts()), CaseInsensitiveStringMap.empty(), false)) + context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } @@ -327,7 +310,7 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex seq(new ArrayList()))); context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Sort(sortElements, true, logicalPlan)); } - //visit TopAggregation results limit + //visit TopAggregation results limit if ((node instanceof TopAggregation) && ((TopAggregation) node).getResults().isPresent()) { context.apply(p -> (LogicalPlan) Limit.apply(new org.apache.spark.sql.catalyst.expressions.Literal( ((TopAggregation) node).getResults().get().getValue(), org.apache.spark.sql.types.DataTypes.IntegerType), p)); diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java index 33cb5611d..7be7f1f45 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -1,8 +1,10 @@ package org.opensearch.sql.ppl.utils; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.opensearch.sql.ast.expression.QualifiedName; +import scala.Option$; import java.util.List; import java.util.Optional; @@ -15,7 +17,7 @@ public interface RelationUtils { * * @param relations * @param node - * @param contextRelations + * @param tables * @return */ static Optional resolveField(List relations, QualifiedName node, List tables) { @@ -29,4 +31,26 @@ static Optional resolveField(List relations, .findFirst() .map(rel -> node); } + + static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) { + TableIdentifier identifier; + if (qualifiedName.getParts().isEmpty()) { + throw new IllegalArgumentException("Empty table name is invalid"); + } else if (qualifiedName.getParts().size() == 1) { + identifier = new TableIdentifier(qualifiedName.getParts().get(0)); + } else if (qualifiedName.getParts().size() == 2) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(1), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else if (qualifiedName.getParts().size() == 3) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(2), + Option$.MODULE$.apply(qualifiedName.getParts().get(1)), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else { + throw new IllegalArgumentException("Invalid table name: " + qualifiedName + + " Syntax: [ database_name. ] table_name"); + } + return identifier; + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index cc87e8853..0c10fcb06 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -43,7 +43,7 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite planTransformer.visit(plan(pplParser, "describe schema.default.http_logs"), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("http_logs", Option("schema"), Option("default")), + TableIdentifier("http_logs", Option("default"), Option("schema")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs) From 3de578d478873b618f12a5e9169e8f43f9354a37 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Fri, 11 Oct 2024 15:33:06 +0800 Subject: [PATCH 3/3] add doc Signed-off-by: Lantao Jin --- docs/ppl-lang/PPL-Example-Commands.md | 5 +++++ ...lPlanBasicQueriesTranslatorTestSuite.scala | 21 +++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index c553d483f..7d57651c3 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -2,6 +2,11 @@ #### **Describe** - `describe table` This command is equal to the `DESCRIBE EXTENDED table` SQL command +- `describe schema.table` +- `` describe schema.`table` `` +- `describe catalog.schema.table` +- `` describe catalog.schema.`table` `` +- `` describe `catalog`.`schema`.`table` `` #### **Explain** - `explain simple | source = table | where a = 1 | fields a,b,c` diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index 0c10fcb06..96176982e 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -37,13 +37,26 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name") } + test("test describe with backticks") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "describe t.b.`c.d`"), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("c.d", Option("b"), Option("t")), + Map.empty[String, String].empty, + isExtended = true, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + test("test describe FQN table clause") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit(plan(pplParser, "describe schema.default.http_logs"), context) + planTransformer.visit(plan(pplParser, "describe catalog.schema.http_logs"), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("http_logs", Option("default"), Option("schema")), + TableIdentifier("http_logs", Option("schema"), Option("catalog")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs) @@ -64,10 +77,10 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite test("test FQN table describe table clause") { val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t"), context) + val logPlan = planTransformer.visit(plan(pplParser, "describe schema.t"), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("t", Option("catalog")), + TableIdentifier("t", Option("schema")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs)