From 87470659018600dc3ab681bf1c8dc21b1e7df3bf Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 11:33:31 -0800
Subject: [PATCH 1/4] quote table name for special character in df read

Signed-off-by: Sean Kao
---
 .../opensearch/flint/spark/FlintSparkIndex.scala  | 16 ++++++++++++++++
 .../spark/covering/FlintSparkCoveringIndex.scala  |  4 ++--
 .../spark/skipping/FlintSparkSkippingIndex.scala  |  3 ++-
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index af1e9fa74..038a44005 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -95,6 +95,22 @@ object FlintSparkIndex {
     s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
   }
 
+  /**
+   * Add backticks to table name for special character handling
+   *
+   * @param fullTableName
+   *   source full table name
+   * @return
+   *   quoted table name
+   */
+  def quotedTableName(fullTableName: String): String = {
+    // TODO: add UT
+    require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
+
+    val parts = fullTableName.split('.')
+    s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
+  }
+
   /**
    * Populate environment variables to persist in Flint metadata.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index cdb3a3462..e23126c68 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark._
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
@@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex(
 
   override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
     val colNames = indexedColumns.keys.toSeq
-    val job = df.getOrElse(spark.read.table(tableName))
+    val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))
 
     // Add optional filtering condition
     filterCondition
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 120ca8219..8251febfe 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -77,7 +77,8 @@ case class FlintSparkSkippingIndex(
       new Column(aggFunc.as(name))
     }
 
-    df.getOrElse(spark.read.table(tableName))
+    // todo: find all occurrences of spark.read.table
+    df.getOrElse(spark.read.table(quotedTableName(tableName)))
       .groupBy(input_file_name().as(FILE_PATH_COLUMN))
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
       .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))

From f46c51ac34d586a3ab4a906872fb0843c6199dfa Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 13:35:50 -0800
Subject: [PATCH 2/4] add UT

Signed-off-by: Sean Kao
---
 .../covering/FlintSparkCoveringIndexSuite.scala  | 19 +++++++++++++++++++
 .../skipping/FlintSparkSkippingIndexSuite.scala  | 13 +++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index f52e6ef85..9829fd5fd 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -5,6 +5,7 @@
 
 package org.opensearch.flint.spark.covering
 
+import org.scalatest.matchers.must.Matchers.contain
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
@@ -30,6 +31,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
     }
   }
 
+  test("can build index building job with unique ID column") {
+    val index =
+      new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only("name")
+  }
+
+  test("can build index on table name with special characters") {
+    val testTableSpecial = "spark_catalog.default.test/2023/10"
+    val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string"))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only("name")
+  }
+
   test("should fail if no indexed column given") {
     assertThrows[IllegalArgumentException] {
       new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
index 9760e8cd2..7a55bae0f 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -67,6 +67,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
   }
 
+  test("can build index on table name with special characters") {
+    val testTableSpecial = "spark_catalog.default.test/2023/10"
+    val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
+    when(indexCol.getAggregators).thenReturn(
+      Seq(CollectSet(col("name").expr).toAggregateExpression()))
+    val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only("name", FILE_PATH_COLUMN, ID_COLUMN)
+  }
+
   // Test index build for different column type
   Seq(
     (

From 67d3e4c33eab338d008c250e3bd733a0d6f65ee2 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 13:38:45 -0800
Subject: [PATCH 3/4] update comments; scalafmtAll

Signed-off-by: Sean Kao
---
 .../scala/org/opensearch/flint/spark/FlintSparkIndex.scala   | 3 +--
 .../flint/spark/skipping/FlintSparkSkippingIndex.scala       | 1 -
 .../flint/spark/covering/FlintSparkCoveringIndexSuite.scala  | 4 ++--
 .../flint/spark/skipping/FlintSparkSkippingIndexSuite.scala  | 2 +-
 4 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 038a44005..248d105a2 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -96,7 +96,7 @@ object FlintSparkIndex {
   }
 
   /**
-   * Add backticks to table name for special character handling
+   * Add backticks to table name to escape special characters
    *
    * @param fullTableName
    *   source full table name
@@ -104,7 +104,6 @@ object FlintSparkIndex {
    *   quoted table name
    */
   def quotedTableName(fullTableName: String): String = {
-    // TODO: add UT
     require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
 
     val parts = fullTableName.split('.')
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 8251febfe..2e8a3c82d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -77,7 +77,6 @@ case class FlintSparkSkippingIndex(
       new Column(aggFunc.as(name))
     }
 
-    // todo: find all occurrences of spark.read.table
     df.getOrElse(spark.read.table(quotedTableName(tableName)))
       .groupBy(input_file_name().as(FILE_PATH_COLUMN))
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index 9829fd5fd..1cce47d1a 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -37,7 +37,7 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
 
     val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
     val indexDf = index.build(spark, Some(df))
-    indexDf.schema.fieldNames should contain only("name")
+    indexDf.schema.fieldNames should contain only ("name")
   }
 
   test("can build index on table name with special characters") {
@@ -46,7 +46,7 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
 
     val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
     val indexDf = index.build(spark, Some(df))
-    indexDf.schema.fieldNames should contain only("name")
+    indexDf.schema.fieldNames should contain only ("name")
   }
 
   test("should fail if no indexed column given") {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
index 7a55bae0f..247a055bf 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -77,7 +77,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
 
     val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
     val indexDf = index.build(spark, Some(df))
-    indexDf.schema.fieldNames should contain only("name", FILE_PATH_COLUMN, ID_COLUMN)
+    indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
   }
 
   // Test index build for different column type

From 7de78d268daed8ee1d720016fbefde3156eaa251 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 14:19:13 -0800
Subject: [PATCH 4/4] add a missed table name to quote

Signed-off-by: Sean Kao
---
 .../main/scala/org/opensearch/flint/spark/FlintSpark.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 122fea601..25b554581 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
-import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
+import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -372,7 +372,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     logInfo("Start refreshing index in foreach streaming style")
     val job = spark.readStream
       .options(options.extraSourceOptions(tableName))
-      .table(tableName)
+      .table(quotedTableName(tableName))
       .writeStream
       .queryName(indexName)
       .addSinkOptions(options)
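
For reference, a minimal sketch of how the quotedTableName helper introduced in PATCH 1/4 behaves, assuming the fully qualified catalog.database.table naming used throughout the series (the calls below are illustrative and not part of the patches):

    import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName

    // Only the table portion after catalog and database is wrapped in backticks,
    // so special characters such as '/' no longer break spark.read.table.
    quotedTableName("spark_catalog.default.test/2023/10")
    // => "spark_catalog.default.`test/2023/10`"

    // Dots beyond the first two stay inside the quoted table part.
    quotedTableName("spark_catalog.default.a.b")
    // => "spark_catalog.default.`a.b`"

    // Names with fewer than three dot-separated parts fail the require() check:
    // quotedTableName("default.test")  // IllegalArgumentException: not qualified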