From 3528b6660f7f1ddb2fbb9d0af8a0748e545424e3 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Tue, 9 Jan 2024 14:45:16 -0800
Subject: [PATCH 1/3] Percent-encode invalid flint index characters (#215)

* percent-encode invalid flint index characters

Signed-off-by: Sean Kao

* formatting

Signed-off-by: Sean Kao

* move encoding logic to FlintOpenSearchClient

Signed-off-by: Sean Kao

* minor arg fix and string format fix

Signed-off-by: Sean Kao

* fix import order

Signed-off-by: Sean Kao

---------

Signed-off-by: Sean Kao
---
 .../core/storage/FlintOpenSearchClient.java   | 49 ++++++++++++++++---
 .../core/FlintOpenSearchClientSuite.scala     | 24 +++++++++
 2 files changed, 66 insertions(+), 7 deletions(-)

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 22badfbf9..4e549df2b 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -17,6 +17,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient {
       new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
           new ArrayList<>()).getNamedXContents());
 
+  /**
+   * Invalid index name characters to percent-encode,
+   * excluding '*' because it's reserved for pattern matching.
+   */
+  private final static Set<Character> INVALID_INDEX_NAME_CHARS =
+      Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
+
   /**
    * Metadata log index name prefix
    */
@@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
 
   protected void createIndex(String indexName, String mapping, Option<String> settings) {
     LOG.info("Creating Flint index " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       CreateIndexRequest request = new CreateIndexRequest(osIndexName);
       request.mapping(mapping, XContentType.JSON);
@@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
   @Override
   public boolean exists(String indexName) {
     LOG.info("Checking if Flint index exists " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
     } catch (IOException e) {
@@ -148,7 +156,7 @@ public boolean exists(String indexName) {
   @Override
   public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
     LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
-    String osIndexNamePattern = toLowercase(indexNamePattern);
+    String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
     try (RestHighLevelClient client = createClient()) {
       GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
       GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -166,7 +174,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
   @Override
   public FlintMetadata getIndexMetadata(String indexName) {
     LOG.info("Fetching Flint index metadata for " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       GetIndexRequest request = new GetIndexRequest(osIndexName);
       GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
 
   @Override
   public void deleteIndex(String indexName) {
     LOG.info("Deleting Flint index " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);
@@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) {
         queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
       }
       return new OpenSearchScrollReader(createClient(),
-          toLowercase(indexName),
+          sanitizeIndexName(indexName),
           new SearchSourceBuilder().query(queryBuilder),
           options);
     } catch (IOException e) {
@@ -221,7 +229,7 @@ public FlintReader createReader(String indexName, String query) {
 
   public FlintWriter createWriter(String indexName) {
     LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
   }
 
   @Override
@@ -287,4 +295,31 @@ private String toLowercase(String indexName) {
     return indexName.toLowerCase(Locale.ROOT);
   }
+
+  /*
+   * Percent-encode invalid OpenSearch index name characters.
+   */
+  private String percentEncode(String indexName) {
+    Objects.requireNonNull(indexName);
+
+    StringBuilder builder = new StringBuilder(indexName.length());
+    for (char ch : indexName.toCharArray()) {
+      if (INVALID_INDEX_NAME_CHARS.contains(ch)) {
+        builder.append(String.format("%%%02X", (int) ch));
+      } else {
+        builder.append(ch);
+      }
+    }
+    return builder.toString();
+  }
+
+  /*
+   * Sanitize index name to comply with OpenSearch index name restrictions.
+   */
+  private String sanitizeIndexName(String indexName) {
+    Objects.requireNonNull(indexName);
+
+    String encoded = percentEncode(indexName);
+    return toLowercase(encoded);
+  }
 
 }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index 7da67051d..85be9bbb8 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -114,6 +114,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
     flintClient.exists(indexName) shouldBe false
   }
 
+  it should "percent-encode invalid index name characters" in {
+    val indexName = "test ,:\"+/\\|?#><"
+    flintClient.createIndex(
+      indexName,
+      FlintMetadata("""{"properties": {"test": { "type": "integer" } } }"""))
+
+    flintClient.exists(indexName) shouldBe true
+    flintClient.getIndexMetadata(indexName) should not be null
+    flintClient.getAllIndexMetadata("test *") should not be empty
+
+    // Read write test
+    val writer = flintClient.createWriter(indexName)
+    writer.write("""{"create":{}}""")
+    writer.write("\n")
+    writer.write("""{"test":1}""")
+    writer.write("\n")
+    writer.flush()
+    writer.close()
+    flintClient.createReader(indexName, "").hasNext shouldBe true
+
+    flintClient.deleteIndex(indexName)
+    flintClient.exists(indexName) shouldBe false
+  }
+
   it should "return false if index not exist" in {
     flintClient.exists("non-exist-index") shouldBe false
   }
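
The following standalone Scala sketch restates what patch 1 does end to end:
percent-encode every reserved character as %XX, then lowercase the whole name
(the hex digits therefore come out lowercase as well). The object and method
names are illustrative only; the shipped implementation is the Java code in
FlintOpenSearchClient above.

import java.util.Locale

// Illustrative re-implementation of patch 1's index name sanitization.
object IndexNameSanitizerSketch {

  // Characters OpenSearch rejects in index names; '*' is deliberately
  // excluded because it is reserved for index pattern matching.
  private val InvalidIndexNameChars =
    Set(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<')

  // Encode each invalid character as %XX (uppercase hex), mirroring
  // String.format("%%%02X", (int) ch) in FlintOpenSearchClient.
  def percentEncode(indexName: String): String =
    indexName.map { ch =>
      if (InvalidIndexNameChars.contains(ch)) f"%%${ch.toInt}%02X" else ch.toString
    }.mkString

  // Encode first, then lowercase, matching sanitizeIndexName above:
  // "Test Index" becomes "test%20index".
  def sanitizeIndexName(indexName: String): String =
    percentEncode(indexName).toLowerCase(Locale.ROOT)

  def main(args: Array[String]): Unit = {
    // Same input as the new integration test; prints
    // test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c
    println(sanitizeIndexName("""test ,:"+/\|?#><"""))
  }
}
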
From 9e2475f526a7c3d723ef65608a9efb51d5a1068a Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 13:05:39 -0800
Subject: [PATCH 2/3] GHA fix for backport and snapshot-publish (#222)

* add 0.* target branch for gha snapshot-publish

Signed-off-by: Sean Kao

* fix gha backport

Signed-off-by: Sean Kao

---------

Signed-off-by: Sean Kao
---
 .github/workflows/backport.yml         | 1 -
 .github/workflows/snapshot-publish.yml | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml
index afda23054..fdea67753 100644
--- a/.github/workflows/backport.yml
+++ b/.github/workflows/backport.yml
@@ -26,4 +26,3 @@ jobs:
       with:
         github_token: ${{ steps.github_app_token.outputs.token }}
         head_template: backport/backport-<%= number %>-to-<%= base %>
-        files_to_skip: 'CHANGELOG.md'
diff --git a/.github/workflows/snapshot-publish.yml b/.github/workflows/snapshot-publish.yml
index 6025a3b02..0ac112b62 100644
--- a/.github/workflows/snapshot-publish.yml
+++ b/.github/workflows/snapshot-publish.yml
@@ -5,6 +5,7 @@ on:
   push:
     branches:
       - main
+      - 0.*
 
 jobs:
   build-and-publish-snapshots:
From 6e163e77a2d8598ad105835f3ce20677d84fea84 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 12 Jan 2024 14:49:38 -0800
Subject: [PATCH 3/3] Quote table name with backticks to escape special
 characters in spark.read.table() (#224)

* quote table name for special character in df read

Signed-off-by: Sean Kao

* add UT

Signed-off-by: Sean Kao

* update comments; scalafmtAll

Signed-off-by: Sean Kao

* add a missed table name to quote

Signed-off-by: Sean Kao

---------

Signed-off-by: Sean Kao
---
 .../opensearch/flint/spark/FlintSpark.scala   |  4 ++--
 .../flint/spark/FlintSparkIndex.scala         | 15 +++++++++++++++
 .../covering/FlintSparkCoveringIndex.scala    |  4 ++--
 .../skipping/FlintSparkSkippingIndex.scala    |  2 +-
 .../FlintSparkCoveringIndexSuite.scala        | 19 +++++++++++++++++++
 .../FlintSparkSkippingIndexSuite.scala        | 13 +++++++++++++
 6 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 122fea601..25b554581 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
-import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
+import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -372,7 +372,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     logInfo("Start refreshing index in foreach streaming style")
     val job = spark.readStream
       .options(options.extraSourceOptions(tableName))
-      .table(tableName)
+      .table(quotedTableName(tableName))
       .writeStream
       .queryName(indexName)
       .addSinkOptions(options)
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index af1e9fa74..248d105a2 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -95,6 +95,21 @@ object FlintSparkIndex {
     s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
   }
 
+  /**
+   * Add backticks to table name to escape special characters
+   *
+   * @param fullTableName
+   *   source full table name
+   * @return
+   *   quoted table name
+   */
+  def quotedTableName(fullTableName: String): String = {
+    require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
+
+    val parts = fullTableName.split('.')
+    s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
+  }
+
   /**
    * Populate environment variables to persist in Flint metadata.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index cdb3a3462..e23126c68 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark._
-import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}
 
@@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex(
 
   override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
     val colNames = indexedColumns.keys.toSeq
-    val job = df.getOrElse(spark.read.table(tableName))
+    val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))
 
     // Add optional filtering condition
     filterCondition
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 120ca8219..2e8a3c82d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -77,7 +77,7 @@ case class FlintSparkSkippingIndex(
       new Column(aggFunc.as(name))
     }
 
-    df.getOrElse(spark.read.table(tableName))
+    df.getOrElse(spark.read.table(quotedTableName(tableName)))
       .groupBy(input_file_name().as(FILE_PATH_COLUMN))
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
       .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
index f52e6ef85..1cce47d1a 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala
@@ -5,6 +5,7 @@
 
 package org.opensearch.flint.spark.covering
 
+import org.scalatest.matchers.must.Matchers.contain
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
@@ -30,6 +31,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
     }
   }
 
+  test("can build index building job with unique ID column") {
+    val index =
+      new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only ("name")
+  }
+
+  test("can build index on table name with special characters") {
+    val testTableSpecial = "spark_catalog.default.test/2023/10"
+    val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string"))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only ("name")
+  }
+
   test("should fail if no indexed column given") {
     assertThrows[IllegalArgumentException] {
       new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
index 9760e8cd2..247a055bf 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala
@@ -67,6 +67,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
   }
 
+  test("can build index on table name with special characters") {
+    val testTableSpecial = "spark_catalog.default.test/2023/10"
+    val indexCol = mock[FlintSparkSkippingStrategy]
+    when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
+    when(indexCol.getAggregators).thenReturn(
+      Seq(CollectSet(col("name").expr).toAggregateExpression()))
+    val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol))
+
+    val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
+    val indexDf = index.build(spark, Some(df))
+    indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
+  }
+
   // Test index build for different column type
   Seq(
     (
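
To round out patch 3, here is a minimal Scala sketch of the quoting rule,
again with an illustrative object name (the shipped helper is
FlintSparkIndex.quotedTableName above): the catalog and database parts of a
fully qualified table name stay as-is, and everything after them is wrapped
in a single pair of backticks so that characters such as '/' survive
spark.read.table().

// Illustrative re-implementation of patch 3's table name quoting.
object TableNameQuotingSketch {

  // Keep catalog and database unquoted; backtick-quote the remainder.
  def quotedTableName(fullTableName: String): String = {
    val parts = fullTableName.split('.')
    require(parts.length >= 3, s"Table name $fullTableName is not qualified")
    s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
  }

  def main(args: Array[String]): Unit = {
    // Prints spark_catalog.default.`test/2023/10`
    println(quotedTableName("spark_catalog.default.test/2023/10"))
    // Dots in the table part are re-joined inside one pair of backticks;
    // prints spark_catalog.default.`test.2023.10`
    println(quotedTableName("spark_catalog.default.test.2023.10"))
  }
}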