diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index f3f7b42b6..07ef6663e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -5,9 +5,15 @@ package org.opensearch.flint.spark.sql +import scala.collection.JavaConverters._ + import org.antlr.v4.runtime.ParserRuleContext import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} -import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.MultipartIdentifierContext import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder @@ -71,4 +77,26 @@ object FlintSparkSqlAstBuilder { val stopIndex = ctx.getStop.getStopIndex sqlText.substring(startIndex, stopIndex + 1) } + + /** + * Check if a Flint index belongs to the given catalog and database namespace. + * + * @param index + * Flint index + */ + implicit class IndexBelongsTo(private val index: FlintSparkIndex) { + + def belongsTo(catalogDbCtx: MultipartIdentifierContext): Boolean = { + // Use prefix "catalog.database." 
to match indexes belonging to this namespace + val catalogDbName = catalogDbCtx.parts.asScala.map(_.getText).mkString("", ".", ".") + index match { + case skipping: FlintSparkSkippingIndex => + skipping.tableName.startsWith(catalogDbName) + case covering: FlintSparkCoveringIndex => + covering.tableName.startsWith(catalogDbName) + case mv: FlintSparkMaterializedView => mv.mvName.startsWith(catalogDbName) + case _ => false + } + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 72237406b..fc200aebf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -76,8 +76,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName) flint .describeIndexes(indexNamePattern) - .collect { case index: FlintSparkCoveringIndex => - Row(index.indexName) + .collect { + case index: FlintSparkCoveringIndex if index.tableName == fullTableName => + Row(index.indexName) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala index 925490547..62c98b023 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala @@ -11,6 +11,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import 
org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext import org.apache.spark.sql.Row @@ -42,6 +43,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { val indexNamePattern = s"flint_${catalogDbName}_*" flint .describeIndexes(indexNamePattern) + .filter(index => index belongsTo ctx.catalogDb) .map { index => val (databaseName, tableName, indexName) = index match { case skipping: FlintSparkSkippingIndex => diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 65ee45577..cd4f84028 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -11,7 +11,7 @@ import org.antlr.v4.runtime.tree.RuleNode import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} -import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row @@ -73,9 +73,11 @@ trait FlintSparkMaterializedViewAstBuilder extends 
FlintSparkSqlExtensionsVisito val indexNamePattern = s"flint_${catalogDbName}_*" flint .describeIndexes(indexNamePattern) - .collect { case mv: FlintSparkMaterializedView => - // MV name must be qualified when metadata created - Row(mv.mvName.split('.').drop(2).mkString(".")) + .collect { + // Ensure index is a MV within the given catalog and database + case mv: FlintSparkMaterializedView if mv belongsTo ctx.catalogDb => + // MV name must be qualified when metadata created + Row(mv.mvName.split('.').drop(2).mkString(".")) } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 403f53b36..371c6ca2f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -339,6 +339,33 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable)) } + test("show covering index on source table with the same prefix") { + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + val testTable2 = s"${testTable}_2" + withTable(testTable2) { + // Create another table with same prefix + createPartitionedAddressTable(testTable2) + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable2) + .addIndexColumns("address") + .create() + + // Expect no testTable2 present + val result = sql(s"SHOW INDEX ON $testTable") + checkAnswer(result, Seq(Row(testIndex))) + + deleteTestIndex(getFlintIndexName(testIndex, testTable2)) + } + } + test("describe covering index") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala 
b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala index 61a16779a..e312ba6de 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala @@ -127,6 +127,18 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite { deleteTestIndex(testCoveringFlintIndex) } + test("show flint index in database with the same prefix") { + flint.materializedView().name("spark_catalog.default.mv1").query(testMvQuery).create() + flint.materializedView().name("spark_catalog.default_test.mv2").query(testMvQuery).create() + checkAnswer( + sql(s"SHOW FLINT INDEX IN spark_catalog.default").select("index_name"), + Seq(Row("mv1"))) + + deleteTestIndex( + FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv1"), + FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default_test.mv2")) + } + test("should ignore non-Flint index") { try { sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 906523696..8dfde3439 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -263,6 +263,20 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { Seq(Row("mv1"), Row("mv2"))) checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) + + deleteTestIndex( + getFlintIndexName("spark_catalog.default.mv1"), + getFlintIndexName("spark_catalog.default.mv2")) + } + + test("show materialized view in database with the same prefix") { + 
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() + flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create() + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1"))) + + deleteTestIndex( + getFlintIndexName("spark_catalog.default.mv1"), + getFlintIndexName("spark_catalog.default_test.mv2")) } test("should return emtpy when show materialized views in empty database") {