From d9b9ddaf6f2a664518333c6a55c0f21c3e51af70 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 5 Jan 2024 13:22:06 -0800 Subject: [PATCH 1/4] bug fix, support array datatype in MV (#211) * bug fix, support array datatype in MV Signed-off-by: Peng Huo * fix format Signed-off-by: Peng Huo * Fix IT, change columnType long to bigint Signed-off-by: Peng Huo --------- Signed-off-by: Peng Huo --- .../sql/flint/datatype/FlintDataType.scala | 30 ++++++--- .../spark/mv/FlintSparkMaterializedView.scala | 2 +- .../flint/datatype/FlintDataTypeSuite.scala | 38 +++++++++++ .../FlintSparkMaterializedViewITSuite.scala | 6 +- ...FlintSparkMaterializedViewSqlITSuite.scala | 48 ++++++++++++-- .../flint/spark/FlintSparkSuite.scala | 63 +++++++++++++++++++ 6 files changed, 170 insertions(+), 17 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 414d8b61d..95a2666bd 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -5,8 +5,6 @@ package org.apache.spark.sql.flint.datatype -import java.time.format.DateTimeFormatterBuilder - import org.json4s.{Formats, JField, JValue, NoTypeHints} import org.json4s.JsonAST.{JNothing, JObject, JString} import org.json4s.jackson.JsonMethods @@ -78,8 +76,11 @@ object FlintDataType { // object types case JString("object") | JNothing => deserializeJValue(fieldProperties) + // binary types + case JString("binary") => BinaryType + // not supported - case _ => throw new IllegalStateException(s"unsupported data type") + case unknown => throw new IllegalStateException(s"unsupported data type: $unknown") } DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build()) } @@ -112,13 +113,16 @@ object FlintDataType { JsonMethods.compact(JsonMethods.render(jValue)) } - def serializeJValue(structType: StructType): JValue = { - JObject("properties" -> JObject(structType.fields.map(field => serializeField(field)).toList)) + private def serializeJValue(structType: StructType): JValue = { + JObject( + "properties" -> JObject( + structType.fields + .map(field => JField(field.name, serializeField(field.dataType, field.metadata))) + .toList)) } - def serializeField(structField: StructField): JField = { - val metadata = structField.metadata - val dataType = structField.dataType match { + def serializeField(dataType: DataType, metadata: Metadata): JValue = { + dataType match { // boolean case BooleanType => JObject("type" -> JString("boolean")) @@ -147,8 +151,14 @@ object FlintDataType { // objects case st: StructType => serializeJValue(st) - case _ => throw new IllegalStateException(s"unsupported data type") + + // array + case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty) + + // binary + case BinaryType => JObject("type" -> JString("binary")) + + case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}") } - JField(structField.name, dataType) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 44a20e487..656cc387d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala 
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -198,7 +198,7 @@ object FlintSparkMaterializedView { .sql(query) .schema .map { field => - field.name -> field.dataType.typeName + field.name -> field.dataType.simpleString } .toMap FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index eb3c2a371..e2bde6b98 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -42,6 +42,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { | }, | "textField": { | "type": "text" + | }, + | "binaryField": { + | "type": "binary" | } | } |}""".stripMargin @@ -59,6 +62,7 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { StringType, true, new MetadataBuilder().putString("osType", "text").build()) :: + StructField("binaryField", BinaryType, true) :: Nil) FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) @@ -192,6 +196,40 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) } + test("spark array type map to should map to array element type in OpenSearch") { + val flintDataType = """{ + | "properties": { + | "varcharField": { + | "type": "keyword" + | }, + | "charField": { + | "type": "keyword" + | } + | } + |}""".stripMargin + val sparkStructType = + StructType( + StructField("arrayIntField", ArrayType(IntegerType), true) :: + StructField( + "arrayObjectField", + StructType(StructField("booleanField", BooleanType, true) :: Nil), + true) :: Nil) + FlintDataType.serialize(sparkStructType) shouldBe compactJson(s"""{ + | "properties": { + | "arrayIntField": { + | "type": "integer" + | }, + | "arrayObjectField": { + | "properties": { + | "booleanField":{ + | "type": "boolean" + | } + | } + | } + | } + |}""".stripMargin) + } + def compactJson(json: String): String = { val data: JValue = JsonMethods.parse(json) JsonMethods.compact(JsonMethods.render(data)) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 1b16a9e16..6bc85a241 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -70,7 +70,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "columnType": "timestamp" | },{ | "columnName": "count", - | "columnType": "long" + | "columnType": "bigint" | }], | "options": { | "auto_refresh": "true", @@ -197,7 +197,9 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath, - "watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon + "watermark_delay" -> "1 Minute" + ) + ) // This must be small to ensure window closed soon flint .materializedView() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala 
b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index f9bd3967a..79e49c2fd 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -157,6 +157,48 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
   }
 
+  test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") {
+    val tableName = "spark_catalog.default.issue112"
+    createTableIssue112(tableName)
+    sql(s"""
+           |CREATE MATERIALIZED VIEW $testMvName AS
+           | SELECT
+           | rs.resource.attributes.key as resource_key,
+           | rs.resource.attributes.value.stringValue as resource_value,
+           | ss.scope.name as scope_name,
+           | ss.scope.version as scope_version,
+           | span.attributes.key as span_key,
+           | span.attributes.value.intValue as span_int_value,
+           | span.attributes.value.stringValue as span_string_value,
+           | span.endTimeUnixNano,
+           | span.kind,
+           | span.name as span_name,
+           | span.parentSpanId,
+           | span.spanId,
+           | span.startTimeUnixNano,
+           | span.traceId
+           | FROM $tableName
+           | LATERAL VIEW
+           | EXPLODE(resourceSpans) as rs
+           | LATERAL VIEW
+           | EXPLODE(rs.scopeSpans) as ss
+           | LATERAL VIEW
+           | EXPLODE(ss.spans) as span
+           | LATERAL VIEW
+           | EXPLODE(span.attributes) as span_attr
+           |WITH (
+           |  auto_refresh = false
+           |)
+      """.stripMargin)
+
+    val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex)
+    flint.describeIndex(testFlintIndex) shouldBe defined
+    indexData.count() shouldBe 0
+
+    sql(s"REFRESH MATERIALIZED VIEW $testMvName")
+    indexData.count() shouldBe 2
+  }
+
   test("create materialized view with quoted name and column name") {
     val testQuotedQuery =
       """ SELECT
@@ -182,9 +224,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
   test("show all materialized views in catalog and database") {
     // Show in catalog
     flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
-    checkAnswer(
-      sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
-      Seq(Row("mv1")))
+    checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("mv1")))
 
     // Show in catalog.database
     flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
@@ -208,7 +248,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
 
     checkAnswer(
       sql(s"DESC MATERIALIZED VIEW $testMvName"),
-      Seq(Row("startTime", "timestamp"), Row("count", "long")))
+      Seq(Row("startTime", "timestamp"), Row("count", "bigint")))
   }
 
   test("should return empty when describe nonexistent materialized view") {
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 211ddb57b..a5596bfe9 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -147,4 +147,67 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland')")
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
+
+  protected def createTableIssue112(testTable: String): Unit = {
+    sql(s"""
+           | CREATE TABLE $testTable (
+           |   resourceSpans ARRAY<STRUCT<
+           |     resource: STRUCT<
+           |       attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<stringValue: STRING>>>>,
+           |   scopeSpans: ARRAY<STRUCT<
+           |     scope: STRUCT<name: STRING, version: STRING>,
+           |     spans: ARRAY<STRUCT<
+           |       attributes: ARRAY<STRUCT<key: STRING, value: STRUCT<intValue: STRING, stringValue: STRING>>>,
+           |       endTimeUnixNano: STRING,
+           |       kind: BIGINT,
+           |       name: STRING,
+           |       parentSpanId: STRING,
+           |       spanId: STRING,
+           |       startTimeUnixNano: STRING,
+           |       traceId: STRING>>>>>>)
+           | USING json
+           |""".stripMargin)
+    sql(s"""INSERT INTO $testTable
+           |VALUES (
+           |  array(
+           |    named_struct(
+           |      'resource',
+           |      named_struct(
+           |        'attributes',
+           |        array(
+           |          named_struct('key','telemetry.sdk.version', 'value', named_struct('stringValue', '1.3.0')),
+           |          named_struct('key','telemetry.sdk.name', 'value', named_struct('stringValue', 'opentelemetry'))
+           |        )
+           |      ),
+           |      'scopeSpans',
+           |      array(
+           |        named_struct(
+           |          'scope',
+           |          named_struct('name', 'opentelemetry_ecto', 'version', '1.1.1'),
+           |          'spans',
+           |          array(
+           |            named_struct(
+           |              'attributes',
+           |              array(
+           |                named_struct('key', 'total_time_microseconds',
+           |                'value', named_struct('stringValue', null, 'intValue',
+           |                '31286')),
+           |                named_struct('key', 'source', 'value', named_struct
+           |                ('stringValue', 'featureflags', 'intValue', null))
+           |              ),
+           |              'endTimeUnixNano', '1698098982202276205',
+           |              'kind', 3,
+           |              'name', 'featureflagservice.repo.query:featureflags',
+           |              'parentSpanId', '9b355ca40dd98f5e',
+           |              'spanId', '87acd6659b425f80',
+           |              'startTimeUnixNano', '1698098982170068232',
+           |              'traceId', 'bc342fb3fbfa54c2188595b89b0b1cd8'
+           |            )
+           |          )
+           |        )
+           |      )
+           |    )
+           |  )
+           |)""".stripMargin)
+  }
 }

From 3528b6660f7f1ddb2fbb9d0af8a0748e545424e3 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Tue, 9 Jan 2024 14:45:16 -0800
Subject: [PATCH 2/4] Percent-encode invalid flint index characters (#215)

* percent-encode invalid flint index characters

Signed-off-by: Sean Kao

* formatting

Signed-off-by: Sean Kao

* move encoding logic to FlintOpenSearchClient

Signed-off-by: Sean Kao

* minor arg fix and string format fix

Signed-off-by: Sean Kao

* fix import order

Signed-off-by: Sean Kao

---------

Signed-off-by: Sean Kao
---
 .../core/storage/FlintOpenSearchClient.java | 49 ++++++++++++++++---
 .../core/FlintOpenSearchClientSuite.scala   | 24 +++++++++
 2 files changed, 66 insertions(+), 7 deletions(-)

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 22badfbf9..4e549df2b 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -17,6 +17,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient {
       new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
           new ArrayList<>()).getNamedXContents());
 
+  /**
+   * Invalid index name characters to percent-encode,
+   * excluding '*' because it's reserved for pattern matching.
+   */
+  private final static Set<Character> INVALID_INDEX_NAME_CHARS =
+      Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
+
   /**
    * Metadata log index name prefix
    */
@@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
 
   protected void createIndex(String indexName, String mapping, Option<String> settings) {
     LOG.info("Creating Flint index " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       CreateIndexRequest request = new CreateIndexRequest(osIndexName);
       request.mapping(mapping, XContentType.JSON);
@@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
   @Override
   public boolean exists(String indexName) {
     LOG.info("Checking if Flint index exists " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
     } catch (IOException e) {
@@ -148,7 +156,7 @@ public boolean exists(String indexName) {
   @Override
   public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
     LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
-    String osIndexNamePattern = toLowercase(indexNamePattern);
+    String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
     try (RestHighLevelClient client = createClient()) {
       GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
       GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -166,7 +174,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
   @Override
   public FlintMetadata getIndexMetadata(String indexName) {
     LOG.info("Fetching Flint index metadata for " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       GetIndexRequest request = new GetIndexRequest(osIndexName);
       GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
   @Override
   public void deleteIndex(String indexName) {
     LOG.info("Deleting Flint index " + indexName);
-    String osIndexName = toLowercase(indexName);
+    String osIndexName = sanitizeIndexName(indexName);
     try (RestHighLevelClient client = createClient()) {
       DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);
 
@@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) {
         queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
       }
       return new OpenSearchScrollReader(createClient(),
-          toLowercase(indexName),
+          sanitizeIndexName(indexName),
           new SearchSourceBuilder().query(queryBuilder),
           options);
     } catch (IOException e) {
@@ -221,7 +229,7 @@ public FlintReader createReader(String indexName, String query) {
 
   public FlintWriter createWriter(String indexName) {
     LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
   }
 
   @Override
@@ -287,4 +295,31 @@ private String toLowercase(String indexName) {
     return indexName.toLowerCase(Locale.ROOT);
   }
+
+  /*
+   * Percent-encode invalid OpenSearch index name characters.
+ */ + private String percentEncode(String indexName) { + Objects.requireNonNull(indexName); + + StringBuilder builder = new StringBuilder(indexName.length()); + for (char ch : indexName.toCharArray()) { + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + builder.append(String.format("%%%02X", (int) ch)); + } else { + builder.append(ch); + } + } + return builder.toString(); + } + + /* + * Sanitize index name to comply with OpenSearch index name restrictions. + */ + private String sanitizeIndexName(String indexName) { + Objects.requireNonNull(indexName); + + String encoded = percentEncode(indexName); + return toLowercase(encoded); + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 7da67051d..85be9bbb8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -114,6 +114,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.exists(indexName) shouldBe false } + it should "percent-encode invalid index name characters" in { + val indexName = "test ,:\"+/\\|?#><" + flintClient.createIndex( + indexName, + FlintMetadata("""{"properties": {"test": { "type": "integer" } } }""")) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName) should not be null + flintClient.getAllIndexMetadata("test *") should not be empty + + // Read write test + val writer = flintClient.createWriter(indexName) + writer.write("""{"create":{}}""") + writer.write("\n") + writer.write("""{"test":1}""") + writer.write("\n") + writer.flush() + writer.close() + flintClient.createReader(indexName, "").hasNext shouldBe true + + flintClient.deleteIndex(indexName) + flintClient.exists(indexName) shouldBe false + } + it should "return false if index not exist" in { flintClient.exists("non-exist-index") shouldBe false } From 9e2475f526a7c3d723ef65608a9efb51d5a1068a Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 12 Jan 2024 13:05:39 -0800 Subject: [PATCH 3/4] GHA fix for backport and snapshot-publish (#222) * add 0.* target branch for gha snapshot-publish Signed-off-by: Sean Kao * fix gha backport Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .github/workflows/backport.yml | 1 - .github/workflows/snapshot-publish.yml | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml index afda23054..fdea67753 100644 --- a/.github/workflows/backport.yml +++ b/.github/workflows/backport.yml @@ -26,4 +26,3 @@ jobs: with: github_token: ${{ steps.github_app_token.outputs.token }} head_template: backport/backport-<%= number %>-to-<%= base %> - files_to_skip: 'CHANGELOG.md' diff --git a/.github/workflows/snapshot-publish.yml b/.github/workflows/snapshot-publish.yml index 6025a3b02..0ac112b62 100644 --- a/.github/workflows/snapshot-publish.yml +++ b/.github/workflows/snapshot-publish.yml @@ -5,6 +5,7 @@ on: push: branches: - main + - 0.* jobs: build-and-publish-snapshots: From 6e163e77a2d8598ad105835f3ce20677d84fea84 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 12 Jan 2024 14:49:38 -0800 Subject: [PATCH 4/4] Quote table name with backticks to escape special characters in spark.read.table() (#224) * quote table name for special character in df read Signed-off-by: Sean Kao * add UT Signed-off-by: Sean Kao * 
update comments; scalafmtAll Signed-off-by: Sean Kao * add a missed table name to quote Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .../opensearch/flint/spark/FlintSpark.scala | 4 ++-- .../flint/spark/FlintSparkIndex.scala | 15 +++++++++++++++ .../covering/FlintSparkCoveringIndex.scala | 4 ++-- .../skipping/FlintSparkSkippingIndex.scala | 2 +- .../FlintSparkCoveringIndexSuite.scala | 19 +++++++++++++++++++ .../FlintSparkSkippingIndexSuite.scala | 13 +++++++++++++ 6 files changed, 52 insertions(+), 5 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 122fea601..25b554581 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode} -import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh} +import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex @@ -372,7 +372,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo("Start refreshing index in foreach streaming style") val job = spark.readStream .options(options.extraSourceOptions(tableName)) - .table(tableName) + .table(quotedTableName(tableName)) .writeStream .queryName(indexName) .addSinkOptions(options) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index af1e9fa74..248d105a2 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -95,6 +95,21 @@ object FlintSparkIndex { s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" } + /** + * Add backticks to table name to escape special character + * + * @param fullTableName + * source full table name + * @return + * quoted table name + */ + def quotedTableName(fullTableName: String): String = { + require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified") + + val parts = fullTableName.split('.') + s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`" + } + /** * Populate environment variables to persist in Flint metadata. 
* diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index cdb3a3462..e23126c68 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark._ -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} @@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex( override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = { val colNames = indexedColumns.keys.toSeq - val job = df.getOrElse(spark.read.table(tableName)) + val job = df.getOrElse(spark.read.table(quotedTableName(tableName))) // Add optional filtering condition filterCondition diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 120ca8219..2e8a3c82d 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -77,7 +77,7 @@ case class FlintSparkSkippingIndex( new Column(aggFunc.as(name)) } - df.getOrElse(spark.read.table(tableName)) + df.getOrElse(spark.read.table(quotedTableName(tableName))) .groupBy(input_file_name().as(FILE_PATH_COLUMN)) .agg(namedAggFuncs.head, namedAggFuncs.tail: _*) .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN))) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index f52e6ef85..1cce47d1a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -5,6 +5,7 @@ package org.opensearch.flint.spark.covering +import org.scalatest.matchers.must.Matchers.contain import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite @@ -30,6 +31,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { } } + test("can build index building job with unique ID column") { + val index = + new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string")) + + val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") + val indexDf = index.build(spark, Some(df)) + indexDf.schema.fieldNames should contain only ("name") + } + + test("can build index on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test/2023/10" + val index = new 
FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string")) + + val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") + val indexDf = index.build(spark, Some(df)) + indexDf.schema.fieldNames should contain only ("name") + } + test("should fail if no indexed column given") { assertThrows[IllegalArgumentException] { new FlintSparkCoveringIndex("ci", "default.test", Map.empty) diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index 9760e8cd2..247a055bf 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -67,6 +67,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN) } + test("can build index on table name with special characters") { + val testTableSpecial = "spark_catalog.default.test/2023/10" + val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.outputSchema()).thenReturn(Map("name" -> "string")) + when(indexCol.getAggregators).thenReturn( + Seq(CollectSet(col("name").expr).toAggregateExpression())) + val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol)) + + val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age") + val indexDf = index.build(spark, Some(df)) + indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN) + } + // Test index build for different column type Seq( (