Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore non-Flint index in show and describe index statement #296

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
.flatMap(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
Expand All @@ -196,8 +196,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Describing index name $indexName")
if (flintClient.exists(indexName)) {
val metadata = flintClient.getIndexMetadata(indexName)
val index = FlintSparkIndexFactory.create(metadata)
Some(index)
FlintSparkIndexFactory.create(metadata)
} else {
Option.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,33 @@ import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.internal.Logging

/**
* Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
* internal code use instead of user facing API.
*/
object FlintSparkIndexFactory {
object FlintSparkIndexFactory extends Logging {

/**
* Creates Flint index from generic Flint metadata.
*
* @param metadata
* Flint metadata
* @return
* Flint index
* Flint index instance, or None if any error during creation
*/
def create(metadata: FlintMetadata): FlintSparkIndex = {
def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
try {
Some(doCreate(metadata))
} catch {
case e: Exception =>
logWarning(s"Failed to create Flint index from metadata $metadata", e)
None
}
}

private def doCreate(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.flint.spark

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
Expand Down Expand Up @@ -123,4 +126,39 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
"refreshing")))
deleteTestIndex(testCoveringFlintIndex)
}

test("should ignore non-Flint index") {
try {
sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")

// Create a non-Flint index which has "flint_" prefix in coincidence
openSearchClient
.indices()
.create(
new CreateIndexRequest("flint_spark_catalog_invalid_index1"),
RequestOptions.DEFAULT)

// Create a non-Flint index which has "flint_" prefix and _meta mapping in coincidence
openSearchClient
.indices()
.create(
new CreateIndexRequest("flint_spark_catalog_invalid_index2")
.mapping(
"""{
| "_meta": {
| "custom": "test"
| }
|}
|""".stripMargin,
XContentType.JSON),
RequestOptions.DEFAULT)

// Show statement should ignore such index without problem
checkAnswer(
sql(s"SHOW FLINT INDEX IN spark_catalog"),
Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))
} finally {
deleteTestIndex(testSkippingFlintIndex)
}
}
}
Loading