From 9d377fc7bae6ff7865403c888321a6c6e88a22fa Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Thu, 9 May 2024 19:00:10 -0700 Subject: [PATCH] fix build index with special full table name Signed-off-by: Sean Kao --- .../flint/spark/FlintSparkIndex.scala | 4 ++-- .../FlintSparkCoveringIndexSuite.scala | 22 +++++++------------ .../FlintSparkSkippingIndexSuite.scala | 13 ++++++----- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 702b1475e..94e8d68d5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -102,7 +102,7 @@ object FlintSparkIndex { } /** - * Add backticks to table name to escape special character + * Add backticks to all parts of full table name to escape special character * * @param fullTableName * source full table name @@ -113,7 +113,7 @@ object FlintSparkIndex { require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified") val parts = fullTableName.split('.') - s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`" + s"`${parts(0)}`.`${parts(1)}`.`${parts.drop(2).mkString(".")}`" } /** diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 1cce47d1a..5abf12850 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -9,6 +9,7 @@ import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite +import org.apache.spark.sql.AnalysisException class FlintSparkCoveringIndexSuite extends FlintSuite { @@ -31,22 +32,15 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { } } - test("can build index building job with unique ID column") { - val index = - new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) - - val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") - val indexDf = index.build(spark, Some(df)) - indexDf.schema.fieldNames should contain only ("name") - } - - test("can build index on table name with special characters") { - val testTableSpecial = "spark_catalog.default.test/2023/10" + test("can parse identifier name with special characters during index build") { + val testTableSpecial = "spark_catalog.de-fault.test/2023/10" val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string")) - val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") - val indexDf = index.build(spark, Some(df)) - indexDf.schema.fieldNames should contain only ("name") + val error = intercept[AnalysisException] { + index.build(spark, None) + } + // Getting this error means that parsing doesn't fail with unquoted identifier + assert(error.getMessage().contains("UnresolvedRelation")) } test("should fail if no indexed column given") { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 6772eb8f3..1d9c2b81a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -20,6 +20,7 @@ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet import org.apache.spark.sql.functions.col @@ -71,17 +72,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN) } - test("can build index on table name with special characters") { - val testTableSpecial = "spark_catalog.default.test/2023/10" + test("can parse identifier name with special characters during index build") { + val testTableSpecial = "spark_catalog.de-fault.test/2023/10" val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.outputSchema()).thenReturn(Map("name" -> "string")) when(indexCol.getAggregators).thenReturn( Seq(CollectSet(col("name").expr).toAggregateExpression())) val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol)) - val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") - val indexDf = index.build(spark, Some(df)) - indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN) + val error = intercept[AnalysisException] { + index.build(spark, None) + } + // Getting this error means that parsing doesn't fail with unquoted identifier + assert(error.getMessage().contains("UnresolvedRelation")) } // Test index build for different column type