Skip to content

Commit

Permalink
Add watermark delay option
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 18, 2023
1 parent 2778d68 commit a63e913
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
Expand Down Expand Up @@ -42,6 +42,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION)

/**
* Watermark delay option, i.e. how late the data can come and still be processed.
*
* @return
* the `watermark_delay` option value (a delay time expression, e.g. "30 Seconds") if
* specified, otherwise empty
*/
def watermarkDelay(): Option[String] = getOptionValue(WATERMARK_DELAY)

/**
* The index settings for OpenSearch index created.
*
Expand Down Expand Up @@ -84,6 +92,7 @@ object FlintSparkIndexOptions {
// Each value's name (its toString) is the key under which the option is given in the options map.
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
Expand All @@ -16,6 +16,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
AUTO_REFRESH.toString shouldBe "auto_refresh"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
WATERMARK_DELAY.toString shouldBe "watermark_delay"
INDEX_SETTINGS.toString shouldBe "index_settings"
}

Expand All @@ -25,11 +26,13 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"auto_refresh" -> "true",
"refresh_interval" -> "1 Minute",
"checkpoint_location" -> "s3://test/",
"watermark_delay" -> "30 Seconds",
"index_settings" -> """{"number_of_shards": 3}"""))

options.autoRefresh() shouldBe true
options.refreshInterval() shouldBe Some("1 Minute")
options.checkpointLocation() shouldBe Some("s3://test/")
options.watermarkDelay() shouldBe Some("30 Seconds")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
}

Expand All @@ -39,11 +42,12 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.autoRefresh() shouldBe false
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.watermarkDelay() shouldBe empty
options.indexSettings() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
}

test("should return default option value if unspecified with specified value") {
test("should return include unspecified option if it has default value") {
val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute"))

options.optionsWithDefault shouldBe Map(
Expand Down

0 comments on commit a63e913

Please sign in to comment.