Skip to content

Commit

Permalink
Ignore non-Flint index in show and describe index statement (#296)
Browse files Browse the repository at this point in the history
* Ignore index if its creation from metadata fails, and add an IT

Signed-off-by: Chen Dai <[email protected]>

* Fix style check

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Mar 27, 2024
1 parent 94fc2f5 commit e87d330
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
.flatMap(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
Expand All @@ -196,8 +196,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Describing index name $indexName")
if (flintClient.exists(indexName)) {
val metadata = flintClient.getIndexMetadata(indexName)
val index = FlintSparkIndexFactory.create(metadata)
Some(index)
FlintSparkIndexFactory.create(metadata)
} else {
Option.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,33 @@ import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.internal.Logging

/**
* Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
* internal code use instead of user facing API.
*/
object FlintSparkIndexFactory {
object FlintSparkIndexFactory extends Logging {

/**
 * Creates a Flint index instance from generic Flint metadata. Any failure during creation
 * (e.g. a non-Flint index that happens to match the "flint_" name pattern, or malformed
 * metadata) is logged and swallowed so that listing/describing statements can skip the
 * offending index instead of failing entirely.
 *
 * @param metadata
 *   Flint metadata
 * @return
 *   Flint index instance, or None if any error during creation
 */
def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
  // NonFatal: let fatal errors (OOM, interrupts) propagate rather than
  // silently mapping them to None
  import scala.util.control.NonFatal

  try {
    Some(doCreate(metadata))
  } catch {
    case NonFatal(e) =>
      logWarning(s"Failed to create Flint index from metadata $metadata", e)
      None
  }
}

private def doCreate(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.flint.spark

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
Expand Down Expand Up @@ -123,4 +126,39 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
"refreshing")))
deleteTestIndex(testCoveringFlintIndex)
}

// Verifies that SHOW FLINT INDEX skips OpenSearch indexes that match the Flint
// naming pattern but were not created by Flint (i.e. FlintSparkIndexFactory.create
// returns None for them instead of throwing).
test("should ignore non-Flint index") {
  try {
    sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")

    // Create a non-Flint index which has "flint_" prefix in coincidence
    // (no _meta mapping at all — metadata parsing must not blow up)
    openSearchClient
      .indices()
      .create(
        new CreateIndexRequest("flint_spark_catalog_invalid_index1"),
        RequestOptions.DEFAULT)

    // Create a non-Flint index which has "flint_" prefix and _meta mapping in coincidence
    // (_meta exists but lacks Flint fields — metadata deserialization must fail gracefully)
    openSearchClient
      .indices()
      .create(
        new CreateIndexRequest("flint_spark_catalog_invalid_index2")
          .mapping(
            """{
              | "_meta": {
              |   "custom": "test"
              | }
              |}
              |""".stripMargin,
            XContentType.JSON),
        RequestOptions.DEFAULT)

    // Show statement should ignore such index without problem:
    // only the genuine skipping index created above is expected in the result
    checkAnswer(
      sql(s"SHOW FLINT INDEX IN spark_catalog"),
      Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))
  } finally {
    // Only the Flint-created index needs cleanup via the Flint API; the two fake
    // indexes are torn down by the suite's OpenSearch cleanup
    deleteTestIndex(testSkippingFlintIndex)
  }
}
}

0 comments on commit e87d330

Please sign in to comment.