Skip to content

Commit

Permalink
Merge branch 'main' into add-more-index-options-for-streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Oct 19, 2023
2 parents 30e0aec + f8ec62e commit 94f084e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 6 deletions.
8 changes: 8 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name

DROP MATERIALIZED VIEW name
```

Expand All @@ -209,6 +213,10 @@ SELECT
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics

DROP MATERIALIZED VIEW alb_logs_metrics
```

Expand Down
10 changes: 10 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
;

Expand All @@ -88,6 +90,14 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;

describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
Expand All @@ -172,6 +173,7 @@ REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WITH: 'WITH';


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,14 @@ class FlintSpark(val spark: SparkSession) {
* Flint index list
*/
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
if (flintClient.exists(indexNamePattern)) {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
package org.opensearch.flint.spark.sql.mv

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
Expand Down Expand Up @@ -47,6 +50,41 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName = ctx.catalogDb.getText
val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
flint
.describeIndexes(indexNamePattern)
.collect { case mv: FlintSparkMaterializedView =>
Row(mv.mvName)
}
}
}

override def visitDescribeMaterializedViewStatement(
ctx: DescribeMaterializedViewStatementContext): AnyRef = {
val outputSchema = Seq(
AttributeReference("output_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint
.describeIndex(flintIndexName)
.map { case mv: FlintSparkMaterializedView =>
mv.outputSchema.map { case (colName, colType) =>
Row(colName, colType)
}.toSeq
}
.getOrElse(Seq.empty)
}
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,42 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("show all materialized views in catalog and database") {
// Show in catalog
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
Seq(Row("spark_catalog.default.mv1")))

// Show in catalog.database
flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.default"),
Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))

checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
}

test("should return emtpy when show materialized views in empty database") {
checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
}

test("describe materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

checkAnswer(
sql(s"DESC MATERIALIZED VIEW $testMvName"),
Seq(Row("startTime", "timestamp"), Row("count", "long")))
}

test("should return empty when describe nonexistent materialized view") {
checkAnswer(sql("DESC MATERIALIZED VIEW nonexistent_mv"), Seq())
}

test("drop materialized view") {
flint
.materializedView()
Expand Down

0 comments on commit 94f084e

Please sign in to comment.