diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
index ca17b87dd..e6cccbc4a 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
@@ -28,14 +28,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
 
   /**
    * Represents the basic output schema for the FlintSparkSqlCommand. This schema includes
-   * essential information about each index, such as:
-   * - Index Name (`flint_index_name`)
-   * - Index Kind (`kind`)
-   * - Associated Database (`database`)
-   * - Table the index is associated with (`table`), which can be nullable
-   * - Specific Index Name (`index_name`), which can be nullable
-   * - Auto-refresh status (`auto_refresh`)
-   * - Current status of the index (`status`)
+   * essential information about each index.
    */
   private val baseOutputSchema = Seq(
     AttributeReference("flint_index_name", StringType, nullable = false)(),
@@ -48,9 +41,7 @@ trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
 
   /**
    * Extends the base output schema with additional information. This schema is used when the
-   * EXTENDED keyword is present which includes:
-   * - Error information (`error`), detailing any errors associated with the index, which can be
-   * nullable.
+   * EXTENDED keyword is present.
    */
   private val extendedOutputSchema = Seq(
     AttributeReference("error", StringType, nullable = true)())
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
index 307651593..feb821a2a 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
@@ -126,6 +126,14 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
     val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
     awaitStreamingComplete(activeJob.get.id.toString)
 
+    // Assert output contains empty error message
+    def outputError: String = {
+      val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
+      df.columns should contain("error")
+      df.collect().head.getAs[String]("error")
+    }
+    outputError shouldBe empty
+
     // Trigger next micro batch after 5 seconds with index readonly
     Future {
       Thread.sleep(5000)
@@ -139,13 +147,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
         s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
     }
 
-    // Await and store exception as expected
+    // Await to store exception and verify if it's as expected
     flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
-
-    // Assert output contains error message
-    val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
-    df.columns should contain("error")
-    df.select(col("error")).collect().head.getString(0) should include("OpenSearchException")
+    outputError should include("OpenSearchException")
 
     deleteTestIndex(testSkippingFlintIndex)
   }