diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index d1f490113..dd64214ba 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -17,6 +17,8 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, Ref import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer} @@ -71,6 +73,16 @@ class FlintSpark(val spark: SparkSession) { new FlintSparkCoveringIndex.Builder(this) } + /** + * Create materialized view builder for creating mv with fluent API. + * + * @return + * mv builder + */ + def materializedView(): FlintSparkMaterializedView.Builder = { + new FlintSparkMaterializedView.Builder(this) + } + /** * Create the given index with metadata. 
* @@ -221,7 +233,7 @@ class FlintSpark(val spark: SparkSession) { private def deserialize(metadata: FlintMetadata): FlintSparkIndex = { val meta = parse(metadata.getContent) \ "_meta" val indexName = (meta \ "name").extract[String] - val tableName = (meta \ "source").extract[String] + val source = (meta \ "source").extract[String] val indexType = (meta \ "kind").extract[String] val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray] val indexOptions = FlintSparkIndexOptions( @@ -251,11 +263,19 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - new FlintSparkSkippingIndex(tableName, strategies, indexOptions) + new FlintSparkSkippingIndex(source, strategies, indexOptions) case COVERING_INDEX_TYPE => new FlintSparkCoveringIndex( indexName, - tableName, + source, + indexedColumns.arr.map { obj => + ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) + }.toMap, + indexOptions) + case MV_INDEX_TYPE => + FlintSparkMaterializedView( + indexName, + source, indexedColumns.arr.map { obj => ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String]) }.toMap, diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala new file mode 100644 index 000000000..a19dc2309 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -0,0 +1,152 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.mv + +import org.json4s.{Formats, NoTypeHints} +import org.json4s.JsonAST.{JArray, JObject, JString} +import org.json4s.native.JsonMethods.{compact, parse, render} +import org.json4s.native.Serialization +import org.opensearch.flint.core.metadata.FlintMetadata +import 
org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} +import org.opensearch.flint.spark.FlintSparkIndexOptions.empty +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE} + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.flint.datatype.FlintDataType +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * Flint materialized view in Spark. + * + * @param mvName + * materialized view name + * @param query + * unresolved plan + */ +case class FlintSparkMaterializedView( + mvName: String, + query: String, + outputSchema: Map[String, String], + override val options: FlintSparkIndexOptions = empty) + extends FlintSparkIndex { + + /** Required by json4s write function */ + implicit val formats: Formats = Serialization.formats(NoTypeHints) + + /** TODO: add it to index option */ + private val watermarkDelay = UTF8String.fromString("0 Minute") + + override val kind: String = MV_INDEX_TYPE + + override def name(): String = getFlintIndexName(mvName) + + override def metadata(): FlintMetadata = + new FlintMetadata(s"""{ + | "_meta": { + | "name": "$mvName", + | "kind": "$kind", + | "indexedColumns": $getMetaInfo, + | "source": $getEscapedQuery, + | "options": $getIndexOptions + | }, + | "properties": $getSchema + | } + |""".stripMargin) + + private def getMetaInfo: String = { + val objects = outputSchema.map { case (colName, colType) => + JObject("columnName" -> JString(colName), "columnType" -> JString(colType)) + }.toList + Serialization.write(JArray(objects)) + } + + private def getEscapedQuery: String = { + compact(render(JString(query))) + } + + private def getIndexOptions: String = { + Serialization.write(options.options) + } + + private def getSchema: String = { + val catalogDDL = + outputSchema + .map { case (colName, colType) => s"$colName $colType not null" } + .mkString(",") + val properties = 
FlintDataType.serialize(StructType.fromDDL(catalogDDL)) + compact(render(parse(properties) \ "properties")) + } + + override def build(df: DataFrame): DataFrame = { + null + } +} + +object FlintSparkMaterializedView { + + /** MV index type name */ + val MV_INDEX_TYPE = "mv" + + /** + * Get index name following the convention "flint_" + qualified MV name (replace dot with + * underscore). + * + * @param mvName + * MV name + * @return + * Flint index name + */ + def getFlintIndexName(mvName: String): String = { + require(mvName.contains("."), "Full table name database.mv is required") + + s"flint_${mvName.replace(".", "_")}" + } + + /** Builder class for MV build */ + class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) { + private var mvName: String = "" + private var query: String = "" + + /** + * Set MV name. + * + * @param mvName + * MV name + * @return + * builder + */ + def name(mvName: String): Builder = { + this.mvName = mvName + this + } + + /** + * Set MV query. 
+ * + * @param query + * MV query + * @return + * builder + */ + def query(query: String): Builder = { + this.query = query + this + } + + override protected def buildIndex(): FlintSparkIndex = { + // TODO: need to change this and Flint DS to support complex field type + val outputSchema = flint.spark + .sql(query) + .schema + .map { field => + field.name -> field.dataType.typeName + } + .toMap + FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions) + } + } +} diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala new file mode 100644 index 000000000..b34355932 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.mv + +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import org.apache.spark.FlintSuite + +class FlintSparkMaterializedViewSuite extends FlintSuite { + + test("get mv name") { + val mv = FlintSparkMaterializedView("default.mv", "SELECT 1", Map.empty) + mv.name() shouldBe "flint_default_mv" + } + + test("should fail if not full mv name") { + val mv = FlintSparkMaterializedView("mv", "SELECT 1", Map.empty) + assertThrows[IllegalArgumentException] { + mv.name() + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala new file mode 100644 index 000000000..fafd842f8 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -0,0 +1,93 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + 
+package org.opensearch.flint.spark + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { + + private val testTable = "default.mv_test" + private val testMv = "default.mv" + private val testFlintIndex = getFlintIndexName(testMv) + + override def beforeAll(): Unit = { + super.beforeAll() + + createPartitionedTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() + + // Delete all test indices + flint.deleteIndex(testFlintIndex) + } + + test("create mv with metadata successfully") { + flint + .materializedView() + .name(testMv) + .query(s""" + | SELECT + | window.start AS startTime, + | COUNT(*) AS count + | FROM $testTable + | GROUP BY TUMBLE(time, '1 Hour') + |""".stripMargin) + .create() + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get.metadata().getContent should matchJson(s""" + | { + | "_meta": { + | "name": "default.mv", + | "kind": "mv", + | "indexedColumns": [ + | { + | "columnName": "startTime", + | "columnType": "timestamp" + | },{ + | "columnName": "count", + | "columnType": "long" + | }], + | "source": "\n SELECT\n window.start AS startTime,\n COUNT(*) AS count\n FROM default.mv_test\n GROUP BY TUMBLE(time, '1 Hour')\n", + | "options": {} + | }, + | "properties": { + | "startTime": { + | "type": "date", + | "format": "strict_date_optional_time_nanos" + | }, + | "count": { + | "type": "long" + | } + | } + | } + |""".stripMargin) + } + + /* + test("create mv with metadata successfully") { + withTempDir { checkpointDir => + flint + .materializedView() + .name(testMv) + .query(s""" + | SELECT time, COUNT(*) AS count + | FROM $testTable + | GROUP BY TUMBLE(time, '1 Hour') + |""".stripMargin) + .options( + 
FlintSparkIndexOptions(Map("checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + } + } + */ +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index edbf5935a..cde812197 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -37,6 +37,7 @@ trait FlintSparkSuite s""" | CREATE TABLE $testTable | ( + | time TIMESTAMP, | name STRING, | age INT, | address STRING @@ -56,14 +57,14 @@ trait FlintSparkSuite s""" | INSERT INTO $testTable | PARTITION (year=2023, month=4) - | VALUES ('Hello', 30, 'Seattle') + | VALUES (TIMESTAMP '2023-09-21 16:30:00', 'Hello', 30, 'Seattle') | """.stripMargin) sql( s""" | INSERT INTO $testTable | PARTITION (year=2023, month=5) - | VALUES ('World', 25, 'Portland') + | VALUES (TIMESTAMP '2023-09-21 17:15:00', 'World', 25, 'Portland') | """.stripMargin) } }