diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index af1e9fa74..ec452d36f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -79,6 +79,29 @@ object FlintSparkIndex { */ val ID_COLUMN: String = "__id__" + /** + * invalid index name characters. + */ + val INVALID_INDEX_NAME_CHARS: Set[Char] = Set(' ', ',', ':', '"', '*', '+', '/', '\\', '|', '?', '#', '>', '<') + + /** + * Percent-encode invalid OpenSearch index name characters. + * + * @param indexName + * raw Flint index name + * @return + * percent-encoded index name + */ + def percentEncode(indexName: String): String = { + indexName + .flatMap(ch => + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + s"%${ch.toInt.toHexString}" + } else { + ch.toString + }) + } + /** * Common prefix of Flint index name which is "flint_database_table_" * @@ -92,7 +115,9 @@ object FlintSparkIndex { // Keep all parts since the third as it is val parts = fullTableName.split('.') - s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + val raw = s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + + percentEncode(raw) } /** diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index f52e6ef85..9d460ed1f 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -23,6 +23,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index" } + test("get encoded covering index name on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" + val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string")) + index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_ci_index" + } + test("should fail if get index name without full table name") { val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index b7746d44a..c82038588 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -47,6 +47,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" } + test("get encoded mv name with special characters") { + val testMvNameSpecial = "spark_catalog.default.mv ,:\"*+/\\|?#><" + val mv = FlintSparkMaterializedView(testMvNameSpecial, testQuery, Map.empty) + mv.name() shouldBe "flint_spark_catalog_default_mv%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c" + } + test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy FlintSparkMaterializedView("mv", testQuery, Map.empty).name() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 9760e8cd2..2c7a2a91e 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -36,6 +36,13 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index" } + test("get encoded skipping index name on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" + val index = + new FlintSparkSkippingIndex(testTableSpecial, Seq(mock[FlintSparkSkippingStrategy])) + index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_skipping_index" + } + test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION)