diff --git a/docs/index.md b/docs/index.md
index 3c7b09fa6..cc70da574 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -195,6 +195,10 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
 AS <query>
 WITH ( options )
 
+SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]
+
+[DESC|DESCRIBE] MATERIALIZED VIEW name
+
 DROP MATERIALIZED VIEW name
 ```
 
@@ -209,6 +213,10 @@ SELECT
 FROM alb_logs
 GROUP BY TUMBLE(time, '1 Minute')
 
+SHOW MATERIALIZED VIEWS IN spark_catalog.default
+
+DESC MATERIALIZED VIEW alb_logs_metrics
+
 DROP MATERIALIZED VIEW alb_logs_metrics
 ```
 
diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
index 44fd792ba..c4af2779d 100644
--- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
+++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -79,6 +79,8 @@ dropCoveringIndexStatement
 
 materializedViewStatement
     : createMaterializedViewStatement
+    | showMaterializedViewStatement
+    | describeMaterializedViewStatement
     | dropMaterializedViewStatement
     ;
 
@@ -88,6 +90,14 @@ createMaterializedViewStatement
     (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
     ;
 
+showMaterializedViewStatement
+    : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
+    ;
+
+describeMaterializedViewStatement
+    : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
+    ;
+
 dropMaterializedViewStatement
     : DROP MATERIALIZED VIEW mvName=multipartIdentifier
     ;
diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
index 15652aa79..533d851ba 100644
--- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
+++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -162,6 +162,7 @@ DROP: 'DROP';
 EXISTS: 'EXISTS';
 FALSE: 'FALSE';
 IF: 'IF';
+IN: 'IN';
 INDEX: 'INDEX';
 INDEXES: 'INDEXES';
 MATERIALIZED: 'MATERIALIZED';
@@ -172,6 +173,7 @@ REFRESH: 'REFRESH';
 SHOW: 'SHOW';
 TRUE: 'TRUE';
 VIEW: 'VIEW';
+VIEWS: 'VIEWS';
 WITH: 'WITH';
 
 
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 9c78a07f8..792ef830f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -166,10 +166,14 @@ class FlintSpark(val spark: SparkSession) {
    *   Flint index list
    */
   def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
-    flintClient
-      .getAllIndexMetadata(indexNamePattern)
-      .asScala
-      .map(FlintSparkIndexFactory.create)
+    if (flintClient.exists(indexNamePattern)) {
+      flintClient
+        .getAllIndexMetadata(indexNamePattern)
+        .asScala
+        .map(FlintSparkIndexFactory.create)
+    } else {
+      Seq.empty
+    }
   }
 
   /**
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index 976c6e6bc..a27736fd1 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -6,14 +6,17 @@
 package org.opensearch.flint.spark.sql.mv
 
 import org.antlr.v4.runtime.tree.RuleNode
-import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
 import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
 
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.types.StringType
 
 /**
  * Flint Spark AST builder that builds Spark command for Flint materialized view statement.
@@ -47,6 +50,41 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
     }
   }
 
+  override def visitShowMaterializedViewStatement(
+      ctx: ShowMaterializedViewStatementContext): AnyRef = {
+    val outputSchema = Seq(
+      AttributeReference("materialized_view_name", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val catalogDbName = ctx.catalogDb.getText
+      val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
+      flint
+        .describeIndexes(indexNamePattern)
+        .collect { case mv: FlintSparkMaterializedView =>
+          Row(mv.mvName)
+        }
+    }
+  }
+
+  override def visitDescribeMaterializedViewStatement(
+      ctx: DescribeMaterializedViewStatementContext): AnyRef = {
+    val outputSchema = Seq(
+      AttributeReference("output_col_name", StringType, nullable = false)(),
+      AttributeReference("data_type", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val flintIndexName = getFlintIndexName(flint, ctx.mvName)
+      flint
+        .describeIndex(flintIndexName)
+        .map { case mv: FlintSparkMaterializedView =>
+          mv.outputSchema.map { case (colName, colType) =>
+            Row(colName, colType)
+          }.toSeq
+        }
+        .getOrElse(Seq.empty)
+    }
+  }
+
   override def visitDropMaterializedViewStatement(
       ctx: DropMaterializedViewStatementContext): AnyRef = {
     FlintSparkSqlCommand() { flint =>
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index 92b1771f3..8811c9bf5 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -129,6 +129,42 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
   }
 
+  test("show all materialized views in catalog and database") {
+    // Show in catalog
+    flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
+    checkAnswer(
+      sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
+      Seq(Row("spark_catalog.default.mv1")))
+
+    // Show in catalog.database
+    flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
+    checkAnswer(
+      sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
+      Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))
+
+    checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
+  }
+
+  test("should return empty when show materialized views in empty database") {
+    checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
+  }
+
+  test("describe materialized view") {
+    flint
+      .materializedView()
+      .name(testMvName)
+      .query(testQuery)
+      .create()
+
+    checkAnswer(
+      sql(s"DESC MATERIALIZED VIEW $testMvName"),
+      Seq(Row("startTime", "timestamp"), Row("count", "long")))
+  }
+
+  test("should return empty when describe nonexistent materialized view") {
+    checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq())
+  }
+
   test("drop materialized view") {
     flint
       .materializedView()