From 4a2d0633f12a2e3cedad55e8804a6eb3b8f16bf6 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 08:27:51 -0700
Subject: [PATCH 01/11] Add watermark delay option

Signed-off-by: Chen Dai
---
 .../flint/spark/FlintSparkIndexOptions.scala       | 11 ++++++++++-
 .../flint/spark/FlintSparkIndexOptionsSuite.scala  |  8 ++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index b3e7535c3..dbd6eae2a 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 
 /**
@@ -42,6 +42,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
    */
  def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION)
 
+  /**
+   * How late the data can come and still be processed.
+   *
+   * @return
+   *   watermark delay time expression
+   */
+  def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY)
+
   /**
    * The index settings for OpenSearch index created.
    *
@@ -84,6 +92,7 @@ object FlintSparkIndexOptions {
     val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
     val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
     val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
+    val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
     val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
   }
 
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index 160a4c9d3..8d2a5d506 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
 import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.FlintSuite
@@ -16,6 +16,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     AUTO_REFRESH.toString shouldBe "auto_refresh"
     REFRESH_INTERVAL.toString shouldBe "refresh_interval"
     CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
+    WATERMARK_DELAY.toString shouldBe "watermark_delay"
     INDEX_SETTINGS.toString shouldBe "index_settings"
   }
 
@@ -25,11 +26,13 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "auto_refresh" -> "true",
       "refresh_interval" -> "1 Minute",
       "checkpoint_location" -> "s3://test/",
+      "watermark_delay" -> "30 Seconds",
       "index_settings" -> """{"number_of_shards": 3}"""))
 
     options.autoRefresh() shouldBe true
     options.refreshInterval() shouldBe Some("1 Minute")
     options.checkpointLocation() shouldBe Some("s3://test/")
+    options.watermarkDelay() shouldBe Some("30 Seconds")
     options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
   }
 
@@ -39,11 +42,12 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     options.autoRefresh() shouldBe false
     options.refreshInterval() shouldBe empty
     options.checkpointLocation() shouldBe empty
+    options.watermarkDelay() shouldBe empty
     options.indexSettings() shouldBe empty
     options.optionsWithDefault should contain("auto_refresh" -> "false")
   }
 
-  test("should return default option value if unspecified with specified value") {
+  test("should include unspecified option if it has default value") {
     val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute"))
 
     options.optionsWithDefault shouldBe Map(

From 1cca77bd010bfabb03ec106b28458e40ab556053 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 09:01:08 -0700
Subject: [PATCH 02/11] Change UT and IT with watermark delay option

Signed-off-by: Chen Dai
---
 .../spark/mv/FlintSparkMaterializedView.scala      | 16 ++++++++-------
 .../mv/FlintSparkMaterializedViewSuite.scala       | 20 ++++++++++++++-----
 .../FlintSparkMaterializedViewITSuite.scala        | 18 ++++++++++++-----
 3 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index ee58ec7f5..112de680f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -44,9 +44,6 @@ case class FlintSparkMaterializedView(
     extends FlintSparkIndex
     with StreamingRefresh {
 
-  /** TODO: add it to index option */
-  private val watermarkDelay = "0 Minute"
-
   override val kind: String = MV_INDEX_TYPE
 
   override def name(): String = getFlintIndexName(mvName)
@@ -81,8 +78,8 @@ case class FlintSparkMaterializedView(
      *   2.Set isStreaming flag to true in Relation operator
      */
     val streamingPlan = batchPlan transform {
-      case WindowingAggregate(agg, timeCol) =>
-        agg.copy(child = watermark(timeCol, watermarkDelay, agg.child))
+      case WindowingAggregate(aggregate, timeCol) =>
+        aggregate.copy(child = watermark(timeCol, aggregate.child))
 
       case relation: UnresolvedRelation if !relation.isStreaming =>
         relation.copy(isStreaming = true)
@@ -90,7 +87,12 @@ case class FlintSparkMaterializedView(
     logicalPlanToDataFrame(spark, streamingPlan)
   }
 
-  private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
+  private def watermark(timeCol: Attribute, child: LogicalPlan) = {
+    require(
+      options.watermarkDelay().isDefined,
+      "watermark delay is required for incremental refresh with aggregation")
+
+    val delay = options.watermarkDelay().get
     EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
   }
 
@@ -107,7 +109,7 @@ case class FlintSparkMaterializedView(
 
       if (winFuncs.size != 1) {
         throw new IllegalStateException(
-          "A windowing function is required for streaming aggregation")
+          "A windowing function is required for incremental refresh with aggregation")
       }
 
       // Assume first aggregate item must be time column
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index c28495c69..1f9b52963 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -100,19 +100,24 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
         | GROUP BY TUMBLE(time, '1 Minute')
         |""".stripMargin
 
-      val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+      val mv = FlintSparkMaterializedView(
+        testMvName,
+        testQuery,
+        Map.empty,
+        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
+
       val actualPlan = mv.buildStream(spark).queryExecution.logical
       assert(
         actualPlan.sameSemantics(
           streamingRelation(testTable)
-            .watermark($"time", "0 Minute")
+            .watermark($"time", "30 Seconds")
             .groupBy($"TUMBLE".function($"time", "1 Minute"))(
               $"window.start" as "startTime",
               count(1) as "count")))
     }
   }
 
-  test("build stream with filtering query") {
+  test("build stream with filtering aggregate query") {
     val testTable = "mv_build_test"
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
@@ -127,13 +132,18 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
         | GROUP BY TUMBLE(time, '1 Minute')
         |""".stripMargin
 
-      val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+      val mv = FlintSparkMaterializedView(
+        testMvName,
+        testQuery,
+        Map.empty,
+        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
+
       val actualPlan = mv.buildStream(spark).queryExecution.logical
       assert(
         actualPlan.sameSemantics(
           streamingRelation(testTable)
             .where($"age" > 30)
-            .watermark($"time", "0 Minute")
+            .watermark($"time", "30 Seconds")
             .groupBy($"TUMBLE".function($"time", "1 Minute"))(
               $"window.start" as "startTime",
               count(1) as "count")))
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index 29ab433c6..29ce4e248 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -43,7 +43,11 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
 
   test("create materialized view with metadata successfully") {
     val indexOptions =
-      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
+      FlintSparkIndexOptions(
+        Map(
+          "auto_refresh" -> "true",
+          "checkpoint_location" -> "s3://test/",
+          "watermark_delay" -> "30 Seconds"))
     flint
       .materializedView()
       .name(testMvName)
@@ -70,7 +74,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         |  }],
         |  "options": {
         |    "auto_refresh": "true",
-        |    "checkpoint_location": "s3://test/"
+        |    "checkpoint_location": "s3://test/",
+        |    "watermark_delay": "30 Seconds"
         |  },
         |  "properties": {}
        | },
@@ -147,7 +152,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
     }
   }
 
-  test("incremental refresh materialized view with filtering query") {
+  test("incremental refresh materialized view with filtering aggregate query") {
     val filterQuery =
       s"""
          | SELECT
         |   startTime,
         |   COUNT(*) AS count
         | FROM $testTable
         | WHERE address = 'Seattle'
-        | GROUP BY TUMBLE(time, '10 Minutes')
+        | GROUP BY TUMBLE(time, '5 Minutes')
         |""".stripMargin
 
    withIncrementalMaterializedView(filterQuery) { indexData =>
@@ -190,7 +195,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
       codeBlock: DataFrame => Unit): Unit = {
     withTempDir { checkpointDir =>
       val indexOptions = FlintSparkIndexOptions(
-        Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))
+        Map(
+          "auto_refresh" -> "true",
+          "checkpoint_location" -> checkpointDir.getAbsolutePath,
+          "watermark_delay" -> "1 Minute")) // This must be small to ensure the window closes soon
 
       flint
         .materializedView()

From f5d5ac1cd445a68c2c672a041b9bd270524b4894 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 12:15:07 -0700
Subject: [PATCH 03/11] Update MV SQL IT

Signed-off-by: Chen Dai
---
 .../FlintSparkMaterializedViewSqlITSuite.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index 92b1771f3..c0f08c5a6 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -53,7 +53,8 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
            | AS $testQuery
            | WITH (
            |   auto_refresh = true,
-           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+           |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
+           |   watermark_delay = '1 Second'
            | )
            |""".stripMargin)
 
@@ -88,15 +89,19 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
            |   auto_refresh = true,
            |   refresh_interval = '5 Seconds',
            |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
+           |   watermark_delay = '1 Second',
            |   index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
            | )
            |""".stripMargin)
 
     val index = flint.describeIndex(testFlintIndex)
     index shouldBe defined
-    index.get.options.autoRefresh() shouldBe true
-    index.get.options.refreshInterval() shouldBe Some("5 Seconds")
-    index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+
+    val options = index.get.options
+    options.autoRefresh() shouldBe true
+    options.refreshInterval() shouldBe Some("5 Seconds")
+    options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+    options.watermarkDelay() shouldBe Some("1 Second")
   }
 }

From 06151c2b3481044181390725511c43dcb95fe056 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 12:56:42 -0700
Subject: [PATCH 04/11] Add output mode option

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala        | 19 +++++++++----------
 .../flint/spark/FlintSparkIndexOptions.scala       | 10 +++++++++-
 .../spark/FlintSparkIndexOptionsSuite.scala        |  4 ++++
 ...FlintSparkMaterializedViewSqlITSuite.scala      |  2 ++
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 9c78a07f8..8447128f3 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -237,22 +237,21 @@ class FlintSpark(val spark: SparkSession) {
       dataStream
         .addCheckpointLocation(options.checkpointLocation())
         .addRefreshInterval(options.refreshInterval())
+        .addOutputMode(options.outputMode())
     }
 
     def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
-      if (checkpointLocation.isDefined) {
-        dataStream.option("checkpointLocation", checkpointLocation.get)
-      } else {
-        dataStream
-      }
+      checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream)
     }
 
     def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
-      if (refreshInterval.isDefined) {
-        dataStream.trigger(Trigger.ProcessingTime(refreshInterval.get))
-      } else {
-        dataStream
-      }
+      refreshInterval
+        .map(interval => dataStream.trigger(Trigger.ProcessingTime(interval)))
+        .getOrElse(dataStream)
+    }
+
+    def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = {
+      outputMode.map(dataStream.outputMode).getOrElse(dataStream)
     }
   }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index dbd6eae2a..5e7f42f51 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL, WATERMARK_DELAY}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 
 /**
@@ -50,6 +50,13 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
    */
  def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY)
 
+  /**
+   * The output mode that describes how data will be written to the streaming sink.
+   * @return
+   *   output mode
+   */
+  def outputMode(): Option[String] = getOptionValue(OUTPUT_MODE)
+
   /**
    * The index settings for OpenSearch index created.
    *
@@ -93,6 +100,7 @@ object FlintSparkIndexOptions {
     val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
     val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
     val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
+    val OUTPUT_MODE: OptionName.Value = Value("output_mode")
     val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
   }
 
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index 8d2a5d506..2ec299823 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -17,6 +17,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     REFRESH_INTERVAL.toString shouldBe "refresh_interval"
     CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
     WATERMARK_DELAY.toString shouldBe "watermark_delay"
+    OUTPUT_MODE.toString shouldBe "output_mode"
     INDEX_SETTINGS.toString shouldBe "index_settings"
   }
 
@@ -27,12 +28,14 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "refresh_interval" -> "1 Minute",
       "checkpoint_location" -> "s3://test/",
       "watermark_delay" -> "30 Seconds",
+      "output_mode" -> "complete",
       "index_settings" -> """{"number_of_shards": 3}"""))
 
     options.autoRefresh() shouldBe true
     options.refreshInterval() shouldBe Some("1 Minute")
     options.checkpointLocation() shouldBe Some("s3://test/")
     options.watermarkDelay() shouldBe Some("30 Seconds")
+    options.outputMode() shouldBe Some("complete")
     options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
   }
 
@@ -44,6 +47,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     options.checkpointLocation() shouldBe empty
     options.watermarkDelay() shouldBe empty
     options.indexSettings() shouldBe empty
+    options.outputMode() shouldBe empty
     options.optionsWithDefault should contain("auto_refresh" -> "false")
   }
 
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index c0f08c5a6..0073688e0 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -90,6 +90,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
            |   refresh_interval = '5 Seconds',
            |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
            |   watermark_delay = '1 Second',
+           |   output_mode = 'complete',
            |   index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
            | )
            |""".stripMargin)
@@ -102,6 +103,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     options.refreshInterval() shouldBe Some("5 Seconds")
     options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
     options.watermarkDelay() shouldBe Some("1 Second")
+    options.outputMode() shouldBe Some("complete")
   }
 }

From 0a4bb6096a59bf37312994c944e1b7d290ae1761 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 13:35:31 -0700
Subject: [PATCH 05/11] Add extra sink options

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala        |  1 +
 .../flint/spark/FlintSparkIndexOptions.scala       | 20 ++++++++++++++++++-
 .../spark/FlintSparkIndexOptionsSuite.scala        |  8 ++++++--
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 8447128f3..d7c8ae8cc 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -238,6 +238,7 @@ class FlintSpark(val spark: SparkSession) {
         .addCheckpointLocation(options.checkpointLocation())
         .addRefreshInterval(options.refreshInterval())
         .addOutputMode(options.outputMode())
+        .options(options.extraSinkOptions())
     }
 
     def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index 5e7f42f51..26a288629 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -5,7 +5,10 @@
 
 package org.opensearch.flint.spark
 
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 
 /**
@@ -16,6 +19,8 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
  */
 case class FlintSparkIndexOptions(options: Map[String, String]) {
 
+  implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
   validateOptionNames(options)
 
   /**
@@ -65,6 +70,18 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
    */
  def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)
 
+  /**
+   * Extra streaming sink options that simply pass to DataStreamWriter.options()
+   *
+   * @return
+   *   extra sink option map
+   */
+  def extraSinkOptions(): Map[String, String] = {
+    getOptionValue(EXTRA_OPTIONS)
+      .map(opt => (parse(opt) \ "sink").extract[Map[String, String]])
+      .getOrElse(Map.empty)
+  }
+
   /**
    * @return
    *   all option values and fill default value if unspecified
@@ -102,6 +119,7 @@ object FlintSparkIndexOptions {
     val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
     val OUTPUT_MODE: OptionName.Value = Value("output_mode")
     val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
+    val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
   }
 
   /**
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index 2ec299823..24a518ca3 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -19,6 +19,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     WATERMARK_DELAY.toString shouldBe "watermark_delay"
     OUTPUT_MODE.toString shouldBe "output_mode"
     INDEX_SETTINGS.toString shouldBe "index_settings"
+    EXTRA_OPTIONS.toString shouldBe "extra_options"
   }
 
   test("should return specified option value") {
@@ -29,7 +30,8 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "checkpoint_location" -> "s3://test/",
       "watermark_delay" -> "30 Seconds",
       "output_mode" -> "complete",
-      "index_settings" -> """{"number_of_shards": 3}"""))
+      "index_settings" -> """{"number_of_shards": 3}""",
+      "extra_options" -> """{"sink": {"opt1": "val1", "opt2": "val2"}}"""))
 
     options.autoRefresh() shouldBe true
     options.refreshInterval() shouldBe Some("1 Minute")
@@ -37,6 +39,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     options.watermarkDelay() shouldBe Some("30 Seconds")
     options.outputMode() shouldBe Some("complete")
     options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
+    options.extraSinkOptions() shouldBe Map("opt1" -> "val1", "opt2" -> "val2")
   }
 
   test("should return default option value if unspecified") {
@@ -46,8 +49,9 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     options.refreshInterval() shouldBe empty
     options.checkpointLocation() shouldBe empty
     options.watermarkDelay() shouldBe empty
-    options.indexSettings() shouldBe empty
     options.outputMode() shouldBe empty
+    options.indexSettings() shouldBe empty
+    options.extraSinkOptions() shouldBe empty
     options.optionsWithDefault should contain("auto_refresh" -> "false")
   }
 
From 62569972dd54b2928a1f382cb9841e78cb3bb2bf Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 14:20:09 -0700
Subject: [PATCH 06/11] Add extra source options

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala        |  7 +++--
 .../flint/spark/FlintSparkIndexOptions.scala       | 26 +++++++++++++----
 .../spark/FlintSparkIndexOptionsSuite.scala        | 29 +++++++++++++++++--
 .../FlintSparkCoveringIndexSqlITSuite.scala        |  3 +-
 ...FlintSparkMaterializedViewSqlITSuite.scala      |  5 +++-
 .../FlintSparkSkippingIndexSqlITSuite.scala        |  3 +-
 6 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index d7c8ae8cc..6cb0b41ec 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -138,17 +138,18 @@ class FlintSpark(val spark: SparkSession) {
           .queryName(indexName)
           .format(FLINT_DATASOURCE)
           .options(flintSparkConf.properties)
-          .addIndexOptions(options)
+          .addSinkOptions(options)
           .start(indexName)
         Some(job.id.toString)
 
       // Otherwise, fall back to foreachBatch + batch refresh
       case INCREMENTAL =>
         val job = spark.readStream
+          .options(options.extraSourceOptions(tableName))
           .table(tableName)
           .writeStream
           .queryName(indexName)
-          .addIndexOptions(options)
+          .addSinkOptions(options)
           .foreachBatch { (batchDF: DataFrame, _: Long) =>
             batchRefresh(Some(batchDF))
           }
@@ -233,7 +234,7 @@ class FlintSpark(val spark: SparkSession) {
     // Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API
     private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {
 
-    def addIndexOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = {
+    def addSinkOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = {
       dataStream
         .addCheckpointLocation(options.checkpointLocation())
         .addRefreshInterval(options.refreshInterval())
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index 26a288629..ffb479b54 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -71,15 +71,25 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
  def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)
 
   /**
-   * Extra streaming sink options that simply pass to DataStreamWriter.options()
+   * Extra streaming source options that can be simply passed to DataStreamReader or
+   * Relation.options
+   * @param source
+   *   source name (full table name)
+   * @return
+   *   extra source option map, or an empty map if not set
+   */
+  def extraSourceOptions(source: String): Map[String, String] = {
+    parseExtraOptions(source)
+  }
+
+  /**
+   * Extra streaming sink options that can be simply passed to DataStreamWriter.options()
    *
    * @return
-   *   extra sink option map
+   *   extra sink option map, or an empty map if not set
    */
  def extraSinkOptions(): Map[String, String] = {
-    getOptionValue(EXTRA_OPTIONS)
-      .map(opt => (parse(opt) \ "sink").extract[Map[String, String]])
-      .getOrElse(Map.empty)
+    parseExtraOptions("sink")
   }
 
   /**
@@ -99,6 +109,12 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
   private def getOptionValue(name: OptionName): Option[String] = {
     options.get(name.toString)
   }
+
+  private def parseExtraOptions(key: String): Map[String, String] = {
+    getOptionValue(EXTRA_OPTIONS)
+      .map(opt => (parse(opt) \ key).extract[Map[String, String]])
+      .getOrElse(Map.empty)
+  }
 }
 
 object FlintSparkIndexOptions {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index 24a518ca3..b678096ca 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -31,7 +31,16 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "watermark_delay" -> "30 Seconds",
       "output_mode" -> "complete",
       "index_settings" -> """{"number_of_shards": 3}""",
-      "extra_options" -> """{"sink": {"opt1": "val1", "opt2": "val2"}}"""))
+      "extra_options" ->
+        """ {
+          |   "alb_logs": {
+          |     "opt1": "val1"
+          |   },
+          |   "sink": {
+          |     "opt2": "val2",
+          |     "opt3": "val3"
+          |   }
+          | }""".stripMargin))
 
     options.autoRefresh() shouldBe true
     options.refreshInterval() shouldBe Some("1 Minute")
@@ -39,8 +48,22 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     options.watermarkDelay() shouldBe Some("30 Seconds")
     options.outputMode() shouldBe Some("complete")
     options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
-    options.extraSinkOptions() shouldBe Map("opt1" -> "val1", "opt2" -> "val2")
+    options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
+    options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
+  }
+
+  test("should return extra source option value and empty sink option values") {
+    val options = FlintSparkIndexOptions(
+      Map("extra_options" ->
+        """ {
+          |   "alb_logs": {
"opt1": "val1" + | } + | }""".stripMargin)) + + options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1") + options.extraSourceOptions("alb_logs_metrics") shouldBe empty + options.extraSinkOptions() shouldBe empty } test("should return default option value if unspecified") { @@ -51,6 +75,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.watermarkDelay() shouldBe empty options.outputMode() shouldBe empty options.indexSettings() shouldBe empty + options.extraSourceOptions("alb_logs") shouldBe empty options.extraSinkOptions() shouldBe empty options.optionsWithDefault should contain("auto_refresh" -> "false") } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 627e11f52..0d3f7a887 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -65,7 +65,8 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { | WITH ( | auto_refresh = true, | refresh_interval = '5 Seconds', - | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) | """.stripMargin) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 0073688e0..b144f7177 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -91,7 +91,8 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { | checkpoint_location = '${checkpointDir.getAbsolutePath}', | watermark_delay = '1 Second', | output_mode = 'complete', - | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) |""".stripMargin) @@ -104,6 +105,8 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) options.watermarkDelay() shouldBe Some("1 Second") options.outputMode() shouldBe Some("complete") + options.extraSourceOptions(testTable) shouldBe Map("maxFilesPerTrigger" -> "1") + options.extraSinkOptions() shouldBe Map.empty } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index bfbeba9c3..dbd349b63 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -69,7 +69,8 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { | WITH ( | auto_refresh = true, | refresh_interval = '5 Seconds', - | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | extra_options = '{"$testTable": {"maxFilesPerTrigger": "1"}}' | ) | """.stripMargin) From 
From 7852cee24120e164dcdc5227c12ff3cd5084f26d Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 14:46:52 -0700
Subject: [PATCH 07/11] Refactor MV UT

Signed-off-by: Chen Dai
---
 .../mv/FlintSparkMaterializedViewSuite.scala       | 64 +++++++++----------
 1 file changed, 29 insertions(+), 35 deletions(-)

diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 1f9b52963..0f340e592 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -32,6 +32,8 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 class FlintSparkMaterializedViewSuite extends FlintSuite {
 
+  /** Test table, MV name and query */
+  val testTable = "mv_build_test"
   val testMvName = "spark_catalog.default.mv"
   val testQuery = "SELECT 1"
 
@@ -87,26 +89,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   }
 
   test("build stream should insert watermark operator and replace batch relation") {
-    val testTable = "mv_build_test"
-    withTable(testTable) {
-      sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
-
-      val testQuery =
-        s"""
+    val testQuery =
+      s"""
         | SELECT
         |   window.start AS startTime,
         |   COUNT(*) AS count
         | FROM $testTable
         | GROUP BY TUMBLE(time, '1 Minute')
        |""".stripMargin
+    val options = Map("watermark_delay" -> "30 Seconds")
 
-      val mv = FlintSparkMaterializedView(
-        testMvName,
-        testQuery,
-        Map.empty,
-        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
-
-      val actualPlan = mv.buildStream(spark).queryExecution.logical
+    withAggregateMaterializedView(testQuery, options) { actualPlan =>
       assert(
         actualPlan.sameSemantics(
           streamingRelation(testTable)
            .watermark($"time", "30 Seconds")
            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
              $"window.start" as "startTime",
              count(1) as "count")))
     }
   }
 
@@ -118,26 +111,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   test("build stream with filtering aggregate query") {
-    val testTable = "mv_build_test"
-    withTable(testTable) {
-      sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
-
-      val testQuery =
-        s"""
+    val testQuery =
+      s"""
        | SELECT
        |   window.start AS startTime,
        |   COUNT(*) AS count
        | FROM $testTable
        | WHERE age > 30
        | GROUP BY TUMBLE(time, '1 Minute')
        |""".stripMargin
+    val options = Map("watermark_delay" -> "30 Seconds")
 
-      val mv = FlintSparkMaterializedView(
-        testMvName,
-        testQuery,
-        Map.empty,
-        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
-
-      val actualPlan = mv.buildStream(spark).queryExecution.logical
+    withAggregateMaterializedView(testQuery, options) { actualPlan =>
       assert(
         actualPlan.sameSemantics(
           streamingRelation(testTable)
            .where($"age" > 30)
            .watermark($"time", "30 Seconds")
            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
              $"window.start" as "startTime",
              count(1) as "count")))
     }
   }
 
@@ -150,17 +134,13 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   test("build stream with non-aggregate query") {
-    val testTable = "mv_build_test"
-    withTable(testTable) {
-      sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
-
-      val mv = FlintSparkMaterializedView(
-        testMvName,
-        s"SELECT name, age FROM $testTable WHERE age > 30",
-        Map.empty)
-      val actualPlan = mv.buildStream(spark).queryExecution.logical
+    val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30"
 
+    withAggregateMaterializedView(testQuery, Map.empty) { actualPlan =>
       assert(
         actualPlan.sameSemantics(
           streamingRelation(testTable)
            .where($"age" > 30)
            .select($"name", $"age")))
     }
   }
@@ -183,6 +160,23 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
       mv.buildStream(spark)
     }
   }
+
+  private def withAggregateMaterializedView(query: String, options: Map[String, String])(
+      codeBlock: LogicalPlan => Unit): Unit = {
+
+    withTable(testTable) {
+      sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
+
+      val mv = FlintSparkMaterializedView(
+        testMvName,
+        query,
+        Map.empty,
+        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
+
+      val actualPlan = mv.buildStream(spark).queryExecution.logical
+      codeBlock(actualPlan)
+    }
+  }
 }
 
 /**

From d1de3bb068767012a3272bd5f809552f67f56e56 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 15:25:08 -0700
Subject: [PATCH 08/11] Copy extra options to streaming relation

Signed-off-by: Chen Dai
---
 .../spark/mv/FlintSparkMaterializedView.scala      | 10 +++++++-
 .../mv/FlintSparkMaterializedViewSuite.scala       | 24 +++++++++++++------
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index 112de680f..abe78b13b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.mv
 import java.util.Locale
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.convert.ImplicitConversions.`map AsScala`
 
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
@@ -23,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
  * Flint materialized view in Spark.
@@ -82,7 +84,7 @@ case class FlintSparkMaterializedView(
         aggregate.copy(child = watermark(timeCol, aggregate.child))
 
       case relation: UnresolvedRelation if !relation.isStreaming =>
-        relation.copy(isStreaming = true)
+        relation.copy(isStreaming = true, options = optionsWithExtra(relation))
     }
     logicalPlanToDataFrame(spark, streamingPlan)
   }
@@ -96,6 +98,12 @@ case class FlintSparkMaterializedView(
     EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
   }
 
+  private def optionsWithExtra(relation: UnresolvedRelation): CaseInsensitiveStringMap = {
+    val originalOptions = relation.options.asCaseSensitiveMap
+    val extraOptions = options.extraSourceOptions(relation.tableName).asJava
+    new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava)
+  }
+
   /**
    * Extractor that extract event time column out of Aggregate operator.
    */
 
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 0f340e592..8bfc86120 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -146,6 +146,17 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     }
   }
 
+  test("build stream with extra source options") {
+    val testQuery = s"SELECT name, age FROM $testTable"
+    val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""")
+
+    withAggregateMaterializedView(testQuery, options) { actualPlan =>
+      val expectPlan =
+        streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")).select("name", "age")
+      assert(actualPlan.sameSemantics(expectPlan))
+    }
+  }
+
   test("build stream should fail if there is aggregation but no windowing function") {
     val testTable = "mv_build_test"
     withTable(testTable) {
@@ -167,11 +178,8 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
 
-      val mv = FlintSparkMaterializedView(
-        testMvName,
-        query,
-        Map.empty,
-        FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))
+      val mv =
+        FlintSparkMaterializedView(testMvName, query, Map.empty, FlintSparkIndexOptions(options))
 
       val actualPlan = mv.buildStream(spark).queryExecution.logical
       codeBlock(actualPlan)
@@ -184,10 +192,12 @@ object FlintSparkMaterializedViewSuite {
 
-  def streamingRelation(tableName: String): UnresolvedRelation = {
+  def streamingRelation(
+      tableName: String,
+      extraOptions: Map[String, String] = Map.empty): UnresolvedRelation = {
     UnresolvedRelation(
       TableIdentifier(tableName),
-      CaseInsensitiveStringMap.empty(),
+      new CaseInsensitiveStringMap(extraOptions.asJava),
       isStreaming = true)
   }
 
From 0cc2f25356683003eb73404b5c059cfd842b4a4c Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 16:11:43 -0700
Subject: [PATCH 09/11] Fix logical plan assert

Signed-off-by: Chen Dai
---
 .../mv/FlintSparkMaterializedViewSuite.scala       | 59 ++++++++++---------
 1 file changed, 32 insertions(+), 27 deletions(-)

diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 8bfc86120..cb32e74d3 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -15,7 +15,6 @@ import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
@@ -32,7 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String
 class FlintSparkMaterializedViewSuite extends FlintSuite {
 
   /** Test table, MV name and query */
-  val testTable = "mv_build_test"
+  val testTable = "spark_catalog.default.mv_build_test"
   val testMvName = "spark_catalog.default.mv"
   val testQuery = "SELECT 1"
 
@@ -99,13 +99,15 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("watermark_delay" -> "30 Seconds")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .watermark($"time", "30 Seconds")
-            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
-              $"window.start" as "startTime",
-              count(1) as "count")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .watermark($"time", "30 Seconds")
+          .groupBy($"TUMBLE".function($"time", "1 Minute"))(
+            $"window.start" as "startTime",
+            $"COUNT".function(1) as "count"),
+        checkAnalysis = false
+      ) // don't analyze due to fully qualified test table name
     }
   }
 
@@ -124,14 +126,15 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("watermark_delay" -> "30 Seconds")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .where($"age" > 30)
-            .watermark($"time", "30 Seconds")
-            .groupBy($"TUMBLE".function($"time", "1 Minute"))(
-              $"window.start" as "startTime",
-              count(1) as "count")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .where($"age" > 30)
+          .watermark($"time", "30 Seconds")
+          .groupBy($"TUMBLE".function($"time", "1 Minute"))(
+            $"window.start" as "startTime",
+            $"COUNT".function(1) as "count"),
+        checkAnalysis = false)
     }
   }
 
@@ -141,11 +144,12 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30"
 
     withAggregateMaterializedView(testQuery, Map.empty) { actualPlan =>
-      assert(
-        actualPlan.sameSemantics(
-          streamingRelation(testTable)
-            .where($"age" > 30)
-            .select($"name", $"age")))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable)
+          .where($"age" > 30)
+          .select($"name", $"age"),
+        checkAnalysis = false)
     }
   }
 
@@ -155,11 +159,11 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""")
 
     withAggregateMaterializedView(testQuery, options) { actualPlan =>
-      val expectPlan =
-        streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")).select("name", "age")
-      assert(actualPlan.sameSemantics(expectPlan))
+      comparePlans(
+        actualPlan,
+        streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1"))
+          .select($"name", $"age"),
+        checkAnalysis = false)
     }
   }
 
@@ -200,8 +204,8 @@ object FlintSparkMaterializedViewSuite {
   def streamingRelation(
       tableName: String,
       extraOptions: Map[String, String] = Map.empty): UnresolvedRelation = {
-    UnresolvedRelation(
-      TableIdentifier(tableName),
+    new UnresolvedRelation(
+      tableName.split('.'),
       new CaseInsensitiveStringMap(extraOptions.asJava),
       isStreaming = true)
   }
 
From 101ef1b132b301102e2e2a095efcdab414762aef Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 18 Oct 2023 16:36:05 -0700
Subject: [PATCH 10/11] Qualify table name in relation when fetching extra option

Signed-off-by: Chen Dai
---
 .../flint/spark/mv/FlintSparkMaterializedView.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index abe78b13b..019cc7aa5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -84,7 +84,7 @@ case class FlintSparkMaterializedView(
         aggregate.copy(child = watermark(timeCol, aggregate.child))
 
       case relation: UnresolvedRelation if !relation.isStreaming =>
-        relation.copy(isStreaming = true, options = optionsWithExtra(relation))
+        relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation))
     }
     logicalPlanToDataFrame(spark, streamingPlan)
   }
@@ -98,9 +98,12 @@ case class FlintSparkMaterializedView(
     EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
   }
 
-  private def optionsWithExtra(relation: UnresolvedRelation): CaseInsensitiveStringMap = {
+  private def optionsWithExtra(
+      spark: SparkSession,
+      relation: UnresolvedRelation): CaseInsensitiveStringMap = {
     val originalOptions = relation.options.asCaseSensitiveMap
-    val extraOptions = options.extraSourceOptions(relation.tableName).asJava
+    val tableName = qualifyTableName(spark, relation.tableName)
+    val extraOptions = options.extraSourceOptions(tableName).asJava
     new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava)
   }
 

From 30e0aec6ecc5b1c8fd7a11149098f33f21fa71ba Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Thu, 19 Oct 2023 14:48:21 -0700
Subject: [PATCH 11/11] Update user manual with new options

Signed-off-by: Chen Dai
---
 docs/index.md | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 3c7b09fa6..56d2bcf3b 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -219,15 +219,22 @@ User can provide the following options in `WITH` clause of create statement:
 + `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
 + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
 + `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
++ `watermark_delay`: a string as a time expression for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on a materialized view if its query contains aggregation.
++ `output_mode`: a mode string that describes how data will be written to the streaming sink. If unspecified, the default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
++ `extra_options`: a JSON string as extra options that can be passed to Spark streaming source and sink API directly. Use the qualified source table name as the key (because there could be multiple source tables) and "sink" for sink options, e.g. '{"table1": {"key": "val"}, "sink": {"key": "val"}}'
-Note that the index option name is case-sensitive.
+Note that index option names are case-sensitive. Here is an example:
 
 ```sql
 WITH (
-  auto_refresh = [true|false],
-  refresh_interval = 'time interval expression',
-  checkpoint_location = 'checkpoint directory path'
+  auto_refresh = true,
+  refresh_interval = '10 Seconds',
+  checkpoint_location = 's3://test/',
+  watermark_delay = '1 Second',
+  output_mode = 'complete',
+  index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}',
+  extra_options = '{"spark_catalog.default.alb_logs": {"maxFilesPerTrigger": "1"}}'
 )
 ```
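
As a companion to the manual section above, the following is a hypothetical, self-contained Scala sketch (not part of the patches) that condenses how this series wires the new options into Spark Structured Streaming: the `extra_options` JSON is parsed with json4s per key ("sink" or a qualified source table name), and the sink-side options are chained onto `DataStreamWriter` only when present. The object name `IndexOptionsSketch` and the standalone method shapes are illustrative assumptions; the real logic lives in `FlintSparkIndexOptions.parseExtraOptions` and `FlintSpark.addSinkOptions` in the patches.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization

object IndexOptionsSketch {

  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Mirrors parseExtraOptions: look up the object under the given key ("sink" or a
  // qualified table name) in the extra_options JSON; an absent key yields an empty map.
  def extraOptions(extraOptionsJson: Option[String], key: String): Map[String, String] =
    extraOptionsJson
      .map(json => (parse(json) \ key).extract[Map[String, String]])
      .getOrElse(Map.empty)

  // Mirrors addSinkOptions: each optional setting is applied only when defined,
  // so an unspecified option leaves the writer untouched and Spark defaults apply.
  def addSinkOptions(
      writer: DataStreamWriter[Row],
      checkpointLocation: Option[String],
      refreshInterval: Option[String],
      outputMode: Option[String],
      extraSinkOptions: Map[String, String]): DataStreamWriter[Row] = {
    val withCheckpoint =
      checkpointLocation.map(writer.option("checkpointLocation", _)).getOrElse(writer)
    val withTrigger = refreshInterval
      .map(interval => withCheckpoint.trigger(Trigger.ProcessingTime(interval)))
      .getOrElse(withCheckpoint)
    val withMode = outputMode.map(withTrigger.outputMode).getOrElse(withTrigger)
    withMode.options(extraSinkOptions)
  }
}
```

Assuming a streaming DataFrame `df`, usage would look like `addSinkOptions(df.writeStream, Some("s3://test/"), Some("10 Seconds"), Some("complete"), extraOptions(json, "sink"))`. The `map`/`getOrElse` chaining keeps every option independent, which is why an option left out of the `WITH` clause simply falls back to Spark's default behavior.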