diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 6d9c3a5ab..45f66bc24 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -5,12 +5,13 @@ package org.opensearch.flint.spark.ppl -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.execution.ExplainMode +import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand} import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLBasicITSuite @@ -21,12 +22,20 @@ class FlintSparkPPLBasicITSuite /** Test table and index name */ private val testTable = "spark_catalog.default.flint_ppl_test" + private val t1 = "`spark_catalog`.`default`.`flint_ppl_test1`" + private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" + private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" + private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" override def beforeAll(): Unit = { super.beforeAll() // Create test table createPartitionedStateCountryTable(testTable) + createPartitionedStateCountryTable(t1) + createPartitionedStateCountryTable(t2) + createPartitionedStateCountryTable(t3) + createPartitionedStateCountryTable(t4) } protected override def afterEach(): Unit = { @@ -118,6 +127,52 @@ class FlintSparkPPLBasicITSuite comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + test("test backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql(s""" + | source = $table| head 2 + | """.stripMargin) + assert(frame.collect().length == 2) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql(s""" + | source = $table| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql(s""" + | source = $t7| head 2 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) + } + + test("test describe backtick table names and name contains '.'") { + Seq(t1, t2, t3, t4).foreach { table => + val frame = sql(s""" + | describe $table + | """.stripMargin) + assert(frame.collect().length > 0) + } + // test read table which is unable to create + val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`" + val t6 = "spark_catalog.default.`flint_ppl_test6.log`" + Seq(t5, t6).foreach { table => + val ex = intercept[AnalysisException](sql(s""" + | describe $table + | """.stripMargin)) + assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND")) + } + val t7 = "spark_catalog.default.flint_ppl_test7.log" + val ex = intercept[IllegalArgumentException](sql(s""" + | describe $t7 + | """.stripMargin)) + assert(ex.getMessage().contains("Invalid table name")) + } + test("create ppl simple query test") { val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" Seq(testTable, testTableQuoted).foreach { table => diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index ab16808a1..eb5ca2a13 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -226,7 +226,11 @@ See the next samples of PPL queries : **Describe** - `describe table` This command is equal to the `DESCRIBE EXTENDED table` SQL command - + - `describe schema.table` + - `` describe schema.`table` `` + - `describe catalog.schema.table` + - `` describe catalog.schema.`table` `` + - `` describe `catalog`.`schema`.`table` `` **Fields** - `source = table` - `source = table | fields a,b,c` diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java index 6a482db67..da442ec01 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java @@ -38,7 +38,6 @@ public Relation(UnresolvedExpression tableName, String alias) { private String alias; /** - * Return table name. * * @return table name */ @@ -46,7 +45,11 @@ public List getTableName() { return tableName.stream().map(Object::toString).collect(Collectors.toList()); } - + + public List getQualifiedNames() { + return tableName.stream().map(t -> (QualifiedName) t).collect(Collectors.toList()); + } + /** * Return alias. * diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index be00796a8..adada043e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -92,6 +92,7 @@ import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEvent; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainOneDuplicateEventAndKeepEmpty; import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; +import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier; import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; @@ -126,22 +127,7 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { if (node instanceof DescribeRelation) { - TableIdentifier identifier; - if (node.getTableQualifiedName().getParts().size() == 1) { - identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); - } else if (node.getTableQualifiedName().getParts().size() == 2) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(1), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); - } else if (node.getTableQualifiedName().getParts().size() == 3) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(2), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0)), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(1))); - } else { - throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() - + " Syntax: [ database_name. ] table_name"); - } + TableIdentifier identifier = getTableIdentifier(node.getTableQualifiedName()); return context.with( new DescribeTableCommand( identifier, @@ -149,10 +135,10 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { true, DescribeRelation$.MODULE$.getOutputAttrs())); } - //regular sql algebraic relations - node.getTableName().forEach(t -> + //regular sql algebraic relations + node.getQualifiedNames().forEach(q -> // Resolving the qualifiedName which is composed of a datasource.schema.table - context.withRelation(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) + context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java index 33cb5611d..c325f5185 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -1,8 +1,10 @@ package org.opensearch.sql.ppl.utils; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.opensearch.sql.ast.expression.QualifiedName; +import scala.Option$; import java.util.List; import java.util.Optional; @@ -15,7 +17,6 @@ public interface RelationUtils { * * @param relations * @param node - * @param contextRelations * @return */ static Optional resolveField(List relations, QualifiedName node, List tables) { @@ -29,4 +30,26 @@ static Optional resolveField(List relations, .findFirst() .map(rel -> node); } + + static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) { + TableIdentifier identifier; + if (qualifiedName.getParts().isEmpty()) { + throw new IllegalArgumentException("Empty table name is invalid"); + } else if (qualifiedName.getParts().size() == 1) { + identifier = new TableIdentifier(qualifiedName.getParts().get(0)); + } else if (qualifiedName.getParts().size() == 2) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(1), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else if (qualifiedName.getParts().size() == 3) { + identifier = new TableIdentifier( + qualifiedName.getParts().get(2), + Option$.MODULE$.apply(qualifiedName.getParts().get(1)), + Option$.MODULE$.apply(qualifiedName.getParts().get(0))); + } else { + throw new IllegalArgumentException("Invalid table name: " + qualifiedName + + " Syntax: [ database_name. ] table_name"); + } + return identifier; + } } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index 34de86d92..054ab97db 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -37,13 +37,26 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name") } + test("test describe with backticks") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "describe t.b.`c.d`", false), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("c.d", Option("b"), Option("t")), + Map.empty[String, String].empty, + isExtended = true, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + test("test describe FQN table clause") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit(plan(pplParser, "describe schema.default.http_logs", false), context) + planTransformer.visit(plan(pplParser, "describe catalog.schema.http_logs", false), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("http_logs", Option("schema"), Option("default")), + TableIdentifier("http_logs", Option("schema"), Option("catalog")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs) @@ -64,10 +77,10 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite test("test FQN table describe table clause") { val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context) + val logPlan = planTransformer.visit(plan(pplParser, "describe schema.t", false), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("t", Option("catalog")), + TableIdentifier("t", Option("schema")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs)