Skip to content

Commit

Permalink
Merge branch 'main' into fix-quoted-identifier-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Oct 25, 2023
2 parents d5b4861 + fa8e47a commit 560ed66
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
4 changes: 4 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

REFRESH MATERIALIZED VIEW name

SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name
Expand All @@ -213,6 +215,8 @@ SELECT
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

REFRESH MATERIALIZED VIEW alb_logs_metrics

SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
Expand All @@ -90,6 +91,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

// Manually triggered refresh of the Flint index behind a materialized view,
// e.g. REFRESH MATERIALIZED VIEW catalog.db.metrics_mv
// The view name may be multipart (catalog.database.name).
refreshMaterializedViewStatement
: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
;

// Lists materialized views in a catalog or catalog.database scope,
// e.g. SHOW MATERIALIZED VIEWS IN spark_catalog.default
// Both singular VIEW and plural VIEWS keywords are accepted.
showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StringType

Expand All @@ -25,7 +26,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
ctx: CreateMaterializedViewStatementContext): AnyRef = {
ctx: CreateMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val mvName = getFullTableName(flint, ctx.mvName)
val query = getMvQuery(ctx.query)
Expand All @@ -50,8 +51,17 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

/**
 * Builds the runnable command for a REFRESH MATERIALIZED VIEW statement.
 *
 * Resolves the Flint index name backing the target materialized view and
 * triggers a one-shot full refresh of it. The command produces no result rows.
 */
override def visitRefreshMaterializedViewStatement(
    ctx: RefreshMaterializedViewStatementContext): Command = {
  FlintSparkSqlCommand() { flint =>
    // Resolve the MV's backing index and refresh it in place
    flint.refreshIndex(getFlintIndexName(flint, ctx.mvName), RefreshMode.FULL)
    Seq.empty
  }
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
ctx: ShowMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

Expand All @@ -67,7 +77,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDescribeMaterializedViewStatement(
ctx: DescribeMaterializedViewStatementContext): AnyRef = {
ctx: DescribeMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("output_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)())
Expand All @@ -86,7 +96,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
|""".stripMargin)
}

// TODO: fix this windowing function unable to be used in GROUP BY
ignore("full refresh materialized view") {
test("full refresh materialized view") {
flint
.materializedView()
.name(testMvName)
Expand All @@ -104,12 +103,12 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(
indexData,
indexData.select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 00:00:00"), 1),
Row(timestamp("2023-10-01 00:10:00"), 2),
Row(timestamp("2023-10-01 01:00:00"), 1),
Row(timestamp("2023-10-01 02:00:00"), 1)))
Row(timestamp("2023-10-01 03:00:00"), 1)))
}

test("incremental refresh materialized view") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE

class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

Expand Down Expand Up @@ -128,6 +129,23 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

// Verifies that a materialized view created with auto_refresh = false starts
// empty and is only populated after an explicit REFRESH MATERIALIZED VIEW.
test("create materialized view with manual refresh") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = false
| )
|""".stripMargin)

// Index metadata exists immediately, but no data has been written yet
// because auto refresh was disabled at creation time.
val indexData = spark.read.format(FLINT_DATASOURCE).load(testFlintIndex)
flint.describeIndex(testFlintIndex) shouldBe defined
indexData.count() shouldBe 0

// Manual refresh populates the index; the DataFrame is lazily evaluated so
// re-counting reflects the newly written rows. The expected count of 4
// presumably matches the shared test fixture data — confirm against the
// suite's setup if the fixture changes.
sql(s"REFRESH MATERIALIZED VIEW $testMvName")
indexData.count() shouldBe 4
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined
Expand Down

0 comments on commit 560ed66

Please sign in to comment.