From 30a77de3198945ecc9f1bd9afc4ff4ca6c4c3f1a Mon Sep 17 00:00:00 2001 From: YANGDB Date: Fri, 11 Oct 2024 19:19:24 -0700 Subject: [PATCH] [Backport 0.5-nexus] Support table identifier contains dot with backticks (#774) * backport #768 to 0.5-nexus Signed-off-by: YANGDB * support spark prior to 3.5 with its extended table identifier (existing table identifier only has 2 parts) Signed-off-by: YANGDB --------- Signed-off-by: YANGDB --- .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 151 +++++++++++++----- ppl-spark-integration/README.md | 13 +- .../org/opensearch/sql/ast/tree/Relation.java | 7 +- .../sql/ppl/CatalystQueryPlanVisitor.java | 20 +-- .../sql/ppl/utils/RelationUtils.java | 29 +++- ...lPlanBasicQueriesTranslatorTestSuite.scala | 36 ++++- 6 files changed, 190 insertions(+), 66 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala index 7d51e123d..65472d4fd 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala @@ -5,12 +5,13 @@ package org.opensearch.flint.spark.ppl -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedTableOrView} -import org.apache.spark.sql.catalyst.expressions.{Ascending, Literal, SortOrder} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.command.DescribeTableCommand +import org.apache.spark.sql.execution.ExplainMode +import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand} import org.apache.spark.sql.streaming.StreamTest class FlintSparkPPLBasicITSuite @@ -21,12 +22,20 @@ class FlintSparkPPLBasicITSuite /** Test table and index name */ private val testTable = "spark_catalog.default.flint_ppl_test" + private val t1 = "`spark_catalog`.`default`.`flint_ppl_test1`" + private val t2 = "`spark_catalog`.default.`flint_ppl_test2`" + private val t3 = "spark_catalog.`default`.`flint_ppl_test3`" + private val t4 = "`spark_catalog`.`default`.flint_ppl_test4" override def beforeAll(): Unit = { super.beforeAll() // Create test table createPartitionedStateCountryTable(testTable) + createPartitionedStateCountryTable(t1) + createPartitionedStateCountryTable(t2) + createPartitionedStateCountryTable(t3) + createPartitionedStateCountryTable(t4) } protected override def afterEach(): Unit = { @@ -39,48 +48,106 @@ class FlintSparkPPLBasicITSuite } test("describe (extended) table query test") { - val testTableQuoted = "`spark_catalog`.`default`.`flint_ppl_test`" - Seq(testTable, testTableQuoted).foreach { table => - val frame = sql(s""" + val frame = sql(s""" describe flint_ppl_test """.stripMargin) - // Retrieve the results - val results: Array[Row] = frame.collect() - // Define the expected results - val expectedResults: Array[Row] = Array( - Row("name", "string", null), - Row("age", "int", null), - Row("state", "string", null), - Row("country", 
"string", null), - Row("year", "int", null), - Row("month", "int", null), - Row("# Partition Information", "", ""), - Row("# col_name", "data_type", "comment"), - Row("year", "int", null), - Row("month", "int", null)) - - // Convert actual results to a Set for quick lookup - val resultsSet: Set[Row] = results.toSet - // Check that each expected row is present in the actual results - expectedResults.foreach { expectedRow => - assert( - resultsSet.contains(expectedRow), - s"Expected row $expectedRow not found in results") - } - // Retrieve the logical plan - val logicalPlan: LogicalPlan = - frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan - // Define the expected logical plan - val expectedPlan: LogicalPlan = - DescribeTableCommand( - TableIdentifier("flint_ppl_test"), - Map.empty[String, String], - isExtended = true, - output = DescribeRelation.getOutputAttrs) - // Compare the two plans - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("name", "string", null), + Row("age", "int", null), + Row("state", "string", null), + Row("country", "string", null), + Row("year", "int", null), + Row("month", "int", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("year", "int", null), + Row("month", "int", null)) + + // Convert actual results to a Set for quick lookup + val resultsSet: Set[Row] = results.toSet + // Check that each expected row is present in the actual results + expectedResults.foreach { expectedRow => + assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results") + } + // Retrieve the logical plan + val logicalPlan: LogicalPlan = + frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan + // Define the expected logical plan + val expectedPlan: LogicalPlan = + DescribeTableCommand( + TableIdentifier("flint_ppl_test"), + Map.empty[String, String], + isExtended = true, + output = DescribeRelation.getOutputAttrs) + // Compare the two plans + comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + } + + test("describe (extended) FQN (2 parts) table query test") { + val frame = sql(s""" + describe default.flint_ppl_test + """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("name", "string", null), + Row("age", "int", null), + Row("state", "string", null), + Row("country", "string", null), + Row("year", "int", null), + Row("month", "int", null), + Row("# Partition Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("year", "int", null), + Row("month", "int", null)) + + // Convert actual results to a Set for quick lookup + val resultsSet: Set[Row] = results.toSet + // Check that each expected row is present in the actual results + expectedResults.foreach { expectedRow => + assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results") + } + // Retrieve the logical plan + val logicalPlan: LogicalPlan = + frame.queryExecution.commandExecuted.asInstanceOf[CommandResult].commandLogicalPlan + // Define the expected logical plan + val expectedPlan: LogicalPlan = + DescribeTableCommand( + TableIdentifier("flint_ppl_test", Option("default")), + Map.empty[String, String], + isExtended = true, + output = 
DescribeRelation.getOutputAttrs)
+    // Compare the two plans
+    comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
+  }
+
+  test("test backtick table names and names containing '.'") {
+    Seq(t1, t2, t3, t4).foreach { table =>
+      val frame = sql(s"""
+           | source = $table| head 2
+           | """.stripMargin)
+      assert(frame.collect().length == 2)
+    }
+    // test reading tables that cannot be created
+    val t5 = "default.`flint/ppl/test5.log`"
+    val t6 = "default.`flint_ppl_test6.log`"
+    Seq(t5, t6).foreach { table =>
+      val ex = intercept[AnalysisException](sql(s"""
+           | source = $table| head 2
+           | """.stripMargin))
+      assert(ex.getMessage().contains("Table or view not found"))
     }
+    val t7 = "spark_catalog.default.flint_ppl_test7.log"
+    val ex = intercept[AnalysisException](sql(s"""
+         | source = $t7| head 2
+         | """.stripMargin))
+    assert(ex.getMessage().contains("spark_catalog requires a single-part namespace"))
   }
 
   test("create ppl simple query test") {
diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md
index aeb2653e8..d7e0445ff 100644
--- a/ppl-spark-integration/README.md
+++ b/ppl-spark-integration/README.md
@@ -226,10 +226,21 @@ See the next samples of PPL queries :
 **Describe**
  - `describe table`  This command is equal to the `DESCRIBE EXTENDED table` SQL command
-
+ - `describe schema.table`
+ - `` describe schema.`table` ``
+ - `describe catalog.schema.table`
+ - `` describe catalog.schema.`table` ``
+ - `` describe `catalog`.`schema`.`table` ``
 **Fields**
  - `source = table`
  - `source = table | fields a,b,c`
+ - `source = table | fields + a,b,c`
+ - `source = table | fields - b,c`
+ - `source = table | eval b1 = b | fields - b1,c`
+
+_- **Limitation: a new field added by the eval command with a function cannot be dropped in the current version:**_
+ - `source = table | eval b1 = b + 1 | fields - b1,c` (Field `b1` cannot be dropped due to SPARK-49782)
+ - `source = table | eval b1 = lower(b) | fields - b1,c` (Field `b1` cannot be dropped due to SPARK-49782)
 
 **Nested-Fields**
  - `source = catalog.schema.table1, catalog.schema.table2 | fields A.nested1, B.nested1`
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java
index 6a482db67..da442ec01 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Relation.java
@@ -38,7 +38,6 @@ public Relation(UnresolvedExpression tableName, String alias) {
   private String alias;
 
   /**
-   * Return table name.
    *
    * @return table name
    */
@@ -46,7 +45,11 @@ public List<String> getTableName() {
     return tableName.stream().map(Object::toString).collect(Collectors.toList());
   }
 
-
+
+  public List<QualifiedName> getQualifiedNames() {
+    return tableName.stream().map(t -> (QualifiedName) t).collect(Collectors.toList());
+  }
+
   /**
    * Return alias.
* diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6caaec839..6d2b6e432 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -97,6 +97,8 @@ import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate; import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join; +import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier; +import static org.opensearch.sql.ppl.utils.RelationUtils.namedParts; import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField; import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window; @@ -131,17 +133,7 @@ public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) { @Override public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { if (node instanceof DescribeRelation) { - TableIdentifier identifier; - if (node.getTableQualifiedName().getParts().size() == 1) { - identifier = new TableIdentifier(node.getTableQualifiedName().getParts().get(0)); - } else if (node.getTableQualifiedName().getParts().size() == 2) { - identifier = new TableIdentifier( - node.getTableQualifiedName().getParts().get(1), - Option$.MODULE$.apply(node.getTableQualifiedName().getParts().get(0))); - } else { - throw new IllegalArgumentException("Invalid table name: " + node.getTableQualifiedName() - + " Syntax: [ database_name. ] table_name"); - } + TableIdentifier identifier = getTableIdentifier(node.getTableQualifiedName()); return context.with( new DescribeTableCommand( identifier, @@ -149,10 +141,10 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) { true, DescribeRelation$.MODULE$.getOutputAttrs())); } - //regular sql algebraic relations - node.getTableName().forEach(t -> + //regular sql algebraic relations + node.getQualifiedNames().forEach(q -> // Resolving the qualifiedName which is composed of a datasource.schema.table - context.withRelation(new UnresolvedRelation(seq(of(t.split("\\."))), CaseInsensitiveStringMap.empty(), false)) + context.withRelation(new UnresolvedRelation(namedParts(q), CaseInsensitiveStringMap.empty(), false)) ); return context.getPlan(); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java index 33cb5611d..7eeca5f3d 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java @@ -1,12 +1,17 @@ package org.opensearch.sql.ppl.utils; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.opensearch.sql.ast.expression.QualifiedName; +import scala.Option$; +import scala.collection.Seq; import java.util.List; import java.util.Optional; +import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; + public interface RelationUtils { /** * attempt resolving if the field is relating to the given relation @@ -15,12 +20,11 @@ public interface RelationUtils { * * @param relations * @param node - * 
@param contextRelations
      * @return
      */
     static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations, QualifiedName node, List<LogicalPlan> tables) {
         //when is only a single tables in the query - return the node as is to be resolved by the schema itself
-        if(tables.size()==1) return Optional.of(node);
+        if (tables.size() == 1) return Optional.of(node);
         //when is more than one table in the query (union or join) - filter out nodes that dont apply to the current relation
         return relations.stream()
                 .filter(rel -> node.getPrefix().isEmpty() ||
                         node.getPrefix().get().toString().equals(rel.tableName()))
                 .findFirst()
                 .map(rel -> node);
     }
+
+    static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
+        TableIdentifier identifier;
+        if (qualifiedName.getParts().isEmpty()) {
+            throw new IllegalArgumentException("Empty table name is invalid");
+        } else if (qualifiedName.getParts().size() == 1) {
+            identifier = new TableIdentifier(qualifiedName.getParts().get(0));
+        } else if (qualifiedName.getParts().size() == 2) {
+            identifier = new TableIdentifier(
+                    qualifiedName.getParts().get(1),
+                    Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
+        } else {
+            throw new IllegalArgumentException("Invalid table name: " + qualifiedName
+                    + " Syntax: [ database_name. ] table_name");
+        }
+        return identifier;
+    }
+
+    static Seq<String> namedParts(QualifiedName qualifiedName) {
+        return seq(qualifiedName.getParts());
+    }
 }
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
index 131778820..59f5e8eb6 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
@@ -12,7 +12,7 @@ import org.scalatest.matchers.should.Matchers
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.DescribeTableCommand
@@ -29,11 +29,37 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
 
   test("test error describe clause") {
     val context = new CatalystPlanContext
     val thrown = intercept[IllegalArgumentException] {
-      planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
+      planTransformer.visit(plan(pplParser, "describe b.c.d", false), context)
     }
 
     assert(
-      thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
+      thrown.getMessage === "Invalid table name: b.c.d Syntax: [ database_name. 
] table_name") + } + + test("test describe with backticks") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "describe b.`c.d`", false), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("c.d", Option("b")), + Map.empty[String, String].empty, + isExtended = true, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) + } + + test("test describe FQN table clause") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit(plan(pplParser, "describe default.http_logs", false), context) + + val expectedPlan = DescribeTableCommand( + TableIdentifier("http_logs", Option("default")), + Map.empty[String, String].empty, + isExtended = true, + output = DescribeRelation.getOutputAttrs) + comparePlans(expectedPlan, logPlan, false) } test("test simple describe clause") { @@ -50,10 +76,10 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite test("test FQN table describe table clause") { val context = new CatalystPlanContext - val logPlan = planTransformer.visit(plan(pplParser, "describe catalog.t", false), context) + val logPlan = planTransformer.visit(plan(pplParser, "describe schema.t", false), context) val expectedPlan = DescribeTableCommand( - TableIdentifier("t", Option("catalog")), + TableIdentifier("t", Option("schema")), Map.empty[String, String].empty, isExtended = true, output = DescribeRelation.getOutputAttrs)
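
Note on the semantics the tests above exercise: a backtick-quoted segment is a single identifier part, so a dot inside backticks does not split the table name, while an unquoted dot does. The sketch below is only an illustration of that splitting rule; it is not the patch's parser (PPL handles quoting in its ANTLR grammar), and `splitParts` is a hypothetical helper introduced solely for this example.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch, not the PPL grammar: split a table identifier into
// parts, treating backtick-quoted segments as atomic so that a dot inside
// backticks stays inside its part, while an unquoted dot separates parts.
object IdentifierPartsSketch {
  def splitParts(identifier: String): List[String] = {
    val parts = ListBuffer.empty[String]
    val current = new StringBuilder
    var inBackticks = false
    identifier.foreach {
      case '`' => inBackticks = !inBackticks // toggle quoting; drop the backtick itself
      case '.' if !inBackticks => parts += current.result(); current.clear() // part boundary
      case ch => current += ch
    }
    parts += current.result()
    parts.toList
  }

  def main(args: Array[String]): Unit = {
    // Quoted and unquoted spellings of the same name yield the same three parts.
    assert(splitParts("`spark_catalog`.default.`flint_ppl_test2`") ==
      List("spark_catalog", "default", "flint_ppl_test2"))
    // A dot inside backticks is data, not a separator: two parts.
    assert(splitParts("default.`flint_ppl_test6.log`") ==
      List("default", "flint_ppl_test6.log"))
    // Without backticks the trailing ".log" becomes a fourth part.
    assert(splitParts("spark_catalog.default.flint_ppl_test7.log").length == 4)
  }
}
```

Under this rule, `default.`flint_ppl_test6.log`` resolves as a two-part name, which is why the integration test expects only "Table or view not found", while the unquoted `spark_catalog.default.flint_ppl_test7.log` yields a four-part name that Spark rejects with "spark_catalog requires a single-part namespace".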