From 49d1482a2ff7a79d34428e43b762b9e910bfec5e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 13:44:27 -0700 Subject: [PATCH 1/7] Add validation for index options Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexOptions.scala | 45 +++++++++++++++-- .../spark/FlintSparkIndexOptionsSuite.scala | 48 +++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index c6f546605..9b759a66f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,6 +5,13 @@ package org.opensearch.flint.spark +import java.util.Locale + +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{ + AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL +} +import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames + /** * Flint Spark index configurable options. * @@ -13,13 +20,15 @@ package org.opensearch.flint.spark */ case class FlintSparkIndexOptions(options: Map[String, String]) { + validateOptionNames(options) + /** * Is Flint index auto refreshed or manual refreshed. * * @return * auto refresh option value */ - def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean + def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean /** * The refresh interval (only valid if auto refresh enabled). 
@@ -27,7 +36,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * refresh interval expression */ - def refreshInterval(): Option[String] = options.get("refresh_interval") + def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL) /** * The checkpoint location which maybe required by Flint index's refresh. @@ -35,7 +44,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * checkpoint location path */ - def checkpointLocation(): Option[String] = options.get("checkpoint_location") + def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION) /** * The index settings for OpenSearch index created. @@ -43,7 +52,11 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * index setting JSON */ - def indexSettings(): Option[String] = options.get("index_settings") + def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS) + + private def getOptionValue(name: OptionName): Option[String] = { + options.get(name.toString) + } } object FlintSparkIndexOptions { @@ -52,4 +65,28 @@ object FlintSparkIndexOptions { * Empty options */ val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty) + + /** + * Option name Enum. 
+ */ + object OptionName extends Enumeration { + type OptionName = Value + val AUTO_REFRESH: OptionName.Value = Value("auto_refresh") + val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") + val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") + val INDEX_SETTINGS: OptionName.Value = Value("index_settings") + + override def toString(): String = { + super.toString().toLowerCase(Locale.ROOT) + } + } + + // This method has to be here otherwise Scala compilation failure + def validateOptionNames(options: Map[String, String]): Unit = { + val allowedNames = OptionName.values.map(_.toString) + val unknownNames = options.keys.filterNot(allowedNames.contains) + + require(unknownNames.isEmpty, + s"option name ${unknownNames.mkString(",")} is invalid") + } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala new file mode 100644 index 000000000..c29419411 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite + +class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { + + test("should return specified option value") { + val options = FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> "s3://test/", + "index_settings" -> """{"number_of_shards": 3}""")) + + options.autoRefresh() shouldBe true + options.refreshInterval() shouldBe Some("1 Minute") + options.checkpointLocation() shouldBe Some("s3://test/") + options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""") + } + + test("should return default option 
value if unspecified") { + val options = FlintSparkIndexOptions(Map.empty) + + options.autoRefresh() shouldBe false + options.refreshInterval() shouldBe empty + options.checkpointLocation() shouldBe empty + options.indexSettings() shouldBe empty + } + + test("should report error if any unknown option name") { + the [IllegalArgumentException] thrownBy + FlintSparkIndexOptions(Map("autoRefresh" -> "true")) + + the [IllegalArgumentException] thrownBy { + FlintSparkIndexOptions(Map( + "auto_refresh" -> "true", + "indexSetting" -> "test" + )) + } should have message "requirement failed: option name indexSetting is invalid" + } +} From 274b5a50dc3dfc9bbf1ba09644725481b4596df0 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 14:40:41 -0700 Subject: [PATCH 2/7] Add options with default method Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexOptions.scala | 18 ++++++++++++++++++ .../spark/FlintSparkIndexOptionsSuite.scala | 18 ++++++++++++------ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 9b759a66f..973b84458 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -54,6 +54,20 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { */ def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS) + /** + * @return + * all option values, with default values filled in for any unspecified option + */ + def optionsWithDefault: Map[String, String] = { + val map = Map.newBuilder[String, String] + map ++= options + + if (!options.contains(AUTO_REFRESH.toString)) { + map += (AUTO_REFRESH.toString -> autoRefresh().toString) + } + map.result() + } + + private def getOptionValue(name: OptionName): 
Option[String] = { options.get(name.toString) } @@ -76,6 +90,10 @@ object FlintSparkIndexOptions { val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") + /** + * @return + * convert enum name to lowercase as public option name + */ override def toString(): String = { super.toString().toLowerCase(Locale.ROOT) } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index c29419411..2ee2eacb9 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -32,17 +32,23 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { options.refreshInterval() shouldBe empty options.checkpointLocation() shouldBe empty options.indexSettings() shouldBe empty + options.optionsWithDefault should contain("auto_refresh" -> "false") + } + + test("should return default option value if unspecified with specified value") { + val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute")) + + options.optionsWithDefault shouldBe Map( + "auto_refresh" -> "false", + "refresh_interval" -> "1 Minute") } test("should report error if any unknown option name") { - the [IllegalArgumentException] thrownBy + the[IllegalArgumentException] thrownBy FlintSparkIndexOptions(Map("autoRefresh" -> "true")) - the [IllegalArgumentException] thrownBy { - FlintSparkIndexOptions(Map( - "auto_refresh" -> "true", - "indexSetting" -> "test" - )) + the[IllegalArgumentException] thrownBy { + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test")) } should have message "requirement failed: option name indexSetting is invalid" } } From 
1df95c9c5a91148d9b381859609661e970a5cf76 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 14:47:30 -0700 Subject: [PATCH 3/7] Change IT for options with default Signed-off-by: Chen Dai --- .../flint/spark/covering/FlintSparkCoveringIndex.scala | 2 +- .../flint/spark/skipping/FlintSparkSkippingIndex.scala | 2 +- .../flint/spark/FlintSparkCoveringIndexITSuite.scala | 2 +- .../flint/spark/FlintSparkSkippingIndexITSuite.scala | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 3db325c3e..f7e20b339 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -74,7 +74,7 @@ case class FlintSparkCoveringIndex( } private def getIndexOptions: String = { - Serialization.write(options.options) + Serialization.write(options.optionsWithDefault) } private def getIndexProperties: String = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index dd9cb6bdf..0749d13f5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -87,7 +87,7 @@ class FlintSparkSkippingIndex( } private def getIndexOptions: String = { - Serialization.write(options.options) + Serialization.write(options.optionsWithDefault) } private def getIndexProperties: String = { diff --git 
a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index ac0b33746..b0938966f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -57,7 +57,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.ci_test", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 02dc681d7..e3fb467e6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -79,7 +79,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.test", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { @@ -105,7 +105,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } |""".stripMargin) - index.get.options shouldBe FlintSparkIndexOptions.empty + index.get.options shouldBe FlintSparkIndexOptions(Map("auto_refresh" -> "false")) } test("create skipping index with index options successfully") { @@ -522,7 +522,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "struct" | }], | "source": "$testTable", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { From 7afcdebc1d962ba323276ca61e0374b0929288c1 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: 
Fri, 6 Oct 2023 15:13:58 -0700 Subject: [PATCH 4/7] Add IT for invalid option test Signed-off-by: Chen Dai --- .../spark/FlintSparkCoveringIndexSqlITSuite.scala | 15 ++++++++++++--- .../spark/FlintSparkSkippingIndexSqlITSuite.scala | 11 ++++++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 892a8faa4..0e4c8bd67 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -16,7 +16,7 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -58,7 +58,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } - test("create skipping index with streaming job options") { + test("create covering index with streaming job options") { withTempDir { checkpointDir => sql(s""" | CREATE INDEX $testIndex ON $testTable ( name ) @@ -77,7 +77,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { } } - test("create skipping index with index settings") { + test("create covering index with index settings") { sql(s""" | CREATE INDEX $testIndex ON $testTable ( name ) | WITH ( @@ -94,6 +94,15 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } + test("create covering index 
with invalid option") { + the[IllegalArgumentException] thrownBy + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (autoRefresh = true) + | """.stripMargin) + } + test("create covering index with manual refresh") { sql(s""" | CREATE INDEX $testIndex ON $testTable diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index a688b1370..55893732e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE @@ -99,6 +99,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + test("create skipping index with invalid option") { + the[IllegalArgumentException] thrownBy + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH (autoRefresh = true) + | """.stripMargin) + } + test("create skipping index with manual refresh") { sql(s""" | CREATE SKIPPING INDEX ON $testTable From e10deccebb85d31b9a6bf2ba98244cf779f69d75 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 15:23:06 -0700 Subject: [PATCH 5/7] Remove toString and add more UT Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexOptions.scala | 10 ---------- 
.../flint/spark/FlintSparkIndexOptionsSuite.scala | 11 +++++++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 973b84458..4db83cfff 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,8 +5,6 @@ package org.opensearch.flint.spark -import java.util.Locale - import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{ AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL } @@ -89,14 +87,6 @@ object FlintSparkIndexOptions { val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") val INDEX_SETTINGS: OptionName.Value = Value("index_settings") - - /** - * @return - * convert enum name to lowercase as public option name - */ - override def toString(): String = { - super.toString().toLowerCase(Locale.ROOT) - } } // This method has to be here otherwise Scala compilation failure diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala index 2ee2eacb9..160a4c9d3 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -5,12 +5,20 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL} import org.scalatest.matchers.should.Matchers import 
org.apache.spark.FlintSuite class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { + test("should return lowercase name as option name") { + AUTO_REFRESH.toString shouldBe "auto_refresh" + REFRESH_INTERVAL.toString shouldBe "refresh_interval" + CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location" + INDEX_SETTINGS.toString shouldBe "index_settings" + } + test("should return specified option value") { val options = FlintSparkIndexOptions( Map( @@ -47,6 +55,9 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { the[IllegalArgumentException] thrownBy FlintSparkIndexOptions(Map("autoRefresh" -> "true")) + the[IllegalArgumentException] thrownBy + FlintSparkIndexOptions(Map("AUTO_REFRESH" -> "true")) + the[IllegalArgumentException] thrownBy { FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test")) } should have message "requirement failed: option name indexSetting is invalid" From 05779e8878bfa70b20e19764e77ce2d0a78a43ef Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 15:27:20 -0700 Subject: [PATCH 6/7] Update doc Signed-off-by: Chen Dai --- docs/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/index.md b/docs/index.md index 44b0052b0..1f51c2d82 100644 --- a/docs/index.md +++ b/docs/index.md @@ -199,6 +199,8 @@ User can provide the following options in `WITH` clause of create statement: + `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. +Note that the index option name is case-sensitive. 
+ ```sql WITH ( auto_refresh = [true|false], From c6b274a1ed6903581097c240e69971dfb0c28a19 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Fri, 6 Oct 2023 15:43:53 -0700 Subject: [PATCH 7/7] Refactor validate method Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexOptions.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index 4db83cfff..b3e7535c3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,9 +5,7 @@ package org.opensearch.flint.spark -import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{ - AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL -} +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL} import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames /** @@ -89,12 +87,16 @@ object FlintSparkIndexOptions { val INDEX_SETTINGS: OptionName.Value = Value("index_settings") } - // This method has to be here otherwise Scala compilation failure + /** + * Validate option names and throw an exception if any unknown name is found. 
+ * + * @param options + * options given + */ def validateOptionNames(options: Map[String, String]): Unit = { - val allowedNames = OptionName.values.map(_.toString) - val unknownNames = options.keys.filterNot(allowedNames.contains) + val allOptions = OptionName.values.map(_.toString) + val invalidOptions = options.keys.filterNot(allOptions.contains) - require(unknownNames.isEmpty, - s"option name ${unknownNames.mkString(",")} is invalid") + require(invalidOptions.isEmpty, s"option name ${invalidOptions.mkString(",")} is invalid") } }