Add create and drop materialized view SQL support #73

Merged
42 changes: 38 additions & 4 deletions docs/index.md
@@ -15,6 +15,7 @@ A Flint index is ...
- MinMax: skip data scan by maintaining lower and upper bound of the indexed column per file.
- ValueSet: skip data scan by building a unique value set of the indexed column per file.
- Covering Index: create an index for selected columns within the source dataset to improve query performance.
- Materialized View: enhance query performance by storing precomputed and aggregated data from the source dataset.

Please see the following example, in which the Index Building Logic and Query Rewrite Logic columns show the basic idea behind each skipping index implementation.

@@ -187,6 +188,30 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
DROP INDEX elb_and_requestUri ON alb_logs
```

#### Materialized View

```sql
CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

DROP MATERIALIZED VIEW name
```

Example:

```sql
CREATE MATERIALIZED VIEW alb_logs_metrics
AS
SELECT
window.start AS startTime,
COUNT(*) AS count
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

DROP MATERIALIZED VIEW alb_logs_metrics
```

#### Create Index Options

Users can provide the following options in the `WITH` clause of a create statement:
@@ -226,6 +251,7 @@ OpenSearch index corresponding to the Flint index follows the naming convention

1. Skipping index: `flint_[catalog_database_table]_skipping_index`
2. Covering index: `flint_[catalog_database_table]_[index_name]_index`
3. Materialized view: `flint_[catalog_database_table]_[mv_name]`

It's important to note that any uppercase letters in the index name and table name (catalog, database and table) in the SQL statement will be automatically converted to lowercase due to a restriction imposed by OpenSearch.
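
For illustration, the naming helper used by the materialized view DDL can be called directly. A minimal sketch, assuming the lowercase conversion is applied by `getFlintIndexName` itself (it may equally happen elsewhere in the write path), with a hypothetical mixed-case MV name:

```scala
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName

// Hypothetical mixed-case MV name from a SQL statement ...
val mvName = "spark_catalog.default.Alb_Logs_Metrics"

// ... is expected to map to the all-lowercase OpenSearch index name
// "flint_spark_catalog_default_alb_logs_metrics"
val flintIndexName = getFlintIndexName(mvName)
```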

@@ -345,23 +371,31 @@ val flint = new FlintSpark(spark)

// Skipping index
flint.skippingIndex()
.onTable("alb_logs")
.onTable("spark_catalog.default.alb_logs")
.filterBy("time > 2023-04-01 00:00:00")
.addPartitions("year", "month", "day")
.addValueSet("elb_status_code")
.addMinMax("request_processing_time")
.create()

flint.refresh("flint_alb_logs_skipping_index", FULL)
flint.refreshIndex("flint_spark_catalog_default_alb_logs_skipping_index", FULL)

// Covering index
flint.coveringIndex()
.name("elb_and_requestUri")
.onTable("alb_logs")
.onTable("spark_catalog.default.alb_logs")
.addIndexColumns("elb", "requestUri")
.create()

flint.refresh("flint_alb_logs_elb_and_requestUri_index")
flint.refreshIndex("flint_spark_catalog_default_alb_logs_elb_and_requestUri_index")

// Materialized view
flint.materializedView()
.name("spark_catalog.default.alb_logs_metrics")
.query("SELECT ...")
.create()

flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics")
```
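
As with `auto_refresh = true` in the SQL path, a materialized view index can be kept up to date by an incremental refresh. A minimal sketch reusing the `flint` instance above, assuming `FULL` performs a one-shot batch rebuild while `INCREMENTAL` starts a streaming job (which is what the DDL command triggers when auto refresh is enabled):

```scala
import org.opensearch.flint.spark.FlintSpark.RefreshMode

// Kick off a streaming (incremental) refresh of the MV index
flint.refreshIndex("flint_spark_catalog_default_alb_logs_metrics", RefreshMode.INCREMENTAL)
```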

#### Skipping Index Provider SPI
@@ -17,6 +17,7 @@ singleStatement
statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
;

skippingIndexStatement
@@ -76,6 +77,29 @@ dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| dropMaterializedViewStatement
;

createMaterializedViewStatement
: CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier
AS query=materializedViewQuery
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

/*
* Match all remaining tokens non-greedily so that
* the WITH clause won't be captured by this rule.
*/
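/*
* For example, in
* CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WITH (auto_refresh = true)
* the rule below matches only "SELECT * FROM t"; the trailing WITH clause
* is consumed by the optional propertyList in createMaterializedViewStatement.
*/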
materializedViewQuery
: .+?
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -154,6 +154,7 @@ COMMA: ',';
DOT: '.';


AS: 'AS';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
@@ -163,12 +164,14 @@ FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
WITH: 'WITH';


@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -21,6 +22,7 @@ class FlintSparkSqlAstBuilder
extends FlintSparkSqlExtensionsBaseVisitor[AnyRef]
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with SparkSqlAstBuilder {

override def visit(tree: ParseTree): LogicalPlan = {
@@ -37,6 +37,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.types.{DataType, StructType}
@@ -54,7 +55,10 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface

override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { flintParser =>
try {
flintAstBuilder.visit(flintParser.singleStatement())
val ctx = flintParser.singleStatement()
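// Preserve the origin (including the raw SQL text) so AST builders can
// recover the exact query text by token position (see getMvQuery)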
withOrigin(ctx, Some(sqlText)) {
flintAstBuilder.visit(ctx)
}
} catch {
// Fall back to Spark parse plan logic if flint cannot parse
case _: ParseException => sparkParser.parsePlan(sqlText)
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.mv

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.getFullTableName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateMaterializedViewStatementContext, DropMaterializedViewStatementContext, MaterializedViewQueryContext}

import org.apache.spark.sql.catalyst.trees.CurrentOrigin

/**
* Flint Spark AST builder that builds Spark command for Flint materialized view statement.
*/
trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
ctx: CreateMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
val mvName = getFullTableName(flint, ctx.mvName)
val query = getMvQuery(ctx.query)

val mvBuilder = flint
.materializedView()
.name(mvName)
.query(query)

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
mvBuilder
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
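// (an INCREMENTAL refresh runs as a Spark streaming job named after the Flint index)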
if (indexOptions.autoRefresh()) {
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
}
Seq.empty
}
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
FlintSparkSqlCommand() { flint =>
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
}
}

private def getMvQuery(ctx: MaterializedViewQueryContext): String = {
// Origin (with the raw SQL text) must have been preserved at the start
// of parsing; see FlintSparkSqlParser.parsePlan
val sqlText = CurrentOrigin.get.sqlText.get
val startIndex = ctx.getStart.getStartIndex
val stopIndex = ctx.getStop.getStopIndex
sqlText.substring(startIndex, stopIndex + 1)
}

private def getFlintIndexName(flint: FlintSpark, mvNameCtx: RuleNode): String = {
val fullMvName = getFullTableName(flint, mvNameCtx)
FlintSparkMaterializedView.getFlintIndexName(fullMvName)
}
}
@@ -0,0 +1,145 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.sql.Timestamp

import scala.Option.empty
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row

class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {

/** Test table, MV, index name and query */
private val testTable = "spark_catalog.default.mv_test"
private val testMvName = "spark_catalog.default.mv_test_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
s"""
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| GROUP BY TUMBLE(time, '10 Minutes')
|""".stripMargin

override def beforeAll(): Unit = {
super.beforeAll()
createTimeSeriesTable(testTable)
}

override def afterEach(): Unit = {
super.afterEach()
flint.deleteIndex(testFlintIndex)
}

test("create materialized view with auto refresh") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for the streaming job to complete the current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(testFlintIndex).select("startTime", "count"),
Seq(
Row(timestamp("2023-10-01 00:00:00"), 1),
Row(timestamp("2023-10-01 00:10:00"), 2),
Row(timestamp("2023-10-01 01:00:00"), 1)
/*
* The last row is pending to fire upon watermark
* Row(timestamp("2023-10-01 02:00:00"), 1)
*/
))
}
}

test("create materialized view with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| refresh_interval = '5 Seconds',
| checkpoint_location = '${checkpointDir.getAbsolutePath}',
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

val index = flint.describeIndex(testFlintIndex)
index shouldBe defined
index.get.options.autoRefresh() shouldBe true
index.get.options.refreshInterval() shouldBe Some("5 Seconds")
index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
}
}

test("create materialized view with index settings") {
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
| )
|""".stripMargin)

// Verify that the index_settings option was applied to the OpenSearch index settings
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create materialized view if not exists") {
sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
flint.describeIndex(testFlintIndex) shouldBe defined

// Expect an error without IF NOT EXISTS; with it, the statement succeeds
the[IllegalStateException] thrownBy
sql(s"CREATE MATERIALIZED VIEW $testMvName AS $testQuery")

sql(s"CREATE MATERIALIZED VIEW IF NOT EXISTS $testMvName AS $testQuery")
}

test("drop materialized view") {
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.create()

sql(s"DROP MATERIALIZED VIEW $testMvName")

flint.describeIndex(testFlintIndex) shouldBe empty
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}