From ea012af59adcc38594e985db92171c97e2e792c9 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 19 Oct 2023 14:09:11 -0700
Subject: [PATCH] Add create and drop materialized view SQL support (#73)

* Add MV grammar with empty impl

Signed-off-by: Chen Dai

* Find mv query in origin

Signed-off-by: Chen Dai

* Implement create and drop statement in ast builder

Signed-off-by: Chen Dai

* Add MV SQL IT

Signed-off-by: Chen Dai

* Add more IT for create statement

Signed-off-by: Chen Dai

* Add more IT for drop statement

Signed-off-by: Chen Dai

* Update user manual with MV

Signed-off-by: Chen Dai

* Update doc with MV index naming convention

Signed-off-by: Chen Dai

---------

Signed-off-by: Chen Dai
---
 docs/index.md                                 |  42 ++++-
 .../main/antlr4/FlintSparkSqlExtensions.g4    |  24 +++
 .../src/main/antlr4/SparkSqlBase.g4           |   3 +
 .../spark/sql/FlintSparkSqlAstBuilder.scala   |   2 +
 .../flint/spark/sql/FlintSparkSqlParser.scala |   6 +-
 ...FlintSparkMaterializedViewAstBuilder.scala |  70 +++++++++
 ...FlintSparkMaterializedViewSqlITSuite.scala | 145 ++++++++++++++++++
 7 files changed, 287 insertions(+), 5 deletions(-)
 create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
 create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala

diff --git a/docs/index.md b/docs/index.md
index 8afdc1fbc..3c7b09fa6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -15,6 +15,7 @@ A Flint index is ...
 - MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
 - ValueSet: skip data scan by building a unique value set of the indexed column per file.
 - Covering Index: create index for selected columns within the source dataset to improve query performance
+- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset

 Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

@@ -187,6 +188,30 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
 DROP INDEX elb_and_requestUri ON alb_logs
 ```

+#### Materialized View
+
+```sql
+CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
+AS <query>
+WITH ( options )
+
+DROP MATERIALIZED VIEW name
+```
+
+Example:
+
+```sql
+CREATE MATERIALIZED VIEW alb_logs_metrics
+AS
+SELECT
+  window.start AS startTime,
+  COUNT(*) AS count
+FROM alb_logs
+GROUP BY TUMBLE(time, '1 Minute')
+
+DROP MATERIALIZED VIEW alb_logs_metrics
+```
+
 #### Create Index Options

 Users can provide the following options in the `WITH` clause of a create statement:
@@ -226,6 +251,7 @@ OpenSearch index corresponding to the Flint index follows the naming convention

 1. Skipping index: `flint_[catalog_database_table]_skipping_index`
 2. Covering index: `flint_[catalog_database_table]_[index_name]_index`
+3. Materialized view: `flint_[catalog_database_table]_[mv_name]`

 It's important to note that any uppercase letters in the index name and table name (catalog, database and table) in the SQL statement will be automatically converted to lowercase due to restrictions imposed by OpenSearch.
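To illustrate the naming convention above, here is a minimal sketch of how a fully qualified materialized view name maps to its OpenSearch index name; `flintMvIndexName` is a hypothetical helper for illustration only (the library's actual implementation is `FlintSparkMaterializedView.getFlintIndexName`):

```scala
object FlintMvNamingSketch extends App {
  // Hypothetical helper mirroring the convention above: prefix with "flint_",
  // lowercase the qualified MV name, and flatten dots to underscores.
  def flintMvIndexName(qualifiedMvName: String): String =
    "flint_" + qualifiedMvName.toLowerCase.replace('.', '_')

  // An MV named spark_catalog.default.alb_logs_metrics is therefore stored
  // in the OpenSearch index flint_spark_catalog_default_alb_logs_metrics.
  assert(
    flintMvIndexName("spark_catalog.default.alb_logs_metrics") ==
      "flint_spark_catalog_default_alb_logs_metrics")
}
```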
@@ -345,23 +371,31 @@ val flint = new FlintSpark(spark) // Skipping index flint.skippingIndex() - .onTable("alb_logs") + .onTable("spark_catalog.default.alb_logs") .filterBy("time > 2023-04-01 00:00:00") .addPartitions("year", "month", "day") .addValueSet("elb_status_code") .addMinMax("request_processing_time") .create() -flint.refresh("flint_alb_logs_skipping_index", FULL) +flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index", FULL) // Covering index flint.coveringIndex() .name("elb_and_requestUri") - .onTable("alb_logs") + .onTable("spark_catalog.default.alb_logs") .addIndexColumns("elb", "requestUri") .create() -flint.refresh("flint_alb_logs_elb_and_requestUri_index") +flint.refreshIndex("flint_spark_catalog_default_alb_logs_elb_and_requestUri_index") + +// Materialized view +flint.materializedView() + .name("spark_catalog.default.alb_logs_metrics") + .query("SELECT ...") + .create() + +flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics") ``` #### Skipping Index Provider SPI diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index e8e0264f2..44fd792ba 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -17,6 +17,7 @@ singleStatement statement : skippingIndexStatement | coveringIndexStatement + | materializedViewStatement ; skippingIndexStatement @@ -76,6 +77,29 @@ dropCoveringIndexStatement : DROP INDEX indexName ON tableName ; +materializedViewStatement + : createMaterializedViewStatement + | dropMaterializedViewStatement + ; + +createMaterializedViewStatement + : CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier + AS query=materializedViewQuery + (WITH LEFT_PAREN propertyList RIGHT_PAREN)? + ; + +dropMaterializedViewStatement + : DROP MATERIALIZED VIEW mvName=multipartIdentifier + ; + +/* + * Match all remaining tokens in non-greedy way + * so WITH clause won't be captured by this rule. + */ +materializedViewQuery + : .+? 
+ ; + indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 4ac1ced5c..15652aa79 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -154,6 +154,7 @@ COMMA: ','; DOT: '.'; +AS: 'AS'; CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; @@ -163,12 +164,14 @@ FALSE: 'FALSE'; IF: 'IF'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; +VIEW: 'VIEW'; WITH: 'WITH'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 98abb3878..a56d99f14 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -21,6 +22,7 @@ class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[AnyRef] with FlintSparkSkippingIndexAstBuilder with FlintSparkCoveringIndexAstBuilder + with FlintSparkMaterializedViewAstBuilder with SparkSqlAstBuilder { override def visit(tree: ParseTree): LogicalPlan = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala index 78a9c0628..bb4f1e127 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser._ +import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.types.{DataType, StructType} @@ -54,7 +55,10 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser => try { - flintAstBuilder.visit(flintParser.singleStatement()) + val ctx = flintParser.singleStatement() + withOrigin(ctx, Some(sqlText)) { + flintAstBuilder.visit(ctx) + } } catch { // Fall back to Spark parse plan logic if flint cannot parse case _: ParseException => sparkParser.parsePlan(sqlText) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala 
b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala new file mode 100644 index 000000000..976c6e6bc --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.mv + +import org.antlr.v4.runtime.tree.RuleNode +import org.opensearch.flint.spark.FlintSpark +import org.opensearch.flint.spark.FlintSpark.RefreshMode +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext} + +import org.apache.spark.sql.catalyst.trees.CurrentOrigin + +/** + * Flint Spark AST builder that builds Spark command for Flint materialized view statement. + */ +trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => + + override def visitCreateMaterializedViewStatement( + ctx: CreateMaterializedViewStatementContext): AnyRef = { + FlintSparkSqlCommand() { flint => + val mvName = getFullTableName(flint, ctx.mvName) + val query = getMvQuery(ctx.query) + + val mvBuilder = flint + .materializedView() + .name(mvName) + .query(query) + + val ignoreIfExists = ctx.EXISTS() != null + val indexOptions = visitPropertyList(ctx.propertyList()) + mvBuilder + .options(indexOptions) + .create(ignoreIfExists) + + // Trigger auto refresh if enabled + if (indexOptions.autoRefresh()) { + val flintIndexName = getFlintIndexName(flint, ctx.mvName) + flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL) + } + Seq.empty + } + } + + override def visitDropMaterializedViewStatement( + ctx: DropMaterializedViewStatementContext): AnyRef = { + FlintSparkSqlCommand() { flint => + flint.deleteIndex(getFlintIndexName(flint, ctx.mvName)) + Seq.empty + } + } + + private def getMvQuery(ctx: MaterializedViewQueryContext): String = { + // Assume origin must be preserved at the beginning of parsing + val sqlText = CurrentOrigin.get.sqlText.get + val startIndex = ctx.getStart.getStartIndex + val stopIndex = ctx.getStop.getStopIndex + sqlText.substring(startIndex, stopIndex + 1) + } + + private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = { + val fullMvName = getFullTableName(flint, mvNameCtx) + FlintSparkMaterializedView.getFlintIndexName(fullMvName) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala new file mode 100644 index 000000000..92b1771f3 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.sql.Timestamp + +import scala.Option.empty +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods.parse +import org.json4s.native.Serialization 
+import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} + +import org.apache.spark.sql.Row + +class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { + + /** Test table, MV, index name and query */ + private val testTable = "spark_catalog.default.mv_test" + private val testMvName = "spark_catalog.default.mv_test_metrics" + private val testFlintIndex = getFlintIndexName(testMvName) + private val testQuery = + s""" + | SELECT + | window.start AS startTime, + | COUNT(*) AS count + | FROM $testTable + | GROUP BY TUMBLE(time, '10 Minutes') + |""".stripMargin + + override def beforeAll(): Unit = { + super.beforeAll() + createTimeSeriesTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() + flint.deleteIndex(testFlintIndex) + } + + test("create materialized view with auto refresh") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + flint.describeIndex(testFlintIndex) shouldBe defined + checkAnswer( + flint.queryIndex(testFlintIndex).select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 1), + Row(timestamp("2023-10-01 00:10:00"), 2), + Row(timestamp("2023-10-01 01:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 02:00:00"), 1) + */ + )) + } + } + + test("create materialized view with streaming job options") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | refresh_interval = '5 Seconds', + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | ) + |""".stripMargin) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("5 Seconds") + index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + } + } + + test("create materialized view with index settings") { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + (settings \ "index.number_of_shards").extract[String] shouldBe "3" + (settings \ "index.number_of_replicas").extract[String] shouldBe "2" + } + + test("create materialized view if not exists") { + sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") + flint.describeIndex(testFlintIndex) shouldBe defined + + // Expect error without IF NOT EXISTS, otherwise 
success + the[IllegalStateException] thrownBy + sql(s"CREATE MATERIALIZED VIEW $testMvName AS $testQuery") + + sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery") + } + + test("drop materialized view") { + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .create() + + sql(s"DROP MATERIALIZED VIEW $testMvName") + + flint.describeIndex(testFlintIndex) shouldBe empty + } + + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) +}
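For reference, the query text captured by the non-greedy `materializedViewQuery` grammar rule is recovered in `getMvQuery` above by slicing the original statement with the token offsets that ANTLR records, relying on the SQL text preserved via `withOrigin` in `FlintSparkSqlParser`. The sketch below shows that slicing in isolation; the toy SQL string and hand-computed offsets are assumptions for illustration, standing in for `ctx.getStart.getStartIndex` and `ctx.getStop.getStopIndex`, which the real code reads from the parse tree:

```scala
object MvQueryExtractionSketch extends App {
  // ANTLR tokens carry absolute character offsets into the original SQL, so
  // the MV query matched by the non-greedy rule can be recovered verbatim
  // from the statement text preserved at the beginning of parsing.
  val sqlText =
    "CREATE MATERIALIZED VIEW mv AS SELECT 1 WITH (auto_refresh = true)"

  // Hand-computed stand-ins for ctx.getStart.getStartIndex and
  // ctx.getStop.getStopIndex.
  val startIndex = sqlText.indexOf("SELECT")
  val stopIndex = sqlText.indexOf(" WITH") - 1

  // Same substring arithmetic as getMvQuery (the stop index is inclusive).
  val query = sqlText.substring(startIndex, stopIndex + 1)
  assert(query == "SELECT 1")
}
```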