Skip to content

Commit

Permalink
Add show statement with IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 19, 2023
1 parent 572a8fa commit 62f743a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| showMaterializedViewStatement
| dropMaterializedViewStatement
;

Expand All @@ -88,6 +89,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
Expand All @@ -172,6 +173,7 @@ REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WITH: 'WITH';


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,14 @@ class FlintSpark(val spark: SparkSession) {
* Flint index list
*/
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
if (flintClient.exists(indexNamePattern)) {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
package org.opensearch.flint.spark.sql.mv

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext, ShowMaterializedViewStatementContext}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.types.StringType

/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
Expand Down Expand Up @@ -47,6 +50,22 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName = ctx.catalogDb.getText
val indexNamePattern = FlintSparkIndex.flintIndexNamePrefix(catalogDbName) + "*"
flint
.describeIndexes(indexNamePattern)
.collect { case mv: FlintSparkMaterializedView =>
Row(mv.mvName)
}
}
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("show all materialized views in catalog") {
flint.materializedView().name("spark_catalog.default.mv1").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
Seq(Row("spark_catalog.default.mv1")))

flint.materializedView().name("spark_catalog.default.mv2").query(testQuery).create()
checkAnswer(
sql(s"SHOW MATERIALIZED VIEW IN spark_catalog"),
Seq(Row("spark_catalog.default.mv1"), Row("spark_catalog.default.mv2")))

checkAnswer(sql(s"SHOW MATERIALIZED VIEW IN spark_catalog.other"), Seq.empty)
}

test("drop materialized view") {
flint
.materializedView()
Expand Down

0 comments on commit 62f743a

Please sign in to comment.