Fix show flint index
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 10, 2024
1 parent 78bbfb0 commit cb17ec2
Showing 5 changed files with 55 additions and 5 deletions.
@@ -5,9 +5,15 @@

package org.opensearch.flint.spark.sql

import scala.collection.JavaConverters._

import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.MultipartIdentifierContext
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
@@ -71,4 +77,26 @@ object FlintSparkSqlAstBuilder {
val stopIndex = ctx.getStop.getStopIndex
sqlText.substring(startIndex, stopIndex + 1)
}

/**
 * Check if a Flint index belongs to the given catalog and database namespace.
*
* @param index
* Flint index
*/
implicit class IndexBelongsTo(private val index: FlintSparkIndex) {

def belongsTo(catalogDbCtx: MultipartIdentifierContext): Boolean = {
// Use the prefix "catalog.database." to match only indexes that belong to this namespace
val catalogDbName = catalogDbCtx.parts.asScala.map(_.getText).mkString("", ".", ".")
index match {
case skipping: FlintSparkSkippingIndex =>
skipping.tableName.startsWith(catalogDbName)
case covering: FlintSparkCoveringIndex =>
covering.tableName.startsWith(catalogDbName)
case mv: FlintSparkMaterializedView => mv.mvName.startsWith(catalogDbName)
case _ => false
}
}
}
}
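The belongsTo check above compares an index's qualified source name against the namespace prefix with a trailing dot, so a database that merely shares a name prefix (default vs. default_test in the tests added by this commit) is not matched. A minimal standalone sketch of that check, not part of the commit and using the hypothetical object name NamespacePrefixSketch with names borrowed from the test cases, might look like:

  object NamespacePrefixSketch extends App {
    // Build the "catalog.database." prefix the same way IndexBelongsTo does above
    val catalogDbName = Seq("spark_catalog", "default").mkString("", ".", ".") // "spark_catalog.default."

    // Qualified names taken from the test cases in this commit
    println("spark_catalog.default.mv1".startsWith(catalogDbName))      // true  -> belongs to spark_catalog.default
    println("spark_catalog.default_test.mv2".startsWith(catalogDbName)) // false -> filtered out
  }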
@@ -11,6 +11,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext

import org.apache.spark.sql.Row
@@ -42,6 +43,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
val indexNamePattern = s"flint_${catalogDbName}_*"
flint
.describeIndexes(indexNamePattern)
.filter(index => index belongsTo ctx.catalogDb)
.map { index =>
val (databaseName, tableName, indexName) = index match {
case skipping: FlintSparkSkippingIndex =>
@@ -11,7 +11,7 @@ import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
@@ -69,13 +69,13 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor
val catalogDbName =
ctx.catalogDb.parts
.map(part => part.getText)
.mkString(".")
val indexNamePattern = s"flint_${catalogDbName.replace('.', '_')}_*"
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"
flint
.describeIndexes(indexNamePattern)
.collect {
// Ensure index is a MV within the given catalog and database
case mv: FlintSparkMaterializedView if mv.mvName.startsWith(s"$catalogDbName.") =>
case mv: FlintSparkMaterializedView if mv belongsTo ctx.catalogDb =>
// MV name must be qualified when metadata created
Row(mv.mvName.split('.').drop(2).mkString("."))
}
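Note that the flint_<catalog>_<database>_* wildcard built above still matches Flint indexes from any database whose name shares the prefix, which is why the additional belongsTo filter on the parsed MV name is needed. A rough illustration of the over-match, using a simple regex stand-in for OpenSearch wildcard matching (not the actual client behavior) and the hypothetical object name IndexPatternSketch with index names derived from the test cases, could be:

  object IndexPatternSketch extends App {
    // Crude stand-in for OpenSearch index-name wildcard matching
    def matchesWildcard(indexName: String, pattern: String): Boolean =
      indexName.matches(pattern.replace("*", ".*"))

    val pattern = s"flint_${Seq("spark_catalog", "default").mkString("_")}_*" // flint_spark_catalog_default_*

    println(matchesWildcard("flint_spark_catalog_default_mv1", pattern))      // true
    println(matchesWildcard("flint_spark_catalog_default_test_mv2", pattern)) // true -> over-match, removed by belongsTo
  }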
@@ -127,6 +127,18 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
deleteTestIndex(testCoveringFlintIndex)
}

test("show flint index in database with the same prefix") {
flint.materializedView().name("spark_catalog.default.mv1").query(testMvQuery).create()
flint.materializedView().name("spark_catalog.default_test.mv2").query(testMvQuery).create()
checkAnswer(
sql(s"SHOW FLINT INDEX IN spark_catalog.default").select("index_name"),
Seq(Row("mv1")))

deleteTestIndex(
FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv1"),
FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default_test.mv2"))
}

test("should ignore non-Flint index") {
try {
sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")
@@ -263,12 +263,20 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
Seq(Row("mv1"), Row("mv2")))

checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)

deleteTestIndex(
getFlintIndexName("spark_catalog.default.mv1"),
getFlintIndexName("spark_catalog.default.mv2"))
}

test("show materialized view in database with the same prefix") {
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create()
checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1")))

deleteTestIndex(
getFlintIndexName("spark_catalog.default.mv1"),
getFlintIndexName("spark_catalog.default_test.mv2"))
}

test("should return emtpy when show materialized views in empty database") {