From dc708ff351b7cfdfc390a5301d30a2bded154cc1 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Mon, 8 Jan 2024 20:56:31 -0800 Subject: [PATCH 1/5] percent-encode invalid flint index characters Signed-off-by: Sean Kao --- .../flint/spark/FlintSparkIndex.scala | 27 ++++++++++++++++++- .../FlintSparkCoveringIndexSuite.scala | 6 +++++ .../mv/FlintSparkMaterializedViewSuite.scala | 6 +++++ .../FlintSparkSkippingIndexSuite.scala | 7 +++++ 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index af1e9fa74..ec452d36f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -79,6 +79,29 @@ object FlintSparkIndex { */ val ID_COLUMN: String = "__id__" + /** + * invalid index name characters. + */ + val INVALID_INDEX_NAME_CHARS: Set[Char] = Set(' ', ',', ':', '"', '*', '+', '/', '\\', '|', '?', '#', '>', '<') + + /** + * Percent-encode invalid OpenSearch index name characters. + * + * @param indexName + * raw Flint index name + * @return + * percent-encoded index name + */ + def percentEncode(indexName: String): String = { + indexName + .flatMap(ch => + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + s"%${ch.toInt.toHexString}" + } else { + ch.toString + }) + } + /** * Common prefix of Flint index name which is "flint_database_table_" * @@ -92,7 +115,9 @@ object FlintSparkIndex { // Keep all parts since the third as it is val parts = fullTableName.split('.') - s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + val raw = s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + + percentEncode(raw) } /** diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index f52e6ef85..9d460ed1f 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -23,6 +23,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index" } + test("get encoded covering index name on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" + val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string")) + index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_ci_index" + } + test("should fail if get index name without full table name") { val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index b7746d44a..c82038588 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -47,6 +47,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" } + test("get encoded mv name with special characters") { + val testMvNameSpecial = "spark_catalog.default.mv ,:\"*+/\\|?#><" + val mv = FlintSparkMaterializedView(testMvNameSpecial, testQuery, Map.empty) + mv.name() shouldBe "flint_spark_catalog_default_mv%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c" + } + test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy FlintSparkMaterializedView("mv", testQuery, Map.empty).name() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 9760e8cd2..2c7a2a91e 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -36,6 +36,13 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index" } + test("get encoded skipping index name on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" + val index = + new FlintSparkSkippingIndex(testTableSpecial, Seq(mock[FlintSparkSkippingStrategy])) + index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_skipping_index" + } + test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) From 3f82e67f698cf363dd35edae5a74589a398c7956 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Mon, 8 Jan 2024 21:24:50 -0800 Subject: [PATCH 2/5] formatting Signed-off-by: Sean Kao --- .../scala/org/opensearch/flint/spark/FlintSparkIndex.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index ec452d36f..f12892398 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -82,7 +82,8 @@ object FlintSparkIndex { /** * invalid index name characters. */ - val INVALID_INDEX_NAME_CHARS: Set[Char] = Set(' ', ',', ':', '"', '*', '+', '/', '\\', '|', '?', '#', '>', '<') + val INVALID_INDEX_NAME_CHARS: Set[Char] = + Set(' ', ',', ':', '"', '*', '+', '/', '\\', '|', '?', '#', '>', '<') /** * Percent-encode invalid OpenSearch index name characters. From a26e7796c1337bcf54ad3e30c24d6bf6e45fddfb Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 9 Jan 2024 11:32:55 -0800 Subject: [PATCH 3/5] move encoding logic to FlintOpenSearchClient Signed-off-by: Sean Kao --- .../core/storage/FlintOpenSearchClient.java | 50 ++++++++++++++++--- .../flint/spark/FlintSparkIndex.scala | 28 +---------- .../FlintSparkCoveringIndexSuite.scala | 6 --- .../mv/FlintSparkMaterializedViewSuite.scala | 6 --- .../FlintSparkSkippingIndexSuite.scala | 7 --- .../core/FlintOpenSearchClientSuite.scala | 24 +++++++++ 6 files changed, 68 insertions(+), 53 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 22badfbf9..fd6530af3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -19,6 +19,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; +import java.util.Set; import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; @@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient { new NamedXContentRegistry(new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents()); + /** + * Invalid index name characters to percent-encode, + * excluding '*' because it's reserved for pattern matching. + */ + private final static Set INVALID_INDEX_NAME_CHARS = + Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); + /** * Metadata log index name prefix */ @@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { protected void createIndex(String indexName, String mapping, Option settings) { LOG.info("Creating Flint index " + indexName); - String osIndexName = toLowercase(indexName); + String osIndexName = sanitizeIndexName(indexName); try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(osIndexName); request.mapping(mapping, XContentType.JSON); @@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option sett @Override public boolean exists(String indexName) { LOG.info("Checking if Flint index exists " + indexName); - String osIndexName = toLowercase(indexName); + String osIndexName = sanitizeIndexName(indexName); try (RestHighLevelClient client = createClient()) { return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT); } catch (IOException e) { @@ -148,7 +156,7 @@ public boolean exists(String indexName) { @Override public List getAllIndexMetadata(String indexNamePattern) { LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern); - String osIndexNamePattern = toLowercase(indexNamePattern); + String osIndexNamePattern = sanitizeIndexName(indexNamePattern); try (RestHighLevelClient client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexNamePattern); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); @@ -166,7 +174,7 @@ public List getAllIndexMetadata(String indexNamePattern) { @Override public FlintMetadata getIndexMetadata(String indexName) { LOG.info("Fetching Flint index metadata for " + indexName); - String osIndexName = toLowercase(indexName); + String osIndexName = sanitizeIndexName(indexName); try (RestHighLevelClient client = createClient()) { GetIndexRequest request = new GetIndexRequest(osIndexName); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); @@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) { @Override public void deleteIndex(String indexName) { LOG.info("Deleting Flint index " + indexName); - String osIndexName = toLowercase(indexName); + String osIndexName = sanitizeIndexName(indexName); try (RestHighLevelClient client = createClient()) { DeleteIndexRequest request = new DeleteIndexRequest(osIndexName); @@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) { queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser); } return new OpenSearchScrollReader(createClient(), - toLowercase(indexName), + sanitizeIndexName(indexName), new SearchSourceBuilder().query(queryBuilder), options); } catch (IOException e) { @@ -221,7 +229,7 @@ public FlintReader createReader(String indexName, String query) { public FlintWriter createWriter(String indexName) { LOG.info("Creating Flint index writer for " + indexName); - return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy()); + return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy()); } @Override @@ -287,4 +295,32 @@ private String toLowercase(String indexName) { return indexName.toLowerCase(Locale.ROOT); } + + /* + * Percent-encode invalid OpenSearch index name characters. + */ + private String percentEncode(String indexName, Set charsToEncode) { + Objects.requireNonNull(indexName); + Objects.requireNonNull(charsToEncode); + + StringBuilder builder = new StringBuilder(indexName.length()); + for (char ch : indexName.toCharArray()) { + if (charsToEncode.contains(ch)) { + builder.append("%").append(String.format("%02X", (int) ch)); + } else { + builder.append(ch); + } + } + return builder.toString(); + } + + /* + * Sanitize index name to comply with OpenSearch index name restrictions. + */ + private String sanitizeIndexName(String indexName) { + Objects.requireNonNull(indexName); + + String encoded = percentEncode(indexName, INVALID_INDEX_NAME_CHARS); + return toLowercase(encoded); + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index f12892398..af1e9fa74 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -79,30 +79,6 @@ object FlintSparkIndex { */ val ID_COLUMN: String = "__id__" - /** - * invalid index name characters. - */ - val INVALID_INDEX_NAME_CHARS: Set[Char] = - Set(' ', ',', ':', '"', '*', '+', '/', '\\', '|', '?', '#', '>', '<') - - /** - * Percent-encode invalid OpenSearch index name characters. - * - * @param indexName - * raw Flint index name - * @return - * percent-encoded index name - */ - def percentEncode(indexName: String): String = { - indexName - .flatMap(ch => - if (INVALID_INDEX_NAME_CHARS.contains(ch)) { - s"%${ch.toInt.toHexString}" - } else { - ch.toString - }) - } - /** * Common prefix of Flint index name which is "flint_database_table_" * @@ -116,9 +92,7 @@ object FlintSparkIndex { // Keep all parts since the third as it is val parts = fullTableName.split('.') - val raw = s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" - - percentEncode(raw) + s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" } /** diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 9d460ed1f..f52e6ef85 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -23,12 +23,6 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index" } - test("get encoded covering index name on table name with special characters") { - val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" - val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string")) - index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_ci_index" - } - test("should fail if get index name without full table name") { val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c82038588..b7746d44a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -47,12 +47,6 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" } - test("get encoded mv name with special characters") { - val testMvNameSpecial = "spark_catalog.default.mv ,:\"*+/\\|?#><" - val mv = FlintSparkMaterializedView(testMvNameSpecial, testQuery, Map.empty) - mv.name() shouldBe "flint_spark_catalog_default_mv%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c" - } - test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy FlintSparkMaterializedView("mv", testQuery, Map.empty).name() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 2c7a2a91e..9760e8cd2 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -36,13 +36,6 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index" } - test("get encoded skipping index name on table name with special characters") { - val testTableSpecial = "spark_catalog.default.test ,:\"*+/\\|?#><" - val index = - new FlintSparkSkippingIndex(testTableSpecial, Seq(mock[FlintSparkSkippingStrategy])) - index.name() shouldBe "flint_spark_catalog_default_test%20%2c%3a%22%2a%2b%2f%5c%7c%3f%23%3e%3c_skipping_index" - } - test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 7da67051d..85be9bbb8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -114,6 +114,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.exists(indexName) shouldBe false } + it should "percent-encode invalid index name characters" in { + val indexName = "test ,:\"+/\\|?#><" + flintClient.createIndex( + indexName, + FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName) should not be null + flintClient.getAllIndexMetadata("test *") should not be empty + + // Read write test + val writer = flintClient.createWriter(indexName) + writer.write("""{"create":{}}""") + writer.write("\n") + writer.write("""{"test":1}""") + writer.write("\n") + writer.flush() + writer.close() + flintClient.createReader(indexName, "").hasNext shouldBe true + + flintClient.deleteIndex(indexName) + flintClient.exists(indexName) shouldBe false + } + it should "return false if index not exist" in { flintClient.exists("non-exist-index") shouldBe false } From 4b7cd5e97c68a00bf035eb485649be4a7509bf45 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 9 Jan 2024 11:55:06 -0800 Subject: [PATCH 4/5] minor arg fix and string format fix Signed-off-by: Sean Kao --- .../flint/core/storage/FlintOpenSearchClient.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index fd6530af3..9f3cf824a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -299,14 +299,13 @@ private String toLowercase(String indexName) { /* * Percent-encode invalid OpenSearch index name characters. */ - private String percentEncode(String indexName, Set charsToEncode) { + private String percentEncode(String indexName) { Objects.requireNonNull(indexName); - Objects.requireNonNull(charsToEncode); StringBuilder builder = new StringBuilder(indexName.length()); for (char ch : indexName.toCharArray()) { - if (charsToEncode.contains(ch)) { - builder.append("%").append(String.format("%02X", (int) ch)); + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + builder.append(String.format("%%%02X", (int) ch)); } else { builder.append(ch); } @@ -320,7 +319,7 @@ private String percentEncode(String indexName, Set charsToEncode) { private String sanitizeIndexName(String indexName) { Objects.requireNonNull(indexName); - String encoded = percentEncode(indexName, INVALID_INDEX_NAME_CHARS); + String encoded = percentEncode(indexName); return toLowercase(encoded); } } From d8e174861890cc4654a811dba5ba150c0e6c0c4f Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 9 Jan 2024 13:12:56 -0800 Subject: [PATCH 5/5] fix import order Signed-off-by: Sean Kao --- .../opensearch/flint/core/storage/FlintOpenSearchClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 9f3cf824a..4e549df2b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -17,9 +17,9 @@ import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; -import java.util.Set; import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope;