From 745f73ae6902dbc4709da340874ab7a928a31512 Mon Sep 17 00:00:00 2001
From: YANGDB
Date: Fri, 11 Oct 2024 17:56:19 -0700
Subject: [PATCH] Support Spark prior to 3.5, where the extended table
 identifier is unavailable (TableIdentifier only has 2 parts)

Signed-off-by: YANGDB
---
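Notes:

On the Spark versions this patch targets, catalyst's TableIdentifier carries
only (table, database), so a catalog-qualified name such as
spark_catalog.default.t has no slot for its leading part. visitRelation
therefore now hands the raw name parts to UnresolvedRelation and lets the
analyzer resolve them, while `describe` keeps building a two-part
TableIdentifier and rejects anything longer. A minimal Scala sketch of the
two paths (the table names here are illustrative only, not from this patch):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

    object NamePartsSketch {
      // source = ...: every name part is forwarded; resolution is deferred
      // to the analyzer, so no catalog slot is needed on TableIdentifier.
      val relation: UnresolvedRelation =
        UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

      // describe ...: still a two-part TableIdentifier; a third part now
      // fails fast with
      // "Invalid table name: ... Syntax: [ database_name. ] table_name".
      val describeTarget: TableIdentifier =
        TableIdentifier("flint_ppl_test", Some("default"))
    }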
 .../spark/ppl/FlintSparkPPLBasicITSuite.scala | 45 ++++---------------
 .../sql/ppl/CatalystQueryPlanVisitor.java     |  3 +-
 .../sql/ppl/utils/RelationUtils.java          | 16 ++++---
 ...lPlanBasicQueriesTranslatorTestSuite.scala | 19 ++++----
 4 files changed, 28 insertions(+), 55 deletions(-)

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
index 4acb818fb..65472d4fd 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLBasicITSuite.scala
@@ -26,7 +26,7 @@ class FlintSparkPPLBasicITSuite
   private val t2 = "`spark_catalog`.default.`flint_ppl_test2`"
   private val t3 = "spark_catalog.`default`.`flint_ppl_test3`"
   private val t4 = "`spark_catalog`.`default`.flint_ppl_test4"
-
+
   override def beforeAll(): Unit = {
     super.beforeAll()

@@ -129,54 +129,25 @@

   test("test backtick table names and name contains '.'") {
     Seq(t1, t2, t3, t4).foreach { table =>
-      val frame = sql(
-        s"""
+      val frame = sql(s"""
           | source = $table| head 2
           | """.stripMargin)
       assert(frame.collect().length == 2)
     }
     // test read table which is unable to create
-    val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`"
-    val t6 = "spark_catalog.default.`flint_ppl_test6.log`"
+    val t5 = "default.`flint/ppl/test5.log`"
+    val t6 = "default.`flint_ppl_test6.log`"
     Seq(t5, t6).foreach { table =>
-      val ex = intercept[AnalysisException](sql(
-        s"""
+      val ex = intercept[AnalysisException](sql(s"""
           | source = $table| head 2
           | """.stripMargin))
-      assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND"))
+      assert(ex.getMessage().contains("Table or view not found"))
     }
     val t7 = "spark_catalog.default.flint_ppl_test7.log"
-    val ex = intercept[IllegalArgumentException](sql(
-      s"""
+    val ex = intercept[AnalysisException](sql(s"""
         | source = $t7| head 2
         | """.stripMargin))
-    assert(ex.getMessage().contains("Invalid table name"))
-  }
-
-  test("test describe backtick table names and name contains '.'") {
-    Seq(t1, t2, t3, t4).foreach { table =>
-      val frame = sql(
-        s"""
-          | describe $table
-          | """.stripMargin)
-      assert(frame.collect().length > 0)
-    }
-    // test read table which is unable to create
-    val t5 = "`spark_catalog`.default.`flint/ppl/test5.log`"
-    val t6 = "spark_catalog.default.`flint_ppl_test6.log`"
-    Seq(t5, t6).foreach { table =>
-      val ex = intercept[AnalysisException](sql(
-        s"""
-          | describe $table
-          | """.stripMargin))
-      assert(ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND"))
-    }
-    val t7 = "spark_catalog.default.flint_ppl_test7.log"
-    val ex = intercept[IllegalArgumentException](sql(
-      s"""
-        | describe $t7
-        | """.stripMargin))
-    assert(ex.getMessage().contains("Invalid table name"))
+    assert(ex.getMessage().contains("spark_catalog requires a single-part namespace"))
   }

   test("create ppl simple query test") {
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
index 03cd8cc43..6d2b6e432 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -98,6 +98,7 @@
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate;
 import static org.opensearch.sql.ppl.utils.JoinSpecTransformer.join;
 import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
+import static org.opensearch.sql.ppl.utils.RelationUtils.namedParts;
 import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
 import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;

@@ -143,7 +144,7 @@ public LogicalPlan visitRelation(Relation node, CatalystPlanContext context) {
         //regular sql algebraic relations
         node.getQualifiedNames().forEach(q ->
                 // Resolving the qualifiedName which is composed of a datasource.schema.table
-                context.withRelation(new UnresolvedRelation(getTableIdentifier(q).nameParts(), CaseInsensitiveStringMap.empty(), false))
+                context.withRelation(new UnresolvedRelation(namedParts(q), CaseInsensitiveStringMap.empty(), false))
         );
         return context.getPlan();
     }
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
index c325f5185..7eeca5f3d 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/RelationUtils.java
@@ -5,10 +5,13 @@
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.opensearch.sql.ast.expression.QualifiedName;
 import scala.Option$;
+import scala.collection.Seq;

 import java.util.List;
 import java.util.Optional;

+import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
+
 public interface RelationUtils {
     /**
      * attempt resolving if the field is relating to the given relation
@@ -21,7 +24,7 @@ public interface RelationUtils {
      */
     static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations, QualifiedName node, List<LogicalPlan> tables) {
         //when is only a single tables in the query - return the node as is to be resolved by the schema itself
-        if(tables.size()==1) return Optional.of(node);
+        if (tables.size() == 1) return Optional.of(node);
         //when is more than one table in the query (union or join) - filter out nodes that dont apply to the current relation
         return relations.stream()
             .filter(rel -> node.getPrefix().isEmpty() ||
@@ -30,7 +33,7 @@ static Optional<QualifiedName> resolveField(List<UnresolvedRelation> relations,
             .findFirst()
             .map(rel -> node);
     }
-
+
     static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
         TableIdentifier identifier;
         if (qualifiedName.getParts().isEmpty()) {
@@ -41,15 +44,14 @@ static TableIdentifier getTableIdentifier(QualifiedName qualifiedName) {
             identifier = new TableIdentifier(
                 qualifiedName.getParts().get(1),
                 Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
-        } else if (qualifiedName.getParts().size() == 3) {
-            identifier = new TableIdentifier(
-                qualifiedName.getParts().get(2),
-                Option$.MODULE$.apply(qualifiedName.getParts().get(1)),
-                Option$.MODULE$.apply(qualifiedName.getParts().get(0)));
         } else {
             throw new IllegalArgumentException("Invalid table name: " + qualifiedName
                + " Syntax: [ database_name. ] table_name");
         }
         return identifier;
     }
+
+    static Seq<String> namedParts(QualifiedName qualifiedName) {
+        return seq(qualifiedName.getParts());
+    }
 }
diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
index 7007dd5aa..59f5e8eb6 100644
--- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
+++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala
@@ -6,14 +6,13 @@
 package org.opensearch.flint.spark.ppl

 import org.opensearch.flint.spark.ppl.PlaneUtils.plan
-import org.opensearch.sql.common.antlr.SyntaxCheckException
 import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
 import org.scalatest.matchers.should.Matchers

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, GreaterThan, Literal, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.DescribeTableCommand
@@ -30,33 +29,33 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
   test("test error describe clause") {
     val context = new CatalystPlanContext
     val thrown = intercept[IllegalArgumentException] {
-      planTransformer.visit(plan(pplParser, "describe t.b.c.d", false), context)
+      planTransformer.visit(plan(pplParser, "describe b.c.d", false), context)
     }

     assert(
-      thrown.getMessage === "Invalid table name: t.b.c.d Syntax: [ database_name. ] table_name")
+      thrown.getMessage === "Invalid table name: b.c.d Syntax: [ database_name. ] table_name")
   }
-
+
   test("test describe with backticks") {
     val context = new CatalystPlanContext
     val logPlan =
-      planTransformer.visit(plan(pplParser, "describe t.b.`c.d`", false), context)
+      planTransformer.visit(plan(pplParser, "describe b.`c.d`", false), context)

     val expectedPlan = DescribeTableCommand(
-      TableIdentifier("c.d", Option("b"), Option("t")),
+      TableIdentifier("c.d", Option("b")),
       Map.empty[String, String].empty,
       isExtended = true,
       output = DescribeRelation.getOutputAttrs)
     comparePlans(expectedPlan, logPlan, false)
   }
-
+
   test("test describe FQN table clause") {
     val context = new CatalystPlanContext
     val logPlan =
-      planTransformer.visit(plan(pplParser, "describe catalog.schema.http_logs", false), context)
+      planTransformer.visit(plan(pplParser, "describe default.http_logs", false), context)

     val expectedPlan = DescribeTableCommand(
-      TableIdentifier("http_logs", Option("schema"), Option("catalog")),
+      TableIdentifier("http_logs", Option("default")),
       Map.empty[String, String].empty,
       isExtended = true,
       output = DescribeRelation.getOutputAttrs)
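
Net effect at the PPL surface, sketched in the integration-suite style (a
sketch only: `sql` and `intercept` are assumed to come from the suite's Spark
test base, and the table names are the suite's own):

    import org.apache.spark.sql.AnalysisException

    // Multipart source names, with or without backticks, still resolve:
    sql("source = `spark_catalog`.default.`flint_ppl_test2` | head 2")

    // A missing two-part table now fails in the analyzer with the plain
    // "Table or view not found" text rather than the error-class form:
    intercept[AnalysisException](
      sql("source = default.`flint_ppl_test6.log` | head 2"))

    // Four dot-separated parts reach the session catalog, which rejects them:
    // "spark_catalog requires a single-part namespace".
    intercept[AnalysisException](
      sql("source = spark_catalog.default.flint_ppl_test7.log | head 2"))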