From 3d9ca035a19afa0fd1f1f6c185bd6802341f62df Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 3 Nov 2023 15:48:55 -0700 Subject: [PATCH 1/2] Preserve dots in Flint index name Signed-off-by: Chen Dai --- .../org/opensearch/flint/spark/FlintSparkIndex.scala | 9 +++++++-- .../flint/spark/covering/FlintSparkCoveringIndex.scala | 2 +- .../flint/spark/mv/FlintSparkMaterializedView.scala | 4 ++-- .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 2 +- .../sql/mv/FlintSparkMaterializedViewAstBuilder.scala | 2 +- .../spark/covering/FlintSparkCoveringIndexSuite.scala | 6 ++++++ .../spark/mv/FlintSparkMaterializedViewSuite.scala | 10 ++++++++-- .../spark/skipping/FlintSparkSkippingIndexSuite.scala | 6 ++++++ 8 files changed, 32 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index fe5329739..89a968772 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -87,8 +87,13 @@ object FlintSparkIndex { * @return * Flint index name */ - def flintIndexNamePrefix(fullTableName: String): String = - s"flint_${fullTableName.replace(".", "_")}_" + def flintIndexNamePrefix(fullTableName: String): String = { + require(fullTableName.split('.').length >= 3, "Table name is not qualified") + + // Keep all parts since the third as it is + val parts = fullTableName.split('.') + s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}" + } /** * Populate environment variables to persist in Flint metadata. 
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 91272309f..cdb3a3462 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -97,7 +97,7 @@ object FlintSparkCoveringIndex { tableName.split("\\.").length >= 3, "Qualified table name catalog.database.table is required") - flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX + flintIndexNamePrefix(tableName) + "_" + indexName + COVERING_INDEX_SUFFIX } /** Builder class for covering index build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 019cc7aa5..44a20e487 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -12,7 +12,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala` import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{generateSchemaJSON, metadataBuilder, StreamingRefresh} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE} 
@@ -158,7 +158,7 @@ object FlintSparkMaterializedView { mvName.split("\\.").length >= 3, "Qualified materialized view name catalog.database.mv is required") - s"flint_${mvName.replace(".", "_")}" + flintIndexNamePrefix(mvName) } /** Builder class for MV build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index eb2075b63..d83af5df5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -112,7 +112,7 @@ object FlintSparkSkippingIndex { tableName.split("\\.").length >= 3, "Qualified table name catalog.database.table is required") - flintIndexNamePrefix(tableName) + SKIPPING_INDEX_SUFFIX + flintIndexNamePrefix(tableName) + "_" + SKIPPING_INDEX_SUFFIX } /** Builder class for skipping index build */ diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 266a10c9f..5e04eca63 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -66,7 +66,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito FlintSparkSqlCommand(outputSchema) { flint => val catalogDbName = ctx.catalogDb.getText - val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*" + val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "_*" flint .describeIndexes(indexNamePattern) .collect { case 
mv: FlintSparkMaterializedView => diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala index 8c144b46b..f52e6ef85 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndexSuite.scala @@ -17,6 +17,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test_ci_index" } + test("get covering index name on table and index name with dots") { + val testTableDots = "spark_catalog.default.test.2023.10" + val index = new FlintSparkCoveringIndex("ci.01", testTableDots, Map("name" -> "string")) + index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index" + } + test("should fail if get index name without full table name") { val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string")) assertThrows[IllegalArgumentException] { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index cb32e74d3..b7746d44a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -16,7 +16,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, 
StringToAttributeConversionHelper} +import org.apache.spark.sql.catalyst.dsl.expressions.{intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper} import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} @@ -36,11 +36,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { val testMvName = "spark_catalog.default.mv" val testQuery = "SELECT 1" - test("get name") { + test("get mv name") { val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) mv.name() shouldBe "flint_spark_catalog_default_mv" } + test("get mv name with dots") { + val testMvNameDots = "spark_catalog.default.mv.2023.10" + val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Map.empty) + mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10" + } + test("should fail if get name with unqualified MV name") { the[IllegalArgumentException] thrownBy FlintSparkMaterializedView("mv", testQuery, Map.empty).name() diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index d52c43842..491b7811a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -30,6 +30,12 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { index.name() shouldBe "flint_spark_catalog_default_test_skipping_index" } + test("get skipping index name on table name with dots") { + val testTableDots = "spark_catalog.default.test.2023.10" + val index = new FlintSparkSkippingIndex(testTableDots, Seq(mock[FlintSparkSkippingStrategy])) + index.name() shouldBe 
"flint_spark_catalog_default_test.2023.10_skipping_index" + } + test("get index metadata") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.kind).thenReturn(SkippingKind.PARTITION) From 0e2f9fb875b396b34d7a0a4f44c4e94b4f40e679 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 3 Nov 2023 17:51:05 -0700 Subject: [PATCH 2/2] Fix MV name in show mv statement Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSparkIndex.scala | 2 +- .../mv/FlintSparkMaterializedViewAstBuilder.scala | 14 ++++++++++---- .../FlintSparkMaterializedViewSqlITSuite.scala | 4 ++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 89a968772..af1e9fa74 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -88,7 +88,7 @@ object FlintSparkIndex { * Flint index name */ def flintIndexNamePrefix(fullTableName: String): String = { - require(fullTableName.split('.').length >= 3, "Table name is not qualified") + require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified") // Keep all parts since the third as it is val parts = fullTableName.split('.') diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 5e04eca63..1a990b5b0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -5,8 +5,10 @@ package 
org.opensearch.flint.spark.sql.mv +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + import org.antlr.v4.runtime.tree.RuleNode -import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} +import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} @@ -65,12 +67,16 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito AttributeReference("materialized_view_name", StringType, nullable = false)()) FlintSparkSqlCommand(outputSchema) { flint => - val catalogDbName = ctx.catalogDb.getText - val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "_*" + val catalogDbName = + ctx.catalogDb.parts + .map(part => part.getText) + .mkString("_") + val indexNamePattern = s"flint_${catalogDbName}_*" flint .describeIndexes(indexNamePattern) .collect { case mv: FlintSparkMaterializedView => - Row(mv.mvName) + // The MV name is always fully qualified by the time its metadata is created, so drop the catalog and database parts + Row(mv.mvName.split('.').drop(2).mkString(".")) } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 872939e5f..f9bd3967a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -184,13 +184,13 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() checkAnswer( sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), - Seq(Row("spark_catalog.default.mv1"))) + Seq(Row("mv1"))) // Show in catalog.database 
flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() checkAnswer( sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), - Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2"))) + Seq(Row("mv1"), Row("mv2"))) checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) }