diff --git a/docs/index.md b/docs/index.md
index af6e54a3e..249e7a770 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -328,9 +328,11 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics
 - index_name: user defined name for covering index and materialized view
 - auto_refresh: auto refresh option of the index (true / false)
 - status: status of the index
+- **Extended Usage**: Display additional information, including the following output columns:
+  - error: error message if the index is in failed status
 
 ```sql
-SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
+SHOW FLINT [INDEX|INDEXES] [EXTENDED] IN catalog[.database]
 ```
 
 Example:
@@ -344,6 +346,15 @@ fetched rows / total rows = 3/3
 | flint_spark_catalog_default_http_logs_skipping_index        | skipping | default  | http_logs | NULL            | true         | refreshing |
 | flint_spark_catalog_default_http_logs_status_clientip_index | covering | default  | http_logs | status_clientip | false        | active     |
 +-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
+
+sql> SHOW FLINT INDEXES EXTENDED IN spark_catalog.default;
+fetched rows / total rows = 2/2
++-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
+| flint_index_name                                            | kind     | database | table     | index_name      | auto_refresh | status     | error                         |
+|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------|
+| flint_spark_catalog_default_http_count_view                 | mv       | default  | NULL      | http_count_view | false        | active     | NULL                          |
+| flint_spark_catalog_default_http_logs_skipping_index        | skipping | default  | http_logs | NULL            | true         | failed     | failure in bulk execution:... |
++-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
 ```
 
 - **Analyze Skipping Index**: Provides recommendation for creating skipping index. It outputs the following columns:
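As a hedged illustration (not part of this change), the new `error` column documented above can also be consumed programmatically; the sketch below assumes a `spark` session with the Flint SQL extension enabled and reuses the catalog and database names from the documentation example.

```scala
// Sketch only: list failed Flint indexes together with their error messages.
// `spark`, the catalog, and the database are assumptions from the doc example above.
val failed = spark
  .sql("SHOW FLINT INDEXES EXTENDED IN spark_catalog.default")
  .where("status = 'failed'")
  .select("flint_index_name", "error")

failed.collect().foreach { row =>
  println(s"${row.getString(0)}: ${row.getString(1)}")
}
```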
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 2e8d634da..46e814e9f 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -156,7 +156,10 @@ indexManagementStatement
     ;
 
 showFlintIndexStatement
-    : SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
+    : SHOW FLINT (INDEX | INDEXES)
+        IN catalogDb=multipartIdentifier    #showFlintIndex
+    | SHOW FLINT (INDEX | INDEXES) EXTENDED
+        IN catalogDb=multipartIdentifier    #showFlintIndexExtended
     ;
 
 indexJobManagementStatement
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 283981e47..c53c61adf 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -163,6 +163,7 @@ DESC: 'DESC';
 DESCRIBE: 'DESCRIBE';
 DROP: 'DROP';
 EXISTS: 'EXISTS';
+EXTENDED: 'EXTENDED';
 FALSE: 'FALSE';
 FLINT: 'FLINT';
 IF: 'IF';
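The labeled alternatives (`#showFlintIndex`, `#showFlintIndexExtended`) make ANTLR generate a distinct parse-tree context and visitor callback per label, which is what the AST builder in the next file overrides. A rough sketch of that generated shape, with simplified signatures (the real code lives in the generated `FlintSparkSqlExtensionsParser` and `FlintSparkSqlExtensionsVisitor`):

```scala
// Sketch of the visitor surface implied by the grammar labels above.
trait GeneratedVisitorShape[T] {
  // SHOW FLINT (INDEX | INDEXES) IN catalogDb
  def visitShowFlintIndex(ctx: AnyRef): T
  // SHOW FLINT (INDEX | INDEXES) EXTENDED IN catalogDb
  def visitShowFlintIndexExtended(ctx: AnyRef): T
}
```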
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
index 62c98b023..e6cccbc4a 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala
@@ -7,12 +7,13 @@ package org.opensearch.flint.spark.sql.index
 
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 
+import org.opensearch.flint.spark.FlintSparkIndex
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{MultipartIdentifierContext, ShowFlintIndexContext, ShowFlintIndexExtendedContext}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -25,52 +26,123 @@ import org.apache.spark.sql.types.{BooleanType, StringType}
 trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
   self: SparkSqlAstBuilder =>
 
-  override def visitShowFlintIndexStatement(ctx: ShowFlintIndexStatementContext): Command = {
-    val outputSchema = Seq(
-      AttributeReference("flint_index_name", StringType, nullable = false)(),
-      AttributeReference("kind", StringType, nullable = false)(),
-      AttributeReference("database", StringType, nullable = false)(),
-      AttributeReference("table", StringType, nullable = true)(),
-      AttributeReference("index_name", StringType, nullable = true)(),
-      AttributeReference("auto_refresh", BooleanType, nullable = false)(),
-      AttributeReference("status", StringType, nullable = false)())
-
-    FlintSparkSqlCommand(outputSchema) { flint =>
-      val catalogDbName =
-        ctx.catalogDb.parts
-          .map(part => part.getText)
-          .mkString("_")
-      val indexNamePattern = s"flint_${catalogDbName}_*"
-      flint
-        .describeIndexes(indexNamePattern)
-        .filter(index => index belongsTo ctx.catalogDb)
-        .map { index =>
-          val (databaseName, tableName, indexName) = index match {
-            case skipping: FlintSparkSkippingIndex =>
-              val parts = skipping.tableName.split('.')
-              (parts(1), parts.drop(2).mkString("."), null)
-            case covering: FlintSparkCoveringIndex =>
-              val parts = covering.tableName.split('.')
-              (parts(1), parts.drop(2).mkString("."), covering.indexName)
-            case mv: FlintSparkMaterializedView =>
-              val parts = mv.mvName.split('.')
-              (parts(1), null, parts.drop(2).mkString("."))
-          }
-
-          val status = index.latestLogEntry match {
-            case Some(entry) => entry.state.toString
-            case None => "unavailable"
-          }
-
-          Row(
-            index.name,
-            index.kind,
-            databaseName,
-            tableName,
-            indexName,
-            index.options.autoRefresh(),
-            status)
-        }
+  /**
+   * Represents the basic output schema for the FlintSparkSqlCommand. This schema includes
+   * essential information about each index.
+   */
+  private val baseOutputSchema = Seq(
+    AttributeReference("flint_index_name", StringType, nullable = false)(),
+    AttributeReference("kind", StringType, nullable = false)(),
+    AttributeReference("database", StringType, nullable = false)(),
+    AttributeReference("table", StringType, nullable = true)(),
+    AttributeReference("index_name", StringType, nullable = true)(),
+    AttributeReference("auto_refresh", BooleanType, nullable = false)(),
+    AttributeReference("status", StringType, nullable = false)())
+
+  /**
+   * Extends the base output schema with additional information. This schema is used when the
+   * EXTENDED keyword is present.
+   */
+  private val extendedOutputSchema = Seq(
+    AttributeReference("error", StringType, nullable = true)())
+
+  override def visitShowFlintIndex(ctx: ShowFlintIndexContext): Command = {
+    new ShowFlintIndexCommandBuilder()
+      .withSchema(baseOutputSchema)
+      .forCatalog(ctx.catalogDb)
+      .constructRows(baseRowData)
+      .build()
+  }
+
+  override def visitShowFlintIndexExtended(ctx: ShowFlintIndexExtendedContext): Command = {
+    new ShowFlintIndexCommandBuilder()
+      .withSchema(baseOutputSchema ++ extendedOutputSchema)
+      .forCatalog(ctx.catalogDb)
+      .constructRows(index => baseRowData(index) ++ extendedRowData(index))
+      .build()
+  }
+
+  /**
+   * Builder class for constructing FlintSparkSqlCommand objects.
+   */
+  private class ShowFlintIndexCommandBuilder {
+    private var schema: Seq[AttributeReference] = _
+    private var catalogDb: MultipartIdentifierContext = _
+    private var rowDataBuilder: FlintSparkIndex => Seq[Any] = _
+
+    /** Specify the output schema for the command. */
+    def withSchema(schema: Seq[AttributeReference]): ShowFlintIndexCommandBuilder = {
+      this.schema = schema
+      this
+    }
+
+    /** Specify the catalog database context for the command. */
+    def forCatalog(catalogDb: MultipartIdentifierContext): ShowFlintIndexCommandBuilder = {
+      this.catalogDb = catalogDb
+      this
+    }
+
+    /** Configures a function to construct row data for each index. */
+    def constructRows(
+        rowDataBuilder: FlintSparkIndex => Seq[Any]): ShowFlintIndexCommandBuilder = {
+      this.rowDataBuilder = rowDataBuilder
+      this
+    }
+
+    /** Builds the command using the configured parameters. */
+    def build(): FlintSparkSqlCommand = {
+      require(schema != null, "Schema must be set before building the command")
+      require(catalogDb != null, "Catalog database must be set before building the command")
+      require(rowDataBuilder != null, "Row data builder must be set before building the command")
+
+      FlintSparkSqlCommand(schema) { flint =>
+        val catalogDbName =
+          catalogDb.parts
+            .map(part => part.getText)
+            .mkString("_")
+        val indexNamePattern = s"flint_${catalogDbName}_*"
+
+        flint
+          .describeIndexes(indexNamePattern)
+          .filter(index => index belongsTo catalogDb)
+          .map { index => Row.fromSeq(rowDataBuilder(index)) }
+      }
+    }
+  }
+
+  private def baseRowData(index: FlintSparkIndex): Seq[Any] = {
+    val (databaseName, tableName, indexName) = index match {
+      case skipping: FlintSparkSkippingIndex =>
+        val parts = skipping.tableName.split('.')
+        (parts(1), parts.drop(2).mkString("."), null)
+      case covering: FlintSparkCoveringIndex =>
+        val parts = covering.tableName.split('.')
+        (parts(1), parts.drop(2).mkString("."), covering.indexName)
+      case mv: FlintSparkMaterializedView =>
+        val parts = mv.mvName.split('.')
+        (parts(1), null, parts.drop(2).mkString("."))
+    }
+
+    val status = index.latestLogEntry match {
+      case Some(entry) => entry.state.toString
+      case None => "unavailable"
+    }
+
+    Seq(
+      index.name,
+      index.kind,
+      databaseName,
+      tableName,
+      indexName,
+      index.options.autoRefresh(),
+      status)
+  }
+
+  private def extendedRowData(index: FlintSparkIndex): Seq[Any] = {
+    val error = index.latestLogEntry match {
+      case Some(entry) => entry.error
+      case None => null
     }
+    Seq(error)
   }
 }
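The builder above separates the output schema from the per-index row construction, so further columns can be layered on the same way the extended `error` column is. The sketch below is a hypothetical illustration (the `last_state` column and its helper are not part of this change); it reuses types already imported in the file and would sit inside `FlintSparkIndexAstBuilder` next to the existing helpers:

```scala
// Hypothetical extra column: one schema entry plus one row-data function,
// composed in a visit override exactly like visitShowFlintIndexExtended.
private val lastStateSchema = Seq(
  AttributeReference("last_state", StringType, nullable = true)())

private def lastStateRowData(index: FlintSparkIndex): Seq[Any] =
  Seq(index.latestLogEntry.map(_.state.toString).orNull)

// Composition inside a visit method:
//   new ShowFlintIndexCommandBuilder()
//     .withSchema(baseOutputSchema ++ lastStateSchema)
//     .forCatalog(ctx.catalogDb)
//     .constructRows(index => baseRowData(index) ++ lastStateRowData(index))
//     .build()
```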
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
index e312ba6de..a5744271f 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
@@ -5,6 +5,9 @@
 
 package org.opensearch.flint.spark
 
+import scala.collection.JavaConverters._
+
+import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
 import org.opensearch.client.RequestOptions
 import org.opensearch.client.indices.CreateIndexRequest
 import org.opensearch.common.xcontent.XContentType
@@ -12,10 +15,11 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.sql.Row
 
-class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
+class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
 
   private val testTableName = "index_test"
   private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
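The `with Matchers` mixin is what enables the `should contain`, `shouldBe empty`, and `should include` assertions used in the new test below. A minimal, self-contained illustration of that ScalaTest syntax (the suite name and data here are made up, assuming ScalaTest 3.x):

```scala
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// Standalone illustration of the matcher syntax the new test relies on.
class MatcherSyntaxExample extends AnyFlatSpec with Matchers {
  "the extended output" should "expose an error column" in {
    val columns = Seq("flint_index_name", "status", "error")
    columns should contain("error")        // like df.columns should contain("error")
    "" shouldBe empty                      // like outputError shouldBe empty
    "OpenSearchException: ..." should include("OpenSearchException")
  }
}
```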
@@ -99,6 +103,46 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
       FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
   }
 
+  test("show flint indexes with extended information") {
+    // Create and refresh with all existing data
+    flint
+      .skippingIndex()
+      .onTable(testTableQualifiedName)
+      .addValueSet("name")
+      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .create()
+    flint.refreshIndex(testSkippingFlintIndex)
+    val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
+    awaitStreamingComplete(activeJob.get.id.toString)
+
+    // Assert output contains empty error message
+    def outputError: String = {
+      val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
+      df.columns should contain("error")
+      df.collect().head.getAs[String]("error")
+    }
+    outputError shouldBe empty
+
+    // Trigger next micro batch after 5 seconds with index readonly
+    new Thread(() => {
+      Thread.sleep(5000)
+      openSearchClient
+        .indices()
+        .putSettings(
+          new UpdateSettingsRequest(testSkippingFlintIndex).settings(
+            Map("index.blocks.write" -> true).asJava),
+          RequestOptions.DEFAULT)
+      sql(
+        s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
+    }).start()
+
+    // Await to store exception and verify if it's as expected
+    flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
+    outputError should include("OpenSearchException")
+
+    deleteTestIndex(testSkippingFlintIndex)
+  }
+
   test("should return empty when show flint index in empty database") {
     checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty)
   }
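The test injects the failure by write-blocking the Flint index, then deletes it. If the block ever needs to be lifted instead of deleting the index (for example when reusing it across assertions), the same `UpdateSettingsRequest` call can reset the setting. A hedged sketch, reusing the suite's `openSearchClient` and `testSkippingFlintIndex`:

```scala
import scala.collection.JavaConverters._
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions

// Sketch: undo the read-only block set in the test above (inverse of index.blocks.write = true).
openSearchClient
  .indices()
  .putSettings(
    new UpdateSettingsRequest(testSkippingFlintIndex).settings(
      Map("index.blocks.write" -> false).asJava),
    RequestOptions.DEFAULT)
```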