diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
index 8ee4903d1..b4271360c 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java
@@ -5,12 +5,11 @@

 package org.opensearch.flint.core;

+import java.util.List;
 import org.opensearch.flint.core.metadata.FlintMetadata;
 import org.opensearch.flint.core.storage.FlintReader;
 import org.opensearch.flint.core.storage.FlintWriter;

-import java.io.Writer;
-
 /**
  * Flint index client that provides API for metadata and data operations
  * on a Flint index regardless of concrete storage.
@@ -33,6 +32,14 @@ public interface FlintClient {
    */
   boolean exists(String indexName);

+  /**
+   * Retrieve all metadata for Flint indexes whose name matches the given pattern.
+   *
+   * @param indexNamePattern index name pattern
+   * @return all matched index metadata
+   */
+  List<FlintMetadata> getAllIndexMetadata(String indexNamePattern);
+
   /**
    * Retrieve metadata in a Flint index.
    *
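Note: the new FlintClient#getAllIndexMetadata contract is pattern based, so a caller can enumerate every matching Flint index in one call. A minimal usage sketch (a hypothetical helper, not part of this change), assuming an existing FlintClient instance and the FlintMetadata.getContent accessor exercised in the test suite further down:

    import scala.collection.JavaConverters._

    import org.opensearch.flint.core.FlintClient

    // Hypothetical helper: dump the raw mapping JSON of every index matching a pattern.
    def dumpAllIndexMetadata(client: FlintClient, pattern: String): Seq[String] =
      client.getAllIndexMetadata(pattern).asScala.map(_.getContent)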
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index bc58a7c5e..4e3c5a76c 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -5,9 +5,17 @@

 package org.opensearch.flint.core.storage;

+import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
+
 import com.amazonaws.auth.AWS4Signer;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.http.HttpHost;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.client.RequestOptions;
@@ -34,13 +42,6 @@
 import org.opensearch.search.SearchModule;
 import org.opensearch.search.builder.SearchSourceBuilder;

-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
-
 /**
  * Flint client implementation for OpenSearch storage.
  */
@@ -79,6 +80,19 @@ public FlintOpenSearchClient(FlintOptions options) {
     }
   }

+  @Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
+    try (RestHighLevelClient client = createClient()) {
+      GetMappingsRequest request = new GetMappingsRequest().indices(indexNamePattern);
+      GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
+
+      return response.mappings().values().stream()
+          .map(mapping -> new FlintMetadata(mapping.source().string()))
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e);
+    }
+  }
+
   @Override public FlintMetadata getIndexMetadata(String indexName) {
     try (RestHighLevelClient client = createClient()) {
       GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 303084970..12f69680e 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -47,6 +47,8 @@ dropSkippingIndexStatement
 coveringIndexStatement
     : createCoveringIndexStatement
     | refreshCoveringIndexStatement
+    | showCoveringIndexStatement
+    | describeCoveringIndexStatement
     | dropCoveringIndexStatement
     ;

@@ -60,6 +62,14 @@ refreshCoveringIndexStatement
     : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
     ;

+showCoveringIndexStatement
+    : SHOW (INDEX | INDEXES) ON tableName=multipartIdentifier
+    ;
+
+describeCoveringIndexStatement
+    : (DESC | DESCRIBE) INDEX indexName=identifier ON tableName=multipartIdentifier
+    ;
+
 dropCoveringIndexStatement
     : DROP INDEX indexName=identifier ON tableName=multipartIdentifier
     ;
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 17627c190..928f63812 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -160,9 +160,11 @@ DESCRIBE: 'DESCRIBE';
 DROP: 'DROP';
 FALSE: 'FALSE';
 INDEX: 'INDEX';
+INDEXES: 'INDEXES';
 ON: 'ON';
 PARTITION: 'PARTITION';
 REFRESH: 'REFRESH';
+SHOW: 'SHOW';
 STRING: 'STRING';
 TRUE: 'TRUE';
 WITH: 'WITH';
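With the grammar rules and the new SHOW/INDEXES lexer tokens above, the extension parses two additional statements. A usage sketch through Spark SQL, assuming a SparkSession `spark` with the Flint extensions enabled (table and index names here are illustrative only):

    // List all covering indexes on a source table, then inspect one of them.
    spark.sql("SHOW INDEX ON spark_catalog.default.http_logs").show()
    spark.sql("DESC INDEX idx_clientip ON spark_catalog.default.http_logs").show()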
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 8058f9bff..0d35f69af 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -136,6 +136,18 @@ class FlintSpark(val spark: SparkSession) {
     }
   }

+  /**
+   * Describe all Flint indexes whose name matches the given pattern.
+   *
+   * @param indexNamePattern
+   *   index name pattern which may contain wildcard
+   * @return
+   *   Flint index list
+   */
+  def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
+    flintClient.getAllIndexMetadata(indexNamePattern).asScala.map(deserialize)
+  }
+
   /**
    * Describe a Flint index.
    *
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
index f2f5933d6..46a2a52b5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * @param indexedColumns
  *   indexed column list
  */
-class FlintSparkCoveringIndex(
+case class FlintSparkCoveringIndex(
     indexName: String,
     tableName: String,
     indexedColumns: Map[String, String])
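Making FlintSparkCoveringIndex a case class gives it structural equality and a generated extractor. The new SHOW handler below filters deserialized indexes with a type pattern; the extractor also allows destructuring them directly, as in this sketch (the FlintSpark instance and the index name pattern are hypothetical):

    import org.opensearch.flint.spark.FlintSpark
    import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex

    val flint = new FlintSpark(spark) // assumes an active SparkSession `spark`

    // Keep only covering indexes and pull out their names via the generated unapply.
    val coveringIndexNames = flint
      .describeIndexes("flint_*_index")
      .collect { case FlintSparkCoveringIndex(name, _, _) => name }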
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index c412b6eb6..203ebc22c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -11,9 +11,12 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.apache.spark.sql.types.StringType

 /**
  * Flint Spark AST builder that builds Spark command for Flint covering index statement.
@@ -55,6 +58,41 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
     }
   }

+  override def visitShowCoveringIndexStatement(
+      ctx: ShowCoveringIndexStatementContext): Command = {
+    val outputSchema = Seq(AttributeReference("index_name", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val fullTableName = getFullTableName(flint, ctx.tableName)
+      val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName)
+      flint
+        .describeIndexes(indexNamePattern)
+        .collect { case index: FlintSparkCoveringIndex =>
+          Row(index.indexName)
+        }
+    }
+  }
+
+  override def visitDescribeCoveringIndexStatement(
+      ctx: DescribeCoveringIndexStatementContext): Command = {
+    val outputSchema = Seq(
+      AttributeReference("indexed_col_name", StringType, nullable = false)(),
+      AttributeReference("data_type", StringType, nullable = false)(),
+      AttributeReference("index_type", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint
+        .describeIndex(indexName)
+        .map { case index: FlintSparkCoveringIndex =>
+          index.indexedColumns.map { case (colName, colType) =>
+            Row(colName, colType, "indexed")
+          }.toSeq
+        }
+        .getOrElse(Seq.empty)
+    }
+  }
+
   override def visitDropCoveringIndexStatement(
       ctx: DropCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index 91ec25c24..72fa1e116 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -43,6 +43,13 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
     flintClient.getIndexMetadata(indexName).getContent should matchJson(content)
   }

+  it should "get all index metadata with the given index name pattern" in {
+    flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}"))
+    flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}"))
+
+    flintClient.getAllIndexMetadata("flint_*_index") should have size 2
+  }
+
   it should "return false if index not exist" in {
     flintClient.exists("non-exist-index") shouldBe false
   }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 04b3ed0c8..66d19a261 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -8,9 +8,12 @@ package org.opensearch.flint.spark
 import scala.Option.empty

 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

+import org.apache.spark.sql.Row
+
 class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

   /** Test table and index name */
@@ -64,6 +67,48 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     indexData.count() shouldBe 2
   }

+  test("show all covering indexes on the source table") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    // Create another covering index
+    flint
+      .coveringIndex()
+      .name("idx_address")
+      .onTable(testTable)
+      .addIndexColumns("address")
+      .create()
+
+    // Create a skipping index which is expected to be filtered
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    val result = sql(s"SHOW INDEX ON $testTable")
+    checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))
+
+    flint.deleteIndex(getFlintIndexName("idx_address", testTable))
+    flint.deleteIndex(getSkippingIndexName(testTable))
+  }
+
+  test("describe covering index") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    val result = sql(s"DESC INDEX $testIndex ON $testTable")
+    checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed")))
+  }
+
   test("drop covering index") {
     flint
       .coveringIndex()