From 00b06fe3ce3cb6b449e8b4e2a982e09298e731e5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Oct 2023 17:26:18 -0700 Subject: [PATCH 1/4] Add show statement with IT Signed-off-by: Chen Dai --- .../main/antlr4/FlintSparkSqlExtensions.g4 | 5 ++++ .../src/main/antlr4/SparkSqlBase.g4 | 2 ++ .../opensearch/flint/spark/FlintSpark.scala | 12 ++++++---- ...FlintSparkMaterializedViewAstBuilder.scala | 23 +++++++++++++++++-- ...FlintSparkMaterializedViewSqlITSuite.scala | 14 +++++++++++ 5 files changed, 50 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 44fd792ba..ccc9c5e64 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -79,6 +79,7 @@ dropCoveringIndexStatement materializedViewStatement : createMaterializedViewStatement + | showMaterializedViewStatement | dropMaterializedViewStatement ; @@ -88,6 +89,10 @@ createMaterializedViewStatement (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; +showMaterializedViewStatement + : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier + ; + dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 15652aa79..533d851ba 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -162,6 +162,7 @@ DROP: 'DROP'; EXISTS: 'EXISTS'; FALSE: 'FALSE'; IF: 'IF'; +IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; MATERIALIZED: 'MATERIALIZED'; @@ -172,6 +173,7 @@ REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; VIEW: 'VIEW'; +VIEWS: 'VIEWS'; WITH: 'WITH'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9c78a07f8..792ef830f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -166,10 +166,14 @@ class FlintSpark(val spark: SparkSession) { * Flint index list */ def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = { - flintClient - .getAllIndexMetadata(indexNamePattern) - .asScala - .map(FlintSparkIndexFactory.create) + if (flintClient.exists(indexNamePattern)) { + flintClient + .getAllIndexMetadata(indexNamePattern) + .asScala + .map(FlintSparkIndexFactory.create) + } else { + Seq.empty + } } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 976c6e6bc..4b399cc69 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -6,14 +6,17 @@ package org.opensearch.flint.spark.sql.mv import org.antlr.v4.runtime.tree.RuleNode -import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex} import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext, ShowMaterializedViewStatementContext} +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.types.StringType /** * Flint Spark AST builder that builds Spark command for Flint materialized view statement. @@ -47,6 +50,22 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } } + override def visitShowMaterializedViewStatement( + ctx: ShowMaterializedViewStatementContext): AnyRef = { + val outputSchema = Seq( + AttributeReference("materialized_view_name", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val catalogDbName = ctx.catalogDb.getText + val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*" + flint + .describeIndexes(indexNamePattern) + .collect { case mv: FlintSparkMaterializedView => + Row(mv.mvName) + } + } + } + override def visitDropMaterializedViewStatement( ctx: DropMaterializedViewStatementContext): AnyRef = { FlintSparkSqlCommand() { flint => diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 92b1771f3..eca496c2a 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -129,6 +129,20 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") } + test("show all materialized views in catalog") { + flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() + checkAnswer( + sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), + Seq(Row("spark_catalog.default.mv1"))) + + flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() + checkAnswer( + sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), + Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2"))) + + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) + } + test("drop materialized view") { flint .materializedView() From 28fdb9cb5870b3d42899727ed46f9143f67129ee Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Oct 2023 17:52:12 -0700 Subject: [PATCH 2/4] Add desc MV statement with IT Signed-off-by: Chen Dai --- .../main/antlr4/FlintSparkSqlExtensions.g4 | 5 +++++ ...FlintSparkMaterializedViewAstBuilder.scala | 21 ++++++++++++++++++- ...FlintSparkMaterializedViewSqlITSuite.scala | 12 +++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index ccc9c5e64..c4af2779d 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -80,6 +80,7 @@ dropCoveringIndexStatement materializedViewStatement : createMaterializedViewStatement | showMaterializedViewStatement + | describeMaterializedViewStatement | dropMaterializedViewStatement ; @@ -93,6 +94,10 @@ showMaterializedViewStatement : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier ; +describeMaterializedViewStatement + : (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier + ; + dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 4b399cc69..a27736fd1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -11,7 +11,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext, ShowMaterializedViewStatementContext} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -66,6 +66,25 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } } + override def visitDescribeMaterializedViewStatement( + ctx: DescribeMaterializedViewStatementContext): AnyRef = { + val outputSchema = Seq( + AttributeReference("output_col_name", StringType, nullable = false)(), + AttributeReference("data_type", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val flintIndexName = getFlintIndexName(flint, ctx.mvName) + flint + .describeIndex(flintIndexName) + .map { case mv: FlintSparkMaterializedView => + mv.outputSchema.map { case (colName, colType) => + Row(colName, colType) + }.toSeq + } + .getOrElse(Seq.empty) + } + } + override def visitDropMaterializedViewStatement( ctx: DropMaterializedViewStatementContext): AnyRef = { FlintSparkSqlCommand() { flint => diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index eca496c2a..429b355f5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -143,6 +143,18 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) } + test("describe materialized view") { + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .create() + + checkAnswer( + sql(s"DESC MATERIALIZED VIEW $testMvName"), + Seq(Row("startTime", "timestamp"), Row("count", "long"))) + } + test("drop materialized view") { flint .materializedView() From aa0bf0b4cd8cfd5932743fe48ab5f4f9a4628b93 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Oct 2023 18:00:20 -0700 Subject: [PATCH 3/4] Add more IT for desc statement Signed-off-by: Chen Dai --- .../FlintSparkMaterializedViewSqlITSuite.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 429b355f5..8811c9bf5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -129,20 +129,26 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") } - test("show all materialized views in catalog") { + test("show all materialized views in catalog and database") { + // Show in catalog flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create() checkAnswer( sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), Seq(Row("spark_catalog.default.mv1"))) + // Show in catalog.database flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create() checkAnswer( - sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"), + sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"), Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2"))) checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) } + test("should return emtpy when show materialized views in empty database") { + checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty) + } + test("describe materialized view") { flint .materializedView() @@ -155,6 +161,10 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { Seq(Row("startTime", "timestamp"), Row("count", "long"))) } + test("should return empty when describe nonexistent materialized view") { + checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq()) + } + test("drop materialized view") { flint .materializedView() From 6be65bd874f94fa11ea12cbe71ad3cb350c6951a Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 19 Oct 2023 08:58:10 -0700 Subject: [PATCH 4/4] Update user manual Signed-off-by: Chen Dai --- docs/index.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/index.md b/docs/index.md index 3c7b09fa6..cc70da574 100644 --- a/docs/index.md +++ b/docs/index.md @@ -195,6 +195,10 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name AS WITH ( options ) +SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database] + +[DESC|DESCRIBE] MATERIALIZED VIEW name + DROP MATERIALIZED VIEW name ``` @@ -209,6 +213,10 @@ SELECT FROM alb_logs GROUP BY TUMBLE(time, '1 Minute') +SHOW MATERIALIZED VIEWS IN spark_catalog.default + +DESC MATERIALIZED VIEW alb_logs_metrics + DROP MATERIALIZED VIEW alb_logs_metrics ```