Skip to content

Commit

Permalink
Add id expression index option
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 1, 2023
1 parent 84e5eec commit 9feae2a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package org.opensearch.flint.spark
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, ID_EXPRESSION, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
Expand Down Expand Up @@ -70,6 +70,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* An expression that generates unique value as source data row ID.
*
* @return
* ID expression
*/
def idExpression(): Option[String] = getOptionValue(ID_EXPRESSION)

/**
* Extra streaming source options that can be simply passed to DataStreamReader or
* Relation.options
Expand Down Expand Up @@ -136,6 +144,7 @@ object FlintSparkIndexOptions {
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
val EXTRA_OPTIONS: OptionName.Value = Value("extra_options")
val ID_EXPRESSION: OptionName.Value = Value("id_expression")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
WATERMARK_DELAY.toString shouldBe "watermark_delay"
OUTPUT_MODE.toString shouldBe "output_mode"
INDEX_SETTINGS.toString shouldBe "index_settings"
ID_EXPRESSION.toString shouldBe "id_expression"
EXTRA_OPTIONS.toString shouldBe "extra_options"
}

Expand All @@ -31,6 +32,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"watermark_delay" -> "30 Seconds",
"output_mode" -> "complete",
"index_settings" -> """{"number_of_shards": 3}""",
"id_expression" -> """sha1(col("timestamp"))""",
"extra_options" ->
""" {
| "alb_logs": {
Expand All @@ -48,6 +50,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.watermarkDelay() shouldBe Some("30 Seconds")
options.outputMode() shouldBe Some("complete")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
options.idExpression() shouldBe Some("""sha1(col("timestamp"))""")
options.extraSourceOptions("alb_logs") shouldBe Map("opt1" -> "val1")
options.extraSinkOptions() shouldBe Map("opt2" -> "val2", "opt3" -> "val3")
}
Expand Down Expand Up @@ -75,6 +78,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
options.watermarkDelay() shouldBe empty
options.outputMode() shouldBe empty
options.indexSettings() shouldBe empty
options.idExpression() shouldBe empty
options.extraSourceOptions("alb_logs") shouldBe empty
options.extraSinkOptions() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
Expand Down

0 comments on commit 9feae2a

Please sign in to comment.