From a63e913c41bb7f754f3bbc864d1bc3cda0d75fe4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 18 Oct 2023 08:27:51 -0700 Subject: [PATCH] Add watermark delay option Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexOptions.scala | 11 ++++++++++- .../flint/spark/FlintSparkIndexOptionsSuite.scala | 8 ++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index b3e7535c3..dbd6eae2a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL, WATERMARK_DELAY} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames /** @@ -42,6 +42,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION) + /** + * How late the data can come and still be processed. + * + * @return + * watermark delay time expression + */ + def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY) + /** * The index settings for OpenSearch index created. 
* @@ -84,6 +92,7 @@ object FlintSparkIndexOptions { val AUTO_REFRESH: OptionName.Value = Value("auto_refresh") val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") + val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index 160a4c9d3..8d2a5d506 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.scalatest.matchers.should.Matchers import org.apache.spark.FlintSuite @@ -16,6 +16,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { AUTO_REFRESH.toString shouldBe "auto_refresh" REFRESH_INTERVAL.toString shouldBe "refresh_interval" CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location" + WATERMARK_DELAY.toString shouldBe "watermark_delay" INDEX_SETTINGS.toString shouldBe "index_settings" } @@ -25,11 +26,13 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { "auto_refresh" -> "true", "refresh_interval" -> "1 Minute", "checkpoint_location" -> "s3://test/", + "watermark_delay" -> "30 Seconds", "index_settings" -> """{"number_of_shards": 3}""")) options.autoRefresh() shouldBe true options.refreshInterval() shouldBe Some("1 Minute") options.checkpointLocation() shouldBe Some("s3://test/") + options.watermarkDelay() shouldBe 
Some("30 Seconds") options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""") } @@ -39,11 +42,12 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.autoRefresh() shouldBe false options.refreshInterval() shouldBe empty options.checkpointLocation() shouldBe empty + options.watermarkDelay() shouldBe empty options.indexSettings() shouldBe empty options.optionsWithDefault should contain("auto_refresh" -> "false") } - test("should return default option value if unspecified with specified value") { + test("should include unspecified option if it has default value") { val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute")) options.optionsWithDefault shouldBe Map(