Skip to content

Commit

Permalink
Add show index statement
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 19, 2023
1 parent bba5932 commit dfc9720
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dropSkippingIndexStatement
// Dispatch rule: one alternative per supported covering index statement.
coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| dropCoveringIndexStatement
;

// REFRESH INDEX <name> ON <table> — refresh data for an existing covering index.
refreshCoveringIndexStatement
: REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
;

// SHOW INDEX|INDEXES ON <table> — list all covering indexes on the source table.
// INDEX and INDEXES are accepted interchangeably.
showCoveringIndexStatement
: SHOW (INDEX | INDEXES) ON tableName=multipartIdentifier
;

// DESC|DESCRIBE INDEX <name> ON <table> — display metadata of one covering index.
describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName=identifier ON tableName=multipartIdentifier
;
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,11 @@ DESCRIBE: 'DESCRIBE';
// Keyword tokens for the Flint SQL lexer, kept in alphabetical order.
// INDEXES and SHOW support the `SHOW INDEX|INDEXES ON <table>` statement.
DROP: 'DROP';
FALSE: 'FALSE';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ class FlintSpark(val spark: SparkSession) {
}
}

/**
 * Find all Flint indexes whose name matches the given pattern.
 *
 * @param indexNamePattern
 *   index name pattern, which may contain wildcards
 * @return
 *   all matching Flint indexes (empty if nothing matches)
 */
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
// Fetch raw metadata for every matching index, then rebuild each index object from it.
val allMetadata = flintClient.getAllIndexMetadata(indexNamePattern).asScala
allMetadata.map(metadata => deserialize(metadata))
}

/**
* Describe a Flint index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import org.apache.spark.sql.types.StructType
* @param indexedColumns
* indexed column list
*/
class FlintSparkCoveringIndex(
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
val indexedColumns: Map[String, String])
indexedColumns: Map[String, String])
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DescribeCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
Expand Down Expand Up @@ -58,6 +58,19 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
}
}

/**
 * Handle `SHOW INDEX|INDEXES ON <table>`: list the names of all covering
 * indexes created on the given source table, one row per index.
 */
override def visitShowCoveringIndexStatement(
ctx: ShowCoveringIndexStatementContext): Command = {
// Single output column holding each covering index name.
val outputSchema = Seq(AttributeReference("index_name", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val fullTableName = getFullTableName(flint, ctx.tableName)
// Wildcard in the index-name slot matches every covering index on the table.
val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName)
flint
.describeIndexes(indexNamePattern)
// collect (not map) so that any non-covering index returned by the
// pattern lookup is skipped rather than raising a scala.MatchError
.collect { case index: FlintSparkCoveringIndex => Row(index.indexName) }
}
}

override def visitDescribeCoveringIndexStatement(
ctx: DescribeCoveringIndexStatementContext): Command = {
val outputSchema = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

// Verify that SHOW INDEX ON <table> lists every covering index on the table.
test("show all covering index on the source table") {
// Create a first covering index over (name, age).
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()
// Create a second covering index over (address) on the same source table.
flint
.coveringIndex()
.name("idx_address")
.onTable(testTable)
.addIndexColumns("address")
.create()

// Both index names should come back, one row each.
val result = sql(s"SHOW INDEX ON $testTable")
checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))

// Clean up the extra index created here; testIndex is presumably
// removed by the suite's shared teardown — confirm against the suite.
flint.deleteIndex(getFlintIndexName("idx_address", testTable))
}

test("describe covering index") {
flint
.coveringIndex()
Expand Down

0 comments on commit dfc9720

Please sign in to comment.