diff --git a/docs/index.md b/docs/index.md index 3c1671c98..00ee596f9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -195,6 +195,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name AS WITH ( options ) +REFRESH MATERIALIZED VIEW name + SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database] [DESC|DESCRIBE] MATERIALIZED VIEW name @@ -213,6 +215,8 @@ SELECT FROM alb_logs GROUP BY TUMBLE(time, '1 Minute') +REFRESH MATERIALIZED VIEW alb_logs_metrics + SHOW MATERIALIZED VIEWS IN spark_catalog.default DESC MATERIALIZED VIEW alb_logs_metrics diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index c4af2779d..f48c276e4 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -79,6 +79,7 @@ dropCoveringIndexStatement materializedViewStatement : createMaterializedViewStatement + | refreshMaterializedViewStatement | showMaterializedViewStatement | describeMaterializedViewStatement | dropMaterializedViewStatement @@ -90,6 +91,10 @@ createMaterializedViewStatement (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; +refreshMaterializedViewStatement + : REFRESH MATERIALIZED VIEW mvName=multipartIdentifier + ; + showMaterializedViewStatement : SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index a27736fd1..16af7984c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -15,6 +15,7 @@ import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.Command import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.types.StringType @@ -25,7 +26,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito self: SparkSqlAstBuilder => override def visitCreateMaterializedViewStatement( - ctx: CreateMaterializedViewStatementContext): AnyRef = { + ctx: CreateMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => val mvName = getFullTableName(flint, ctx.mvName) val query = getMvQuery(ctx.query) @@ -50,8 +51,17 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } } + override def visitRefreshMaterializedViewStatement( + ctx: RefreshMaterializedViewStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val flintIndexName = getFlintIndexName(flint, ctx.mvName) + flint.refreshIndex(flintIndexName, RefreshMode.FULL) + Seq.empty + } + } + override def visitShowMaterializedViewStatement( - ctx: ShowMaterializedViewStatementContext): AnyRef = { + ctx: ShowMaterializedViewStatementContext): Command = { val outputSchema = Seq( AttributeReference("materialized_view_name", StringType, nullable = false)()) @@ -67,7 +77,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } override def visitDescribeMaterializedViewStatement( - ctx: DescribeMaterializedViewStatementContext): AnyRef = { + ctx: DescribeMaterializedViewStatementContext): Command = { val outputSchema = Seq( AttributeReference("output_col_name", StringType, nullable = false)(), AttributeReference("data_type", StringType, nullable = false)()) @@ -86,7 +96,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito } override def visitDropMaterializedViewStatement( - ctx: DropMaterializedViewStatementContext): AnyRef = { + ctx: DropMaterializedViewStatementContext): Command = { FlintSparkSqlCommand() { flint => flint.deleteIndex(getFlintIndexName(flint, ctx.mvName)) Seq.empty diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 29ce4e248..1b16a9e16 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -92,8 +92,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { |""".stripMargin) } - // TODO: fix this windowing function unable to be used in GROUP BY - ignore("full refresh materialized view") { + test("full refresh materialized view") { flint .materializedView() .name(testMvName) @@ -104,12 +103,12 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { val indexData = flint.queryIndex(testFlintIndex) checkAnswer( - indexData, + indexData.select("startTime", "count"), Seq( Row(timestamp("2023-10-01 00:00:00"), 1), Row(timestamp("2023-10-01 00:10:00"), 2), Row(timestamp("2023-10-01 01:00:00"), 1), - Row(timestamp("2023-10-01 02:00:00"), 1))) + Row(timestamp("2023-10-01 03:00:00"), 1))) } test("incremental refresh materialized view") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index e57a95fdb..872939e5f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -20,6 +20,7 @@ import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row +import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { @@ -128,6 +129,23 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + test("create materialized view with manual refresh") { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = false + | ) + |""".stripMargin) + + val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex) + flint.describeIndex(testFlintIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + indexData.count() shouldBe 4 + } + test("create materialized view if not exists") { sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") flint.describeIndex(testFlintIndex) shouldBe defined