Commit 19ad190
Fix Flint index name and MV name bug (#127)
* Preserve dots in Flint index name

Signed-off-by: Chen Dai <[email protected]>

* Fix MV name in show mv statement

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Nov 7, 2023
1 parent d1280cf commit 19ad190
Showing 9 changed files with 43 additions and 14 deletions.
@@ -87,8 +87,13 @@ object FlintSparkIndex {
    * @return
    *   Flint index name
    */
-  def flintIndexNamePrefix(fullTableName: String): String =
-    s"flint_${fullTableName.replace(".", "_")}_"
+  def flintIndexNamePrefix(fullTableName: String): String = {
+    require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
+
+    // Keep all parts from the third one onward as-is
+    val parts = fullTableName.split('.')
+    s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
+  }
 
   /**
    * Create Flint metadata builder with common fields.
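For illustration, a minimal self-contained sketch of the new naming rule (the wrapper object and main method are scaffolding for this example only): the first two dot-separated parts, catalog and database, are joined with underscores, while dots inside the remaining table-name parts are preserved.

object FlintIndexNameSketch {
  // Same rule as the committed flintIndexNamePrefix above.
  def flintIndexNamePrefix(fullTableName: String): String = {
    val parts = fullTableName.split('.')
    require(parts.length >= 3, s"Table name $fullTableName is not qualified")
    s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
  }

  def main(args: Array[String]): Unit = {
    // A plain qualified name: dots act as part boundaries only.
    println(flintIndexNamePrefix("spark_catalog.default.test"))
    // => flint_spark_catalog_default_test

    // A table name that itself contains dots: they survive intact.
    println(flintIndexNamePrefix("spark_catalog.default.test.2023.10"))
    // => flint_spark_catalog_default_test.2023.10
  }
}

Note that, unlike the old version, the prefix no longer ends with a trailing underscore; the call sites below now add the separator explicitly.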
@@ -97,7 +97,7 @@ object FlintSparkCoveringIndex {
       tableName.split("\\.").length >= 3,
       "Qualified table name catalog.database.table is required")
 
-    flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX
+    flintIndexNamePrefix(tableName) + "_" + indexName + COVERING_INDEX_SUFFIX
   }
 
   /** Builder class for covering index build */
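A quick sketch of the resulting covering index name, assuming COVERING_INDEX_SUFFIX is "_index" (inferred from the unit test expectation added later in this commit; the values below are illustrative):

// Assembling a covering index name under the new scheme.
val prefix    = "flint_spark_catalog_default_test.2023.10" // flintIndexNamePrefix(tableName)
val indexName = "ci.01"
val fullName  = prefix + "_" + indexName + "_index"
// fullName == "flint_spark_catalog_default_test.2023.10_ci.01_index"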
@@ -12,7 +12,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala`
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
-import org.opensearch.flint.spark.FlintSparkIndex.{generateSchemaJSON, metadataBuilder, StreamingRefresh}
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
@@ -158,7 +158,7 @@ object FlintSparkMaterializedView {
       mvName.split("\\.").length >= 3,
       "Qualified materialized view name catalog.database.mv is required")
 
-    s"flint_${mvName.replace(".", "_")}"
+    flintIndexNamePrefix(mvName)
   }
 
   /** Builder class for MV build */
@@ -112,7 +112,7 @@ object FlintSparkSkippingIndex {
       tableName.split("\\.").length >= 3,
       "Qualified table name catalog.database.table is required")
 
-    flintIndexNamePrefix(tableName) + SKIPPING_INDEX_SUFFIX
+    flintIndexNamePrefix(tableName) + "_" + SKIPPING_INDEX_SUFFIX
   }
 
   /** Builder class for skipping index build */
@@ -5,8 +5,10 @@
 
 package org.opensearch.flint.spark.sql.mv
 
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
 import org.antlr.v4.runtime.tree.RuleNode
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
+import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
@@ -65,12 +67,16 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor
         AttributeReference("materialized_view_name", StringType, nullable = false)())
 
     FlintSparkSqlCommand(outputSchema) { flint =>
-      val catalogDbName = ctx.catalogDb.getText
-      val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
+      val catalogDbName =
+        ctx.catalogDb.parts
+          .map(part => part.getText)
+          .mkString("_")
+      val indexNamePattern = s"flint_${catalogDbName}_*"
       flint
         .describeIndexes(indexNamePattern)
         .collect { case mv: FlintSparkMaterializedView =>
-          Row(mv.mvName)
+          // MV name is fully qualified when its metadata is created, so strip catalog and database
+          Row(mv.mvName.split('.').drop(2).mkString("."))
         }
     }
   }
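A hedged sketch of the lookup-and-display flow above, with literal values standing in for the parsed SQL context and the stored index metadata:

// 1. Join the parsed catalog/database identifier parts with underscores,
//    matching how flintIndexNamePrefix joins them in Flint index names.
val catalogDbParts = Seq("spark_catalog", "default") // from ctx.catalogDb.parts (illustrative)
val indexNamePattern = s"flint_${catalogDbParts.mkString("_")}_*"
// => flint_spark_catalog_default_*

// 2. Stored MV names are fully qualified, so drop catalog and database and
//    keep the rest (which may itself contain dots) for display.
val mvName = "spark_catalog.default.mv.2023.10"
val displayName = mvName.split('.').drop(2).mkString(".")
// => mv.2023.10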
@@ -17,6 +17,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_spark_catalog_default_test_ci_index"
   }
 
+  test("get covering index name on table and index name with dots") {
+    val testTableDots = "spark_catalog.default.test.2023.10"
+    val index = new FlintSparkCoveringIndex("ci.01", testTableDots, Map("name" -> "string"))
+    index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index"
+  }
+
   test("should fail if get index name without full table name") {
     val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string"))
     assertThrows[IllegalArgumentException] {
@@ -16,7 +16,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
+import org.apache.spark.sql.catalyst.dsl.expressions.{intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
@@ -36,11 +36,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   val testMvName = "spark_catalog.default.mv"
   val testQuery = "SELECT 1"
 
-  test("get name") {
+  test("get mv name") {
     val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
     mv.name() shouldBe "flint_spark_catalog_default_mv"
   }
 
+  test("get mv name with dots") {
+    val testMvNameDots = "spark_catalog.default.mv.2023.10"
+    val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Map.empty)
+    mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10"
+  }
+
   test("should fail if get name with unqualified MV name") {
     the[IllegalArgumentException] thrownBy
       FlintSparkMaterializedView("mv", testQuery, Map.empty).name()
@@ -30,6 +30,12 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_spark_catalog_default_test_skipping_index"
   }
 
+  test("get skipping index name on table name with dots") {
+    val testTableDots = "spark_catalog.default.test.2023.10"
+    val index = new FlintSparkSkippingIndex(testTableDots, Seq(mock[FlintSparkSkippingStrategy]))
+    index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index"
+  }
+
   test("get index metadata") {
     val indexCol = mock[FlintSparkSkippingStrategy]
     when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
@@ -184,13 +184,13 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
     checkAnswer(
       sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
-      Seq(Row("spark_catalog.default.mv1")))
+      Seq(Row("mv1")))
 
     // Show in catalog.database
     flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
     checkAnswer(
       sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
-      Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))
+      Seq(Row("mv1"), Row("mv2")))
 
     checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
   }
