From dfc972034e24828320eb52b81e74cef8d4fd4206 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 19 Sep 2023 09:48:41 -0700 Subject: [PATCH] Add show index statement Signed-off-by: Chen Dai --- .../main/antlr4/FlintSparkSqlExtensions.g4 | 5 +++++ .../src/main/antlr4/SparkSqlBase.g4 | 2 ++ .../opensearch/flint/spark/FlintSpark.scala | 12 +++++++++++ .../covering/FlintSparkCoveringIndex.scala | 4 ++-- .../FlintSparkCoveringIndexAstBuilder.scala | 15 +++++++++++++- .../FlintSparkCoveringIndexSqlITSuite.scala | 20 +++++++++++++++++++ 6 files changed, 55 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index ce0a25fb8..12f69680e 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -47,6 +47,7 @@ dropSkippingIndexStatement coveringIndexStatement : createCoveringIndexStatement | refreshCoveringIndexStatement + | showCoveringIndexStatement | describeCoveringIndexStatement | dropCoveringIndexStatement ; @@ -61,6 +62,10 @@ refreshCoveringIndexStatement : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier ; +showCoveringIndexStatement + : SHOW (INDEX | INDEXES) ON tableName=multipartIdentifier + ; + describeCoveringIndexStatement : (DESC | DESCRIBE) INDEX indexName=identifier ON tableName=multipartIdentifier ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 17627c190..928f63812 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -160,9 +160,11 @@ DESCRIBE: 'DESCRIBE'; DROP: 'DROP'; FALSE: 'FALSE'; INDEX: 'INDEX'; +INDEXES: 'INDEXES'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; +SHOW: 'SHOW'; STRING: 'STRING'; TRUE: 'TRUE'; WITH: 'WITH'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 8058f9bff..0d35f69af 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -136,6 +136,18 @@ class FlintSpark(val spark: SparkSession) { } } + /** + * Describe all Flint indexes whose name matches the given pattern. + * + * @param indexNamePattern + * index name pattern which may contains wildcard + * @return + * Flint index list + */ + def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = { + flintClient.getAllIndexMetadata(indexNamePattern).asScala.map(deserialize) + } + /** * Describe a Flint index. * diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 2d183f4bb..46a2a52b5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -28,10 +28,10 @@ import org.apache.spark.sql.types.StructType * @param indexedColumns * indexed column list */ -class FlintSparkCoveringIndex( +case class FlintSparkCoveringIndex( indexName: String, tableName: String, - val indexedColumns: Map[String, String]) + indexedColumns: Map[String, String]) extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 34e54cb59..c2be3f806 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -11,7 +11,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DescribeCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -58,6 +58,19 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C } } + override def visitShowCoveringIndexStatement( + ctx: ShowCoveringIndexStatementContext): Command = { + val outputSchema = Seq(AttributeReference("index_name", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val fullTableName = getFullTableName(flint, ctx.tableName) + val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName) + flint + .describeIndexes(indexNamePattern) + .map { case index: FlintSparkCoveringIndex => Row(index.indexName) } + } + } + override def visitDescribeCoveringIndexStatement( ctx: DescribeCoveringIndexStatementContext): Command = { val outputSchema = Seq( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 0c19040ab..f0c624dfe 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -66,6 +66,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("show all covering index on the source table") { + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + flint + .coveringIndex() + .name("idx_address") + .onTable(testTable) + .addIndexColumns("address") + .create() + + val result = sql(s"SHOW INDEX ON $testTable") + checkAnswer(result, Seq(Row(testIndex), Row("idx_address"))) + + flint.deleteIndex(getFlintIndexName("idx_address", testTable)) + } + test("describe covering index") { flint .coveringIndex()