Merge branch 'main' into add-where-clause-for-skipping-index
dai-chen committed Nov 9, 2023
2 parents c65671e + d5e6738 commit 7733b61
Showing 9 changed files with 65 additions and 14 deletions.
FlintSparkIndex.scala

@@ -87,8 +87,29 @@ object FlintSparkIndex {
    * @return
    *   Flint index name
    */
-  def flintIndexNamePrefix(fullTableName: String): String =
-    s"flint_${fullTableName.replace(".", "_")}_"
+  def flintIndexNamePrefix(fullTableName: String): String = {
+    require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
+
+    // Keep all parts from the third onward as-is
+    val parts = fullTableName.split('.')
+    s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
+  }
+
+  /**
+   * Populate environment variables to persist in Flint metadata.
+   *
+   * @return
+   *   env key value mapping to populate
+   */
+  def populateEnvToMetadata: Map[String, String] = {
+    // TODO: avoid hardcoding the env names below by providing another config
+    val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID")
+    envNames
+      .flatMap(key =>
+        Option(System.getenv(key))
+          .map(value => key -> value))
+      .toMap
+  }
 
   /**
    * Create Flint metadata builder with common fields.

@@ -104,6 +125,12 @@ object FlintSparkIndex {
     builder.kind(index.kind)
     builder.options(index.options.optionsWithDefault.mapValues(_.asInstanceOf[AnyRef]).asJava)
 
+    // Index properties
+    val envs = populateEnvToMetadata
+    if (envs.nonEmpty) {
+      builder.addProperty("env", envs.asJava)
+    }
+
    // Optional index settings
    val settings = index.options.indexSettings()
    if (settings.isDefined) {
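
To make the new naming rule concrete, here is a minimal, self-contained sketch (the demo object and println calls are illustrative scaffolding, not part of the change; the input and expected output come from the unit tests further down):

object FlintIndexNameDemo extends App {
  // Mirrors the new flintIndexNamePrefix: only the separators around the
  // catalog and database become underscores; dots inside the remaining
  // table name are preserved.
  def flintIndexNamePrefix(fullTableName: String): String = {
    require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")
    val parts = fullTableName.split('.')
    s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
  }

  println(flintIndexNamePrefix("spark_catalog.default.test.2023.10"))
  // New behavior: flint_spark_catalog_default_test.2023.10
  // Old behavior: flint_spark_catalog_default_test_2023_10_ (every dot replaced,
  // plus a trailing underscore), which conflated dotted table names.

  // Mirrors populateEnvToMetadata: flatMap over Option(System.getenv(...))
  // drops variables that are not set, so only present env vars reach the map.
  val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID")
  val envs: Map[String, String] =
    envNames.flatMap(key => Option(System.getenv(key)).map(value => key -> value)).toMap
  println(envs) // empty map unless running inside EMR Serverless
}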
FlintSparkCoveringIndex.scala

@@ -99,7 +99,7 @@ object FlintSparkCoveringIndex {
       tableName.split("\\.").length >= 3,
       "Qualified table name catalog.database.table is required")
 
-    flintIndexNamePrefix(tableName) + indexName + COVERING_INDEX_SUFFIX
+    flintIndexNamePrefix(tableName) + "_" + indexName + COVERING_INDEX_SUFFIX
   }
 
   /** Builder class for covering index build */
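
Because the shared prefix no longer ends with an underscore, the covering index name now adds the separator explicitly. A sketch of how the full name is assembled, assuming COVERING_INDEX_SUFFIX is "_index" (a value inferred from the test expectations below; the helper is illustrative, not the real API):

val COVERING_INDEX_SUFFIX = "_index" // assumed, inferred from the unit tests

def coveringIndexName(prefix: String, indexName: String): String =
  prefix + "_" + indexName + COVERING_INDEX_SUFFIX

coveringIndexName("flint_spark_catalog_default_test.2023.10", "ci.01")
// flint_spark_catalog_default_test.2023.10_ci.01_index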
FlintSparkMaterializedView.scala

@@ -12,7 +12,7 @@ import scala.collection.convert.ImplicitConversions.`map AsScala`
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
-import org.opensearch.flint.spark.FlintSparkIndex.{generateSchemaJSON, metadataBuilder, StreamingRefresh}
+import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}

@@ -158,7 +158,7 @@ object FlintSparkMaterializedView {
       mvName.split("\\.").length >= 3,
       "Qualified materialized view name catalog.database.mv is required")
 
-    s"flint_${mvName.replace(".", "_")}"
+    flintIndexNamePrefix(mvName)
   }
 
   /** Builder class for MV build */
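
The MV index name now goes through the same shared prefix helper, so dots after catalog.database survive. A one-line illustration (mirroring the new unit test below):

// Before: s"flint_${mvName.replace(".", "_")}"  ->  flint_spark_catalog_default_mv_2023_10
// After:  flintIndexNamePrefix(mvName)          ->  flint_spark_catalog_default_mv.2023.10
val mvIndexName = flintIndexNamePrefix("spark_catalog.default.mv.2023.10")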
FlintSparkSkippingIndex.scala

@@ -125,7 +125,7 @@ object FlintSparkSkippingIndex {
       tableName.split("\\.").length >= 3,
       "Qualified table name catalog.database.table is required")
 
-    flintIndexNamePrefix(tableName) + SKIPPING_INDEX_SUFFIX
+    flintIndexNamePrefix(tableName) + "_" + SKIPPING_INDEX_SUFFIX
   }
 
   /** Builder class for skipping index build */
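
The skipping index follows the same pattern, assuming SKIPPING_INDEX_SUFFIX is "skipping_index" (again inferred from the test expectations below):

val SKIPPING_INDEX_SUFFIX = "skipping_index" // assumed, inferred from the unit tests

val skippingIndexName =
  flintIndexNamePrefix("spark_catalog.default.test.2023.10") + "_" + SKIPPING_INDEX_SUFFIX
// flint_spark_catalog_default_test.2023.10_skipping_index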
FlintSparkMaterializedViewAstBuilder.scala

@@ -5,8 +5,10 @@
 
 package org.opensearch.flint.spark.sql.mv
 
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
 import org.antlr.v4.runtime.tree.RuleNode
-import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
+import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}

@@ -65,12 +67,16 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor
         AttributeReference("materialized_view_name", StringType, nullable = false)())
 
     FlintSparkSqlCommand(outputSchema) { flint =>
-      val catalogDbName = ctx.catalogDb.getText
-      val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
+      val catalogDbName =
+        ctx.catalogDb.parts
+          .map(part => part.getText)
+          .mkString("_")
+      val indexNamePattern = s"flint_${catalogDbName}_*"
       flint
         .describeIndexes(indexNamePattern)
         .collect { case mv: FlintSparkMaterializedView =>
+          // MV name is guaranteed to be qualified when its metadata was created
+          Row(mv.mvName.split('.').drop(2).mkString("."))
-          Row(mv.mvName)
         }
     }
   }
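
The SHOW MATERIALIZED VIEW handler now joins the multipart catalog.database identifier itself and strips the qualifier from the stored MV name before returning it. A self-contained sketch of the two string manipulations (plain strings stand in for the ANTLR ctx accessors):

// Wildcard pattern for looking up all Flint MV indexes under catalog.database:
val parts = Seq("spark_catalog", "default")              // stand-in for ctx.catalogDb.parts
val indexNamePattern = s"flint_${parts.mkString("_")}_*" // flint_spark_catalog_default_*

// Display name: drop the leading catalog and database from the qualified MV name.
val mvName = "spark_catalog.default.mv.2023.10"
val displayName = mvName.split('.').drop(2).mkString(".") // mv.2023.10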
FlintSparkCoveringIndexSuite.scala

@@ -17,6 +17,12 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_spark_catalog_default_test_ci_index"
   }
 
+  test("get covering index name on table and index name with dots") {
+    val testTableDots = "spark_catalog.default.test.2023.10"
+    val index = new FlintSparkCoveringIndex("ci.01", testTableDots, Map("name" -> "string"))
+    index.name() shouldBe "flint_spark_catalog_default_test.2023.10_ci.01_index"
+  }
+
   test("should fail if get index name without full table name") {
     val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string"))
     assertThrows[IllegalArgumentException] {
FlintSparkMaterializedViewSuite.scala

@@ -16,7 +16,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
+import org.apache.spark.sql.catalyst.dsl.expressions.{intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}

@@ -36,11 +36,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   val testMvName = "spark_catalog.default.mv"
   val testQuery = "SELECT 1"
 
-  test("get name") {
+  test("get mv name") {
     val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
     mv.name() shouldBe "flint_spark_catalog_default_mv"
   }
 
+  test("get mv name with dots") {
+    val testMvNameDots = "spark_catalog.default.mv.2023.10"
+    val mv = FlintSparkMaterializedView(testMvNameDots, testQuery, Map.empty)
+    mv.name() shouldBe "flint_spark_catalog_default_mv.2023.10"
+  }
+
   test("should fail if get name with unqualified MV name") {
     the[IllegalArgumentException] thrownBy
       FlintSparkMaterializedView("mv", testQuery, Map.empty).name()
FlintSparkSkippingIndexSuite.scala

@@ -30,6 +30,12 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
     index.name() shouldBe "flint_spark_catalog_default_test_skipping_index"
   }
 
+  test("get skipping index name on table name with dots") {
+    val testTableDots = "spark_catalog.default.test.2023.10"
+    val index = new FlintSparkSkippingIndex(testTableDots, Seq(mock[FlintSparkSkippingStrategy]))
+    index.name() shouldBe "flint_spark_catalog_default_test.2023.10_skipping_index"
+  }
+
   test("get index metadata") {
     val indexCol = mock[FlintSparkSkippingStrategy]
     when(indexCol.kind).thenReturn(SkippingKind.PARTITION)
FlintSparkMaterializedViewSqlITSuite.scala

@@ -184,13 +184,13 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
     checkAnswer(
       sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
-      Seq(Row("spark_catalog.default.mv1")))
+      Seq(Row("mv1")))
 
     // Show in catalog.database
     flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
     checkAnswer(
       sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
-      Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))
+      Seq(Row("mv1"), Row("mv2")))
 
     checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
   }
