diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
index e312ba6de..f5cc070c3 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
@@ -5,6 +5,11 @@
 
 package org.opensearch.flint.spark
 
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.client.indices.CreateIndexRequest
 import org.opensearch.common.xcontent.XContentType
@@ -12,10 +17,12 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.functions.col
 
-class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
+class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
 
   private val testTableName = "index_test"
   private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
@@ -99,6 +106,42 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
       FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
   }
 
+  test("show flint indexes with extended information") {
+    // Create and refresh with all existing data
+    flint
+      .skippingIndex()
+      .onTable(testTableQualifiedName)
+      .addValueSet("name")
+      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .create()
+    flint.refreshIndex(testSkippingFlintIndex)
+    val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
+    awaitStreamingComplete(activeJob.get.id.toString)
+
+    // Trigger next micro batch after 5 seconds with index readonly
+    Future {
+      Thread.sleep(5000)
+      openSearchClient
+        .indices()
+        .putSettings(
+          new UpdateSettingsRequest(testSkippingFlintIndex).settings(
+            Map("index.blocks.write" -> true).asJava),
+          RequestOptions.DEFAULT)
+      sql(
+        s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
+    }
+
+    // Await and store exception as expected
+    flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
+
+    // Assert output contains error message
+    val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
+    df.columns should contain("error")
+    df.select(col("error")).collect().head.getString(0) should include("OpenSearchException")
+
+    deleteTestIndex(testSkippingFlintIndex)
+  }
+
   test("should return empty when show flint index in empty database") {
     checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty)
   }