diff --git a/docs/index.md b/docs/index.md index cc70da574..3c1671c98 100644 --- a/docs/index.md +++ b/docs/index.md @@ -227,15 +227,22 @@ User can provide the following options in `WITH` clause of create statement: + `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually. + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. + `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. ++ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query. ++ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. ++ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use qualified source table name (because there could be multiple) and "sink", e.g. '{"sink": "{key: val}", "table1": {key: val}}' -Note that the index option name is case-sensitive. +Note that the index option name is case-sensitive. Here is an example: ```sql WITH ( - auto_refresh = [true|false], - refresh_interval = 'time interval expression', - checkpoint_location = 'checkpoint directory path' + auto_refresh = true, + refresh_interval = '10 Seconds', + checkpoint_location = 's3://test/', + watermark_delay = '1 Second', + output_mode = 'complete', + index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}', + extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}' ) ``` diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 792ef830f..5bdab6276 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -138,17 +138,18 @@ class FlintSpark(val spark: SparkSession) { .queryName(indexName) .format(FLINT_DATASOURCE) .options(flintSparkConf.properties) - .addIndexOptions(options) + .addSinkOptions(options) .start(indexName) Some(job.id.toString) // Otherwise, fall back to foreachBatch + batch refresh case INCREMENTAL => val job = spark.readStream + .options(options.extraSourceOptions(tableName)) .table(tableName) .writeStream .queryName(indexName) - .addIndexOptions(options) + .addSinkOptions(options) .foreachBatch { (batchDF: DataFrame, _: Long) => batchRefresh(Some(batchDF)) } @@ -237,26 +238,26 @@ class FlintSpark(val spark: SparkSession) { // Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) { - def addIndexOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = { + def addSinkOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = { dataStream .addCheckpointLocation(options.checkpointLocation()) .addRefreshInterval(options.refreshInterval()) + .addOutputMode(options.outputMode()) + .options(options.extraSinkOptions()) } def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = { - if (checkpointLocation.isDefined) { - dataStream.option("checkpointLocation", checkpointLocation.get) - } else { - dataStream - } + checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream) } def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = { - if (refreshInterval.isDefined) { - dataStream.trigger(Trigger.ProcessingTime(refreshInterval.get)) - } else { - dataStream - } + refreshInterval + .map(interval => dataStream.trigger(Trigger.ProcessingTime(interval))) + .getOrElse(dataStream) + } + + def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = { + outputMode.map(dataStream.outputMode).getOrElse(dataStream) } } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index b3e7535c3..ffb479b54 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,7 +5,10 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL} +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames /** @@ -16,6 +19,8 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames */ case class FlintSparkIndexOptions(options: Map[String, String]) { + implicit val formats: Formats = Serialization.formats(NoTypeHints) + validateOptionNames(options) /** @@ -42,6 +47,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION) + /** + * How late the data can come and still be processed. + * + * @return + * watermark delay time expression + */ + def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY) + + /** + * The output mode that describes how data will be written to streaming sink. + * @return + * output mode + */ + def outputMode(): Option[String] = getOptionValue(OUTPUT_MODE) + /** * The index settings for OpenSearch index created. * @@ -50,6 +70,28 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS) + /** + * Extra streaming source options that can be simply passed to DataStreamReader or + * Relation.options + * @param source + * source name (full table name) + * @return + * extra source option map or empty map if not exist + */ + def extraSourceOptions(source: String): Map[String, String] = { + parseExtraOptions(source) + } + + /** + * Extra streaming sink options that can be simply passed to DataStreamWriter.options() + * + * @return + * extra sink option map or empty map if not exist + */ + def extraSinkOptions(): Map[String, String] = { + parseExtraOptions("sink") + } + /** * @return * all option values and fill default value if unspecified @@ -67,6 +109,12 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { private def getOptionValue(name: OptionName): Option[String] = { options.get(name.toString) } + + private def parseExtraOptions(key: String): Map[String, String] = { + getOptionValue(EXTRA_OPTIONS) + .map(opt => (parse(opt) \ key).extract[Map[String, String]]) + .getOrElse(Map.empty) + } } object FlintSparkIndexOptions { @@ -84,7 +132,10 @@ object FlintSparkIndexOptions { val AUTO_REFRESH: OptionName.Value = Value("auto_refresh") val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") + val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay") + val OUTPUT_MODE: OptionName.Value = Value("output_mode") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") + val EXTRA_OPTIONS: OptionName.Value = Value("extra_options") } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index ee58ec7f5..019cc7aa5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.mv import java.util.Locale import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.convert.ImplicitConversions.`map AsScala` import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} @@ -23,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Flint materialized view in Spark. @@ -44,9 +46,6 @@ case class FlintSparkMaterializedView( extends FlintSparkIndex with StreamingRefresh { - /** TODO: add it to index option */ - private val watermarkDelay = "0 Minute" - override val kind: String = MV_INDEX_TYPE override def name(): String = getFlintIndexName(mvName) @@ -81,19 +80,33 @@ case class FlintSparkMaterializedView( * 2.Set isStreaming flag to true in Relation operator */ val streamingPlan = batchPlan transform { - case WindowingAggregate(agg, timeCol) => - agg.copy(child = watermark(timeCol, watermarkDelay, agg.child)) + case WindowingAggregate(aggregate, timeCol) => + aggregate.copy(child = watermark(timeCol, aggregate.child)) case relation: UnresolvedRelation if !relation.isStreaming => - relation.copy(isStreaming = true) + relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation)) } logicalPlanToDataFrame(spark, streamingPlan) } - private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = { + private def watermark(timeCol: Attribute, child: LogicalPlan) = { + require( + options.watermarkDelay().isDefined, + "watermark delay is required for incremental refresh with aggregation") + + val delay = options.watermarkDelay().get EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child) } + private def optionsWithExtra( + spark: SparkSession, + relation: UnresolvedRelation): CaseInsensitiveStringMap = { + val originalOptions = relation.options.asCaseSensitiveMap + val tableName = qualifyTableName(spark, relation.tableName) + val extraOptions = options.extraSourceOptions(tableName).asJava + new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava) + } + /** * Extractor that extract event time column out of Aggregate operator. */ @@ -107,7 +120,7 @@ case class FlintSparkMaterializedView( if (winFuncs.size != 1) { throw new IllegalStateException( - "A windowing function is required for streaming aggregation") + "A windowing function is required for incremental refresh with aggregation") } // Assume first aggregate item must be time column diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index 160a4c9d3..b678096ca 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.scalatest.matchers.should.Matchers import org.apache.spark.FlintSuite @@ -16,7 +16,10 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { AUTO_REFRESH.toString shouldBe "auto_refresh" REFRESH_INTERVAL.toString shouldBe "refresh_interval" CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location" + WATERMARK_DELAY.toString shouldBe "watermark_delay" + OUTPUT_MODE.toString shouldBe "output_mode" INDEX_SETTINGS.toString shouldBe "index_settings" + EXTRA_OPTIONS.toString shouldBe "extra_options" } test("should return specified option value") { @@ -25,12 +28,42 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { "auto_refresh" -> "true", "refresh_interval" -> "1 Minute", "checkpoint_location" -> "s3://test/", - "index_settings" -> """{"number_of_shards": 3}""")) + "watermark_delay" -> "30 Seconds", + "output_mode" -> "complete", + "index_settings" -> """{"number_of_shards": 3}""", + "extra_options" -> + """ { + | "alb_logs": { + | "opt1": "val1" + | }, + | "sink": { + | "opt2": "val2", + | "opt3": "val3" + | } + | }""".stripMargin)) options.autoRefresh() shouldBe true options.refreshInterval() shouldBe Some("1 Minute") options.checkpointLocation() shouldBe Some("s3://test/") + options.watermarkDelay() shouldBe Some("30 Seconds") + options.outputMode() shouldBe Some("complete") options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""") + options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1") + options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3") + } + + test("should return extra source option value and empty sink option values") { + val options = FlintSparkIndexOptions( + Map("extra_options" -> + """ { + | "alb_logs": { + | "opt1": "val1" + | } + | }""".stripMargin)) + + options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1") + options.extraSourceOptions("alb_logs_metrics") shouldBe empty + options.extraSinkOptions() shouldBe empty } test("should return default option value if unspecified") { @@ -39,11 +72,15 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.autoRefresh() shouldBe false options.refreshInterval() shouldBe empty options.checkpointLocation() shouldBe empty + options.watermarkDelay() shouldBe empty + options.outputMode() shouldBe empty options.indexSettings() shouldBe empty + options.extraSourceOptions("alb_logs") shouldBe empty + options.extraSinkOptions() shouldBe empty options.optionsWithDefault should contain("auto_refresh" -> "false") } - test("should return default option value if unspecified with specified value") { + test("should return include unspecified option if it has default value") { val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute")) options.optionsWithDefault shouldBe Map( diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c28495c69..cb32e74d3 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -15,7 +15,6 @@ import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.FlintSuite import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper} import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan @@ -32,6 +31,8 @@ import org.apache.spark.unsafe.types.UTF8String */ class FlintSparkMaterializedViewSuite extends FlintSuite { + /** Test table, MV name and query */ + val testTable = "spark_catalog.default.mv_build_test" val testMvName = "spark_catalog.default.mv" val testQuery = "SELECT 1" @@ -87,38 +88,32 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { } test("build stream should insert watermark operator and replace batch relation") { - val testTable = "mv_build_test" - withTable(testTable) { - sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - - val testQuery = - s""" + val testQuery = + s""" | SELECT | window.start AS startTime, | COUNT(*) AS count | FROM $testTable | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) - val actualPlan = mv.buildStream(spark).queryExecution.logical - assert( - actualPlan.sameSemantics( - streamingRelation(testTable) - .watermark($"time", "0 Minute") - .groupBy($"TUMBLE".function($"time", "1 Minute"))( - $"window.start" as "startTime", - count(1) as "count"))) + val options = Map("watermark_delay" -> "30 Seconds") + + withAggregateMaterializedView(testQuery, options) { actualPlan => + comparePlans( + actualPlan, + streamingRelation(testTable) + .watermark($"time", "30 Seconds") + .groupBy($"TUMBLE".function($"time", "1 Minute"))( + $"window.start" as "startTime", + $"COUNT".function(1) as "count"), + checkAnalysis = false + ) // don't analyze due to full test table name } } - test("build stream with filtering query") { - val testTable = "mv_build_test" - withTable(testTable) { - sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") - - val testQuery = - s""" + test("build stream with filtering aggregate query") { + val testQuery = + s""" | SELECT | window.start AS startTime, | COUNT(*) AS count @@ -126,36 +121,44 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | WHERE age > 30 | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) - val actualPlan = mv.buildStream(spark).queryExecution.logical - assert( - actualPlan.sameSemantics( - streamingRelation(testTable) - .where($"age" > 30) - .watermark($"time", "0 Minute") - .groupBy($"TUMBLE".function($"time", "1 Minute"))( - $"window.start" as "startTime", - count(1) as "count"))) + val options = Map("watermark_delay" -> "30 Seconds") + + withAggregateMaterializedView(testQuery, options) { actualPlan => + comparePlans( + actualPlan, + streamingRelation(testTable) + .where($"age" > 30) + .watermark($"time", "30 Seconds") + .groupBy($"TUMBLE".function($"time", "1 Minute"))( + $"window.start" as "startTime", + $"COUNT".function(1) as "count"), + checkAnalysis = false) } } test("build stream with non-aggregate query") { - val testTable = "mv_build_test" - withTable(testTable) { - sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") + val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30" + + withAggregateMaterializedView(testQuery, Map.empty) { actualPlan => + comparePlans( + actualPlan, + streamingRelation(testTable) + .where($"age" > 30) + .select($"name", $"age"), + checkAnalysis = false) + } + } - val mv = FlintSparkMaterializedView( - testMvName, - s"SELECT name, age FROM $testTable WHERE age > 30", - Map.empty) - val actualPlan = mv.buildStream(spark).queryExecution.logical + test("build stream with extra source options") { + val testQuery = s"SELECT name, age FROM $testTable" + val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""") - assert( - actualPlan.sameSemantics( - streamingRelation(testTable) - .where($"age" > 30) - .select($"name", $"age"))) + withAggregateMaterializedView(testQuery, options) { actualPlan => + comparePlans( + actualPlan, + streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")) + .select($"name", $"age"), + checkAnalysis = false) } } @@ -173,6 +176,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { mv.buildStream(spark) } } + + private def withAggregateMaterializedView(query: String, options: Map[String, String])( + codeBlock: LogicalPlan => Unit): Unit = { + + withTable(testTable) { + sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") + + val mv = + FlintSparkMaterializedView(testMvName, query, Map.empty, FlintSparkIndexOptions(options)) + + val actualPlan = mv.buildStream(spark).queryExecution.logical + codeBlock(actualPlan) + } + } } /** @@ -180,10 +197,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { */ object FlintSparkMaterializedViewSuite { - def streamingRelation(tableName: String): UnresolvedRelation = { - UnresolvedRelation( - TableIdentifier(tableName), - CaseInsensitiveStringMap.empty(), + def streamingRelation( + tableName: String, + extraOptions: Map[String, String] = Map.empty): UnresolvedRelation = { + new UnresolvedRelation( + tableName.split('.'), + new CaseInsensitiveStringMap(extraOptions.asJava), isStreaming = true) } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 627e11f52..0d3f7a887 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -65,7 +65,8 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { | WITH ( | auto_refresh = true, | refresh_interval = '5 Seconds', - | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) | """.stripMargin) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 29ab433c6..29ce4e248 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -43,7 +43,11 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { test("create materialized view with metadata successfully") { val indexOptions = - FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> "s3://test/", + "watermark_delay" -> "30 Seconds")) flint .materializedView() .name(testMvName) @@ -70,7 +74,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | }], | "options": { | "auto_refresh": "true", - | "checkpoint_location": "s3://test/" + | "checkpoint_location": "s3://test/", + | "watermark_delay": "30 Seconds" | }, | "properties": {} | }, @@ -147,7 +152,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } - test("incremental refresh materialized view with filtering query") { + test("incremental refresh materialized view with filtering aggregate query") { val filterQuery = s""" | SELECT @@ -155,7 +160,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | COUNT(*) AS count | FROM $testTable | WHERE address = 'Seattle' - | GROUP BY TUMBLE(time, '10 Minutes') + | GROUP BY TUMBLE(time, '5 Minutes') |""".stripMargin withIncrementalMaterializedView(filterQuery) { indexData => @@ -190,7 +195,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { codeBlock: DataFrame => Unit): Unit = { withTempDir { checkpointDir => val indexOptions = FlintSparkIndexOptions( - Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)) + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon flint .materializedView() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 8811c9bf5..15cd6443e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -53,7 +53,8 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | AS $testQuery | WITH ( | auto_refresh = true, - | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' | ) |""".stripMargin) @@ -88,15 +89,24 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | auto_refresh = true, | refresh_interval = '5 Seconds', | checkpoint_location = '${checkpointDir.getAbsolutePath}', - | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | watermark_delay = '1 Second', + | output_mode = 'complete', + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) |""".stripMargin) val index = flint.describeIndex(testFlintIndex) index shouldBe defined - index.get.options.autoRefresh() shouldBe true - index.get.options.refreshInterval() shouldBe Some("5 Seconds") - index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + + val options = index.get.options + options.autoRefresh() shouldBe true + options.refreshInterval() shouldBe Some("5 Seconds") + options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + options.watermarkDelay() shouldBe Some("1 Second") + options.outputMode() shouldBe Some("complete") + options.extraSourceOptions(testTable) shouldBe Map("maxFilesPerTrigger" -> "1") + options.extraSinkOptions() shouldBe Map.empty } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index bfbeba9c3..dbd349b63 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -69,7 +69,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { | WITH ( | auto_refresh = true, | refresh_interval = '5 Seconds', - | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) | """.stripMargin)