Fix incorrect result in show index statement (opensearch-project#332)
* Fix show index bug

Signed-off-by: Chen Dai <[email protected]>

* Fix show materialized view bug

Signed-off-by: Chen Dai <[email protected]>

* Fix show flint index

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored May 14, 2024
1 parent 422dae7 commit 7c44292
Showing 7 changed files with 93 additions and 7 deletions.
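
All three bugs share one root cause: each SHOW statement queries OpenSearch with the wildcard pattern `flint_{catalog}_{db}_*`, which is a plain prefix match, so `SHOW ... IN spark_catalog.default` also picks up indexes in a sibling database such as `default_test`. A minimal self-contained sketch of the collision (the `matches` helper below is an illustrative stand-in for the OpenSearch wildcard query, not the Flint API):

```scala
object PrefixCollisionDemo extends App {
  // SHOW ... IN spark_catalog.default used to query this wildcard pattern
  val pattern = "flint_spark_catalog_default_*"

  // Stand-in for the OpenSearch wildcard match: '*' means "any suffix"
  def matches(pattern: String, indexName: String): Boolean =
    indexName.startsWith(pattern.stripSuffix("*"))

  val mvInDefault     = "flint_spark_catalog_default_mv1"      // database `default`
  val mvInDefaultTest = "flint_spark_catalog_default_test_mv2" // database `default_test`

  println(matches(pattern, mvInDefault))     // true  -- expected
  println(matches(pattern, mvInDefaultTest)) // true  -- the bug: wrong database
}
```

The fix keeps the coarse wildcard query but post-filters each returned index by its fully qualified name, via the `IndexBelongsTo` helper added in the first file below.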
FlintSparkSqlAstBuilder.scala
@@ -5,9 +5,15 @@

 package org.opensearch.flint.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.antlr.v4.runtime.ParserRuleContext
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
-import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.MultipartIdentifierContext
 import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
 import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
 import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
@@ -71,4 +77,26 @@ object FlintSparkSqlAstBuilder {
     val stopIndex = ctx.getStop.getStopIndex
     sqlText.substring(startIndex, stopIndex + 1)
   }
+
+  /**
+   * Check if a Flint index belongs to the given catalog and database namespace.
+   *
+   * @param index
+   *   Flint index
+   */
+  implicit class IndexBelongsTo(private val index: FlintSparkIndex) {
+
+    def belongsTo(catalogDbCtx: MultipartIdentifierContext): Boolean = {
+      // Use the prefix "catalog.database." to match only indexes in this namespace
+      val catalogDbName = catalogDbCtx.parts.asScala.map(_.getText).mkString("", ".", ".")
+      index match {
+        case skipping: FlintSparkSkippingIndex =>
+          skipping.tableName.startsWith(catalogDbName)
+        case covering: FlintSparkCoveringIndex =>
+          covering.tableName.startsWith(catalogDbName)
+        case mv: FlintSparkMaterializedView => mv.mvName.startsWith(catalogDbName)
+        case _ => false
+      }
+    }
+  }
 }
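
The `implicit class` above is Scala's extension-method pattern: it adds an infix `belongsTo` predicate to `FlintSparkIndex` without modifying the index types themselves. A self-contained sketch of the same pattern with simplified stand-in types (all names here are illustrative):

```scala
object BelongsToDemo extends App {
  // Simplified stand-ins for the Flint index types
  sealed trait Index
  case class SkippingIndex(tableName: String) extends Index
  case class MaterializedView(mvName: String) extends Index

  // Same shape as IndexBelongsTo: enrich Index with an infix predicate
  implicit class IndexOps(private val index: Index) {
    def belongsTo(catalogDb: String): Boolean = {
      val prefix = catalogDb + "." // trailing dot closes the prefix hole
      index match {
        case SkippingIndex(table) => table.startsWith(prefix)
        case MaterializedView(mv) => mv.startsWith(prefix)
      }
    }
  }

  val mv = MaterializedView("spark_catalog.default_test.mv2")
  println(mv belongsTo "spark_catalog.default")      // false -- filtered out
  println(mv belongsTo "spark_catalog.default_test") // true
}
```

Note the trailing `.` produced by `mkString("", ".", ".")` in the real helper: `spark_catalog.default_test.mv2` does not start with `spark_catalog.default.`, which is exactly what distinguishes the two databases.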
FlintSparkCoveringIndexAstBuilder.scala
@@ -76,8 +76,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName)
       flint
         .describeIndexes(indexNamePattern)
-        .collect { case index: FlintSparkCoveringIndex =>
-          Row(index.indexName)
+        .collect {
+          case index: FlintSparkCoveringIndex if index.tableName == fullTableName =>
+            Row(index.indexName)
         }
     }
   }
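
The new `index.tableName == fullTableName` guard is needed because here the wildcard sits in the index-name slot rather than after the database. Assuming covering index names follow the `flint_{catalog}_{db}_{table}_{index}_index` layout (the shape `getFlintIndexName` appears to produce), an index on a table whose name merely extends the target table's name still matches the pattern:

```scala
object CoveringNameCollisionDemo extends App {
  // Assumed layout mirroring FlintSparkCoveringIndex.getFlintIndexName
  def flintIndexName(indexName: String, fullTableName: String): String =
    s"flint_${fullTableName.replace('.', '_')}_${indexName}_index"

  val pattern = flintIndexName("*", "spark_catalog.default.test")
  // flint_spark_catalog_default_test_*_index

  // Covering index `idx_address` on the similarly named table `test_2`
  val other = flintIndexName("idx_address", "spark_catalog.default.test_2")
  // flint_spark_catalog_default_test_2_idx_address_index

  // Simple glob check standing in for the OpenSearch wildcard query
  val Array(prefix, suffix) = pattern.split("\\*", 2)
  println(other.startsWith(prefix) && other.endsWith(suffix)) // true -- collides
}
```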
FlintSparkIndexAstBuilder.scala
@@ -11,6 +11,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
+import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext
 
 import org.apache.spark.sql.Row
@@ -42,6 +43,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       val indexNamePattern = s"flint_${catalogDbName}_*"
       flint
         .describeIndexes(indexNamePattern)
+        .filter(index => index belongsTo ctx.catalogDb)
         .map { index =>
           val (databaseName, tableName, indexName) = index match {
             case skipping: FlintSparkSkippingIndex =>
FlintSparkMaterializedViewAstBuilder.scala
@@ -11,7 +11,7 @@ import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
-import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
+import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
 import org.apache.spark.sql.Row
@@ -73,9 +73,11 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       val indexNamePattern = s"flint_${catalogDbName}_*"
       flint
         .describeIndexes(indexNamePattern)
-        .collect { case mv: FlintSparkMaterializedView =>
-          // MV name must be qualified when metadata created
-          Row(mv.mvName.split('.').drop(2).mkString("."))
+        .collect {
+          // Ensure the index is an MV within the given catalog and database
+          case mv: FlintSparkMaterializedView if mv belongsTo ctx.catalogDb =>
+            // MV name must be fully qualified when its metadata is created
+            Row(mv.mvName.split('.').drop(2).mkString("."))
         }
     }
   }
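
The retained `split('.').drop(2)` works because MV names are stored fully qualified as `catalog.database.name` at metadata creation time; dropping the first two parts recovers the bare view name for display. A one-line illustration:

```scala
object MvNameDemo extends App {
  val mvName = "spark_catalog.default.mv1" // qualified at metadata creation time
  println(mvName.split('.').drop(2).mkString(".")) // prints "mv1"
}
```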
FlintSparkCoveringIndexSqlITSuite.scala
@@ -339,6 +339,33 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable))
   }
 
+  test("show covering index on source table with the same prefix") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    val testTable2 = s"${testTable}_2"
+    withTable(testTable2) {
+      // Create another table whose name shares the same prefix
+      createPartitionedAddressTable(testTable2)
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable2)
+        .addIndexColumns("address")
+        .create()
+
+      // Expect only the index on testTable; the one on testTable2 must not appear
+      val result = sql(s"SHOW INDEX ON $testTable")
+      checkAnswer(result, Seq(Row(testIndex)))
+
+      deleteTestIndex(getFlintIndexName(testIndex, testTable2))
+    }
+  }
+
   test("describe covering index") {
     flint
       .coveringIndex()
FlintSparkIndexSqlITSuite.scala
@@ -127,6 +127,18 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
     deleteTestIndex(testCoveringFlintIndex)
   }
 
+  test("show flint index in database with the same prefix") {
+    flint.materializedView().name("spark_catalog.default.mv1").query(testMvQuery).create()
+    flint.materializedView().name("spark_catalog.default_test.mv2").query(testMvQuery).create()
+    checkAnswer(
+      sql(s"SHOW FLINT INDEX IN spark_catalog.default").select("index_name"),
+      Seq(Row("mv1")))
+
+    deleteTestIndex(
+      FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default.mv1"),
+      FlintSparkMaterializedView.getFlintIndexName("spark_catalog.default_test.mv2"))
+  }
+
   test("should ignore non-Flint index") {
     try {
       sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")
FlintSparkMaterializedViewSqlITSuite.scala
@@ -263,6 +263,20 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
       Seq(Row("mv1"), Row("mv2")))
 
     checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
+
+    deleteTestIndex(
+      getFlintIndexName("spark_catalog.default.mv1"),
+      getFlintIndexName("spark_catalog.default.mv2"))
   }
 
+  test("show materialized view in database with the same prefix") {
+    flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
+    flint.materializedView().name("spark_catalog.default_test.mv2").query(testQuery).create()
+    checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("mv1")))
+
+    deleteTestIndex(
+      getFlintIndexName("spark_catalog.default.mv1"),
+      getFlintIndexName("spark_catalog.default_test.mv2"))
+  }
+
   test("should return emtpy when show materialized views in empty database") {
