Add error output column to show Flint index statement (opensearch-project#436)

* Add EXTENDED option to SHOW FLINT INDEX statement and refactor AST builder

Signed-off-by: Chen Dai <[email protected]>

* Update user manual

Signed-off-by: Chen Dai <[email protected]>

* Add IT

Signed-off-by: Chen Dai <[email protected]>

* Fix scalafmt issue for global execution context

Signed-off-by: Chen Dai <[email protected]>

* Split ANTLR grammar rule

Signed-off-by: Chen Dai <[email protected]>

* Refactor comments and test code

Signed-off-by: Chen Dai <[email protected]>

* Remove thread pool in IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Jul 26, 2024
1 parent 98bd79a commit 3e4df0a
Showing 5 changed files with 181 additions and 50 deletions.
13 changes: 12 additions & 1 deletion docs/index.md
@@ -328,9 +328,11 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics
- index_name: user defined name for covering index and materialized view
- auto_refresh: auto refresh option of the index (true / false)
- status: status of the index
- **Extended Usage**: Displays additional information, including the following output columns (a usage sketch follows the example below):
- error: error message if the index is in failed status

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
SHOW FLINT [INDEX|INDEXES] [EXTENDED] IN catalog[.database]
```

Example:
@@ -344,6 +346,15 @@ fetched rows / total rows = 3/3
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing |
| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
sql> SHOW FLINT INDEXES EXTENDED IN spark_catalog.default;
fetched rows / total rows = 2/2
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
| flint_index_name | kind | database | table | index_name | auto_refresh | status | error |
|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------|
| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active | NULL |
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | failed | failure in bulk execution:... |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+-------------------------------+
```
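
The `error` column makes it possible to script health checks on Flint indexes. As a rough illustration (not part of this commit), the same statement can be issued through `spark.sql` and filtered on the `status` column; the sketch below assumes an active SparkSession with the Flint SQL extensions and OpenSearch connection already configured, and uses `spark_catalog.default` as a placeholder catalog and database:

```scala
import org.apache.spark.sql.SparkSession

// Assumes a session that already has the Flint SQL extensions and the
// OpenSearch connection configured (placeholder setup, not shown here).
val spark: SparkSession = SparkSession.active

// List only the indexes in failed state together with their error messages.
spark
  .sql("SHOW FLINT INDEXES EXTENDED IN spark_catalog.default")
  .filter("status = 'failed'")
  .select("flint_index_name", "error")
  .show(truncate = false)
```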

- **Analyze Skipping Index**: Provides recommendations for creating a skipping index. It outputs the following columns:
@@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
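
Splitting the rule into two labeled alternatives (`#showFlintIndex`, `#showFlintIndexExtended`) makes ANTLR emit a separate parse context and visitor callback for each statement variant, so the AST builder below can override one method per variant instead of checking for an optional EXTENDED token inside a single visit method. A simplified Scala sketch of the resulting callback shape (the context classes here are stand-in stubs; the real ones are generated into `FlintSparkSqlExtensionsParser`):

```scala
// Stand-in stubs for the generated ShowFlintIndexContext / ShowFlintIndexExtendedContext.
class ShowFlintIndexContextStub
class ShowFlintIndexExtendedContextStub

// Rough shape of the per-alternative callbacks on the generated visitor,
// which FlintSparkIndexAstBuilder overrides further below.
trait ShowFlintIndexVisitorSketch[T] {
  def visitShowFlintIndex(ctx: ShowFlintIndexContextStub): T
  def visitShowFlintIndexExtended(ctx: ShowFlintIndexExtendedContextStub): T
}
```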
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
@@ -7,12 +7,13 @@ package org.opensearch.flint.spark.sql.index

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.IndexBelongsTo
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{MultipartIdentifierContext, ShowFlintIndexContext, ShowFlintIndexExtendedContext}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -25,52 +26,123 @@ import org.apache.spark.sql.types.{BooleanType, StringType}
trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitShowFlintIndexStatement(ctx: ShowFlintIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
AttributeReference("kind", StringType, nullable = false)(),
AttributeReference("database", StringType, nullable = false)(),
AttributeReference("table", StringType, nullable = true)(),
AttributeReference("index_name", StringType, nullable = true)(),
AttributeReference("auto_refresh", BooleanType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName =
ctx.catalogDb.parts
.map(part => part.getText)
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"
flint
.describeIndexes(indexNamePattern)
.filter(index => index belongsTo ctx.catalogDb)
.map { index =>
val (databaseName, tableName, indexName) = index match {
case skipping: FlintSparkSkippingIndex =>
val parts = skipping.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), null)
case covering: FlintSparkCoveringIndex =>
val parts = covering.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), covering.indexName)
case mv: FlintSparkMaterializedView =>
val parts = mv.mvName.split('.')
(parts(1), null, parts.drop(2).mkString("."))
}

val status = index.latestLogEntry match {
case Some(entry) => entry.state.toString
case None => "unavailable"
}

Row(
index.name,
index.kind,
databaseName,
tableName,
indexName,
index.options.autoRefresh(),
status)
}
/**
* Represents the basic output schema for the FlintSparkSqlCommand. This schema includes
* essential information about each index.
*/
private val baseOutputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
AttributeReference("kind", StringType, nullable = false)(),
AttributeReference("database", StringType, nullable = false)(),
AttributeReference("table", StringType, nullable = true)(),
AttributeReference("index_name", StringType, nullable = true)(),
AttributeReference("auto_refresh", BooleanType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)())

/**
* Extends the base output schema with additional information. This schema is used when the
* EXTENDED keyword is present.
*/
private val extendedOutputSchema = Seq(
AttributeReference("error", StringType, nullable = true)())

override def visitShowFlintIndex(ctx: ShowFlintIndexContext): Command = {
new ShowFlintIndexCommandBuilder()
.withSchema(baseOutputSchema)
.forCatalog(ctx.catalogDb)
.constructRows(baseRowData)
.build()
}

override def visitShowFlintIndexExtended(ctx: ShowFlintIndexExtendedContext): Command = {
new ShowFlintIndexCommandBuilder()
.withSchema(baseOutputSchema ++ extendedOutputSchema)
.forCatalog(ctx.catalogDb)
.constructRows(index => baseRowData(index) ++ extendedRowData(index))
.build()
}

/**
* Builder class for constructing FlintSparkSqlCommand objects.
*/
private class ShowFlintIndexCommandBuilder {
private var schema: Seq[AttributeReference] = _
private var catalogDb: MultipartIdentifierContext = _
private var rowDataBuilder: FlintSparkIndex => Seq[Any] = _

/** Specify the output schema for the command. */
def withSchema(schema: Seq[AttributeReference]): ShowFlintIndexCommandBuilder = {
this.schema = schema
this
}

/** Specify the catalog database context for the command. */
def forCatalog(catalogDb: MultipartIdentifierContext): ShowFlintIndexCommandBuilder = {
this.catalogDb = catalogDb
this
}

/** Configures a function to construct row data for each index. */
def constructRows(
rowDataBuilder: FlintSparkIndex => Seq[Any]): ShowFlintIndexCommandBuilder = {
this.rowDataBuilder = rowDataBuilder
this
}

/** Builds the command using the configured parameters. */
def build(): FlintSparkSqlCommand = {
require(schema != null, "Schema must be set before building the command")
require(catalogDb != null, "Catalog database must be set before building the command")
require(rowDataBuilder != null, "Row data builder must be set before building the command")

FlintSparkSqlCommand(schema) { flint =>
val catalogDbName =
catalogDb.parts
.map(part => part.getText)
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"

flint
.describeIndexes(indexNamePattern)
.filter(index => index belongsTo catalogDb)
.map { index => Row.fromSeq(rowDataBuilder(index)) }
}
}
}

private def baseRowData(index: FlintSparkIndex): Seq[Any] = {
val (databaseName, tableName, indexName) = index match {
case skipping: FlintSparkSkippingIndex =>
val parts = skipping.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), null)
case covering: FlintSparkCoveringIndex =>
val parts = covering.tableName.split('.')
(parts(1), parts.drop(2).mkString("."), covering.indexName)
case mv: FlintSparkMaterializedView =>
val parts = mv.mvName.split('.')
(parts(1), null, parts.drop(2).mkString("."))
}

val status = index.latestLogEntry match {
case Some(entry) => entry.state.toString
case None => "unavailable"
}

Seq(
index.name,
index.kind,
databaseName,
tableName,
indexName,
index.options.autoRefresh(),
status)
}

private def extendedRowData(index: FlintSparkIndex): Seq[Any] = {
val error = index.latestLogEntry match {
case Some(entry) => entry.error
case None => null
}
Seq(error)
}
}
@@ -5,17 +5,21 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters._

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.Row

class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {

private val testTableName = "index_test"
private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
@@ -99,6 +103,46 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
}

test("show flint indexes with extended information") {
// Create and refresh with all existing data
flint
.skippingIndex()
.onTable(testTableQualifiedName)
.addValueSet("name")
.options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
.create()
flint.refreshIndex(testSkippingFlintIndex)
val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
awaitStreamingComplete(activeJob.get.id.toString)

// Assert output contains empty error message
def outputError: String = {
val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
df.columns should contain("error")
df.collect().head.getAs[String]("error")
}
outputError shouldBe empty

// Trigger next micro batch after 5 seconds with index readonly
new Thread(() => {
Thread.sleep(5000)
openSearchClient
.indices()
.putSettings(
new UpdateSettingsRequest(testSkippingFlintIndex).settings(
Map("index.blocks.write" -> true).asJava),
RequestOptions.DEFAULT)
sql(
s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
}).start()

// Await to store exception and verify if it's as expected
flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))
outputError should include("OpenSearchException")

deleteTestIndex(testSkippingFlintIndex)
}

test("should return empty when show flint index in empty database") {
checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty)
}
