diff --git a/docs/index.md b/docs/index.md index 44b0052b0..1f51c2d82 100644 --- a/docs/index.md +++ b/docs/index.md @@ -199,6 +199,8 @@ User can provide the following options in `WITH` clause of create statement: + `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. +Note that the index option name is case-sensitive. + ```sql WITH ( auto_refresh = [true|false], diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index c6f546605..b3e7535c3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -5,6 +5,9 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL} +import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames + /** * Flint Spark index configurable options. * @@ -13,13 +16,15 @@ package org.opensearch.flint.spark */ case class FlintSparkIndexOptions(options: Map[String, String]) { + validateOptionNames(options) + /** * Is Flint index auto refreshed or manual refreshed. * * @return * auto refresh option value */ - def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean + def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean /** * The refresh interval (only valid if auto refresh enabled). @@ -27,7 +32,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * refresh interval expression */ - def refreshInterval(): Option[String] = options.get("refresh_interval") + def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL) /** * The checkpoint location which maybe required by Flint index's refresh. @@ -35,7 +40,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * checkpoint location path */ - def checkpointLocation(): Option[String] = options.get("checkpoint_location") + def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION) /** * The index settings for OpenSearch index created. @@ -43,7 +48,25 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * @return * index setting JSON */ - def indexSettings(): Option[String] = options.get("index_settings") + def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS) + + /** + * @return + * all option values and fill default value if unspecified + */ + def optionsWithDefault: Map[String, String] = { + val map = Map.newBuilder[String, String] + map ++= options + + if (!options.contains(AUTO_REFRESH.toString)) { + map += (AUTO_REFRESH.toString -> autoRefresh().toString) + } + map.result() + } + + private def getOptionValue(name: OptionName): Option[String] = { + options.get(name.toString) + } } object FlintSparkIndexOptions { @@ -52,4 +75,28 @@ object FlintSparkIndexOptions { * Empty options */ val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty) + + /** + * Option name Enum. + */ + object OptionName extends Enumeration { + type OptionName = Value + val AUTO_REFRESH: OptionName.Value = Value("auto_refresh") + val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval") + val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location") + val INDEX_SETTINGS: OptionName.Value = Value("index_settings") + } + + /** + * Validate option names and throw exception if any unknown found. + * + * @param options + * options given + */ + def validateOptionNames(options: Map[String, String]): Unit = { + val allOptions = OptionName.values.map(_.toString) + val invalidOptions = options.keys.filterNot(allOptions.contains) + + require(invalidOptions.isEmpty, s"option name ${invalidOptions.mkString(",")} is invalid") + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 3db325c3e..f7e20b339 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -74,7 +74,7 @@ case class FlintSparkCoveringIndex( } private def getIndexOptions: String = { - Serialization.write(options.options) + Serialization.write(options.optionsWithDefault) } private def getIndexProperties: String = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index dd9cb6bdf..0749d13f5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -87,7 +87,7 @@ class FlintSparkSkippingIndex( } private def getIndexOptions: String = { - Serialization.write(options.options) + Serialization.write(options.optionsWithDefault) } private def getIndexProperties: String = { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala new file mode 100644 index 000000000..160a4c9d3 --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite + +class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers { + + test("should return lowercase name as option name") { + AUTO_REFRESH.toString shouldBe "auto_refresh" + REFRESH_INTERVAL.toString shouldBe "refresh_interval" + CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location" + INDEX_SETTINGS.toString shouldBe "index_settings" + } + + test("should return specified option value") { + val options = FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> "s3://test/", + "index_settings" -> """{"number_of_shards": 3}""")) + + options.autoRefresh() shouldBe true + options.refreshInterval() shouldBe Some("1 Minute") + options.checkpointLocation() shouldBe Some("s3://test/") + options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""") + } + + test("should return default option value if unspecified") { + val options = FlintSparkIndexOptions(Map.empty) + + options.autoRefresh() shouldBe false + options.refreshInterval() shouldBe empty + options.checkpointLocation() shouldBe empty + options.indexSettings() shouldBe empty + options.optionsWithDefault should contain("auto_refresh" -> "false") + } + + test("should return default option value if unspecified with specified value") { + val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute")) + + options.optionsWithDefault shouldBe Map( + "auto_refresh" -> "false", + "refresh_interval" -> "1 Minute") + } + + test("should report error if any unknown option name") { + the[IllegalArgumentException] thrownBy + FlintSparkIndexOptions(Map("autoRefresh" -> "true")) + + the[IllegalArgumentException] thrownBy + FlintSparkIndexOptions(Map("AUTO_REFRESH" -> "true")) + + the[IllegalArgumentException] thrownBy { + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test")) + } should have message "requirement failed: option name indexSetting is invalid" + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index ac0b33746..b0938966f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -57,7 +57,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.ci_test", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 892a8faa4..0e4c8bd67 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -16,7 +16,7 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -58,7 +58,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } - test("create skipping index with streaming job options") { + test("create covering index with streaming job options") { withTempDir { checkpointDir => sql(s""" | CREATE INDEX $testIndex ON $testTable ( name ) @@ -77,7 +77,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { } } - test("create skipping index with index settings") { + test("create covering index with index settings") { sql(s""" | CREATE INDEX $testIndex ON $testTable ( name ) | WITH ( @@ -94,6 +94,15 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } + test("create covering index with invalid option") { + the[IllegalArgumentException] thrownBy + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + | WITH (autoRefresh = true) + | """.stripMargin) + } + test("create covering index with manual refresh") { sql(s""" | CREATE INDEX $testIndex ON $testTable diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 02dc681d7..e3fb467e6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -79,7 +79,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.test", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { @@ -105,7 +105,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } |""".stripMargin) - index.get.options shouldBe FlintSparkIndexOptions.empty + index.get.options shouldBe FlintSparkIndexOptions(Map("auto_refresh" -> "false")) } test("create skipping index with index options successfully") { @@ -522,7 +522,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "struct" | }], | "source": "$testTable", - | "options": {}, + | "options": { "auto_refresh": "false" }, | "properties": {} | }, | "properties": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index a688b1370..55893732e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE @@ -99,6 +99,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + test("create skipping index with invalid option") { + the[IllegalArgumentException] thrownBy + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH (autoRefresh = true) + | """.stripMargin) + } + test("create skipping index with manual refresh") { sql(s""" | CREATE SKIPPING INDEX ON $testTable