Add error output column to show Flint index statement #436

Merged
13 changes: 12 additions & 1 deletion docs/index.md
@@ -328,9 +328,11 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics
- index_name: user defined name for covering index and materialized view
- auto_refresh: auto refresh option of the index (true / false)
- status: status of the index
- **Extended Usage**: Displays additional information, including the following output columns:
- error: error message if the index is in failed status

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
SHOW FLINT [INDEX|INDEXES] [EXTENDED] IN catalog[.database]
```

Example:
@@ -344,6 +346,15 @@ fetched rows / total rows = 3/3
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing |
| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+

sql> SHOW FLINT INDEXES EXTENDED IN spark_catalog.default;
fetched rows / total rows = 2/2
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
| flint_index_name | kind | database | table | index_name | auto_refresh | status | error |
|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------|
| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active | NULL |
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | failed | failure in bulk execution:... |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
```

- **Analyze Skipping Index**: Provides recommendations for creating a skipping index. It outputs the following columns:
@@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;
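
For context on how this grammar change connects to the visitor code later in this diff: the `#showFlintIndex` and `#showFlintIndexExtended` labels make ANTLR emit one parse-tree context class and one visitor hook per alternative (these surface as the `ShowFlintIndexContext` and `ShowFlintIndexExtendedContext` imports in the AST builder below). A minimal, self-contained sketch of that shape, using stand-in types rather than the generated classes:

```scala
// Illustrative sketch only: each labeled alternative yields its own context
// class and visitor method. Stand-in types keep this snippet self-contained;
// the real classes are generated into FlintSparkSqlExtensionsParser and
// FlintSparkSqlExtensionsVisitor from the grammar above.
class ShowFlintIndexContext          // stand-in for the generated context
class ShowFlintIndexExtendedContext  // stand-in for the generated context

trait ShowFlintIndexVisitorSketch[T] {
  def visitShowFlintIndex(ctx: ShowFlintIndexContext): T
  def visitShowFlintIndexExtended(ctx: ShowFlintIndexExtendedContext): T
}
```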

indexJobManagementStatement
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
@@ -7,12 +7,13 @@ package org.opensearch.flint.spark.sql.index

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{MultipartIdentifierContext, ShowFlintIndexContext, ShowFlintIndexExtendedContext}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -25,52 +25,123 @@ import org.apache.spark.sql.types.{BooleanType, StringType}
trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitShowFlintIndexStatement(ctx: ShowFlintIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
AttributeReference("kind", StringType, nullable = false)(),
AttributeReference("database", StringType, nullable = false)(),
AttributeReference("table", StringType, nullable = true)(),
AttributeReference("index_name", StringType, nullable = true)(),
AttributeReference("auto_refresh", BooleanType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName =
ctx.catalogDb.parts
.map(part => part.getText)
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"
flint
.describeIndexes(indexNamePattern)
.filter(index => index belongsTo ctx.catalogDb)
.map { index =>
val (databaseName, tableName, indexName) = index match {
case skipping: FlintSparkSkippingIndex =>
val parts = skipping.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), null)
case covering: FlintSparkCoveringIndex =>
val parts = covering.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), covering.indexName)
case mv: FlintSparkMaterializedView =>
val parts = mv.mvName.split('.')
(parts(1), null, parts.drop(2).mkString("."))
}

val status = index.latestLogEntry match {
case Some(entry) => entry.state.toString
case None => "unavailable"
}

Row(
index.name,
index.kind,
databaseName,
tableName,
indexName,
index.options.autoRefresh(),
status)
}
/**
* Represents the basic output schema for the FlintSparkSqlCommand. This schema includes
* essential information about each index.
*/
private val baseOutputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
AttributeReference("kind", StringType, nullable = false)(),
AttributeReference("database", StringType, nullable = false)(),
AttributeReference("table", StringType, nullable = true)(),
AttributeReference("index_name", StringType, nullable = true)(),
AttributeReference("auto_refresh", BooleanType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)())

/**
* Extends the base output schema with additional information. This schema is used when the
* EXTENDED keyword is present.
*/
private val extendedOutputSchema = Seq(
AttributeReference("error", StringType, nullable = true)())

override def visitShowFlintIndex(ctx: ShowFlintIndexContext): Command = {
new ShowFlintIndexCommandBuilder()
.withSchema(baseOutputSchema)
.forCatalog(ctx.catalogDb)
.constructRows(baseRowData)
.build()
}

override def visitShowFlintIndexExtended(ctx: ShowFlintIndexExtendedContext): Command = {
new ShowFlintIndexCommandBuilder()
.withSchema(baseOutputSchema ++ extendedOutputSchema)
.forCatalog(ctx.catalogDb)
.constructRows(index => baseRowData(index) ++ extendedRowData(index))
.build()
}

/**
* Builder class for constructing FlintSparkSqlCommand objects.
*/
private class ShowFlintIndexCommandBuilder {
private var schema: Seq[AttributeReference] = _
private var catalogDb: MultipartIdentifierContext = _
private var rowDataBuilder: FlintSparkIndex => Seq[Any] = _

/** Specify the output schema for the command. */
def withSchema(schema: Seq[AttributeReference]): ShowFlintIndexCommandBuilder = {
this.schema = schema
this
}

/** Specify the catalog database context for the command. */
def forCatalog(catalogDb: MultipartIdentifierContext): ShowFlintIndexCommandBuilder = {
this.catalogDb = catalogDb
this
}

/** Configures a function to construct row data for each index. */
def constructRows(
rowDataBuilder: FlintSparkIndex => Seq[Any]): ShowFlintIndexCommandBuilder = {
this.rowDataBuilder = rowDataBuilder
this
}

/** Builds the command using the configured parameters. */
def build(): FlintSparkSqlCommand = {
require(schema != null, "Schema must be set before building the command")
require(catalogDb != null, "Catalog database must be set before building the command")
require(rowDataBuilder != null, "Row data builder must be set before building the command")

FlintSparkSqlCommand(schema) { flint =>
val catalogDbName =
catalogDb.parts
.map(part => part.getText)
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"

flint
.describeIndexes(indexNamePattern)
.filter(index => index belongsTo catalogDb)
.map { index => Row.fromSeq(rowDataBuilder(index)) }
}
}
}

private def baseRowData(index: FlintSparkIndex): Seq[Any] = {
val (databaseName, tableName, indexName) = index match {
case skipping: FlintSparkSkippingIndex =>
val parts = skipping.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), null)
case covering: FlintSparkCoveringIndex =>
val parts = covering.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), covering.indexName)
case mv: FlintSparkMaterializedView =>
val parts = mv.mvName.split('.')
(parts(1), null, parts.drop(2).mkString("."))
}

val status = index.latestLogEntry match {
case Some(entry) => entry.state.toString
case None => "unavailable"
}

Seq(
index.name,
index.kind,
databaseName,
tableName,
indexName,
index.options.autoRefresh(),
status)
}

private def extendedRowData(index: FlintSparkIndex): Seq[Any] = {
val error = index.latestLogEntry match {
case Some(entry) => entry.error
case None => null
}
Seq(error)
}
}
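
For completeness, a consumer-side sketch of the new statement (assuming a SparkSession already configured with the Flint SQL extensions and a reachable OpenSearch cluster; the catalog and database names are illustrative):

```scala
import org.apache.spark.sql.SparkSession

object ShowFlintIndexExtendedExample {
  def main(args: Array[String]): Unit = {
    // Assumes the Flint SQL extensions are already registered on this session.
    val spark = SparkSession.builder().appName("show-flint-index-extended").getOrCreate()

    // EXTENDED appends the nullable "error" column to the base output schema.
    val indexes = spark.sql("SHOW FLINT INDEX EXTENDED IN spark_catalog.default")

    indexes.collect().foreach { row =>
      val name = row.getAs[String]("flint_index_name")
      val status = row.getAs[String]("status")
      // "error" is NULL unless the index's latest log entry recorded a failure.
      val error = Option(row.getAs[String]("error")).getOrElse("<none>")
      println(s"$name [$status]: $error")
    }

    spark.stop()
  }
}
```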
@@ -5,17 +5,21 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters._

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.Row

class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {

private val testTableName = "index_test"
private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
Expand Down Expand Up @@ -99,6 +103,46 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
}

test("show flint indexes with extended information") {
// Create and refresh with all existing data
flint
.skippingIndex()
.onTable(testTableQualifiedName)
.addValueSet("name")
.options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
.create()
flint.refreshIndex(testSkippingFlintIndex)
val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
awaitStreamingComplete(activeJob.get.id.toString)

// Assert output contains empty error message
def outputError: String = {
val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
df.columns should contain("error")
df.collect().head.getAs[String]("error")
}
outputError shouldBe empty

// Trigger the next micro batch after 5 seconds with the index set to read-only
new Thread(() => {
Thread.sleep(5000)
openSearchClient
.indices()
.putSettings(
new UpdateSettingsRequest(testSkippingFlintIndex).settings(
Map("index.blocks.write" -> true).asJava),
RequestOptions.DEFAULT)
sql(
s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
}).start()

// Wait for the index monitor to record the exception, then verify it is as expected
flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
outputError should include("OpenSearchException")

deleteTestIndex(testSkippingFlintIndex)
}

test("should return empty when show flint index in empty database") {
checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty)
}