Skip to content

Commit

Permalink
Add validation for index options (opensearch-project#66)
Browse files Browse the repository at this point in the history
* Add validation for index options

Signed-off-by: Chen Dai <[email protected]>

* Add options with default method

Signed-off-by: Chen Dai <[email protected]>

* Change IT for options with default

Signed-off-by: Chen Dai <[email protected]>

* Add IT for invalid option test

Signed-off-by: Chen Dai <[email protected]>

* Remove toString and add more UT

Signed-off-by: Chen Dai <[email protected]>

* Update doc

Signed-off-by: Chen Dai <[email protected]>

* Refactor validate method

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 16, 2023
1 parent 15d7911 commit 2c184ca
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 14 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ User can provide the following options in `WITH` clause of create statement:
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.

Note that the index option name is case-sensitive.

```sql
WITH (
auto_refresh = [true|false],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, OptionName, REFRESH_INTERVAL}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
* Flint Spark index configurable options.
*
Expand All @@ -13,37 +16,57 @@ package org.opensearch.flint.spark
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

validateOptionNames(options)

/**
* Is Flint index auto refreshed or manual refreshed.
*
* @return
* auto refresh option value
*/
def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean
def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean

/**
* The refresh interval (only valid if auto refresh enabled).
*
* @return
* refresh interval expression
*/
def refreshInterval(): Option[String] = options.get("refresh_interval")
def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL)

/**
* The checkpoint location which maybe required by Flint index's refresh.
*
* @return
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")
def checkpointLocation(): Option[String] = getOptionValue(CHECKPOINT_LOCATION)

/**
* The index settings for OpenSearch index created.
*
* @return
* index setting JSON
*/
def indexSettings(): Option[String] = options.get("index_settings")
def indexSettings(): Option[String] = getOptionValue(INDEX_SETTINGS)

/**
* @return
* all option values and fill default value if unspecified
*/
def optionsWithDefault: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= options

if (!options.contains(AUTO_REFRESH.toString)) {
map += (AUTO_REFRESH.toString -> autoRefresh().toString)
}
map.result()
}

private def getOptionValue(name: OptionName): Option[String] = {
options.get(name.toString)
}
}

object FlintSparkIndexOptions {
Expand All @@ -52,4 +75,28 @@ object FlintSparkIndexOptions {
* Empty options
*/
val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)

/**
* Option name Enum.
*/
object OptionName extends Enumeration {
type OptionName = Value
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
val INDEX_SETTINGS: OptionName.Value = Value("index_settings")
}

/**
* Validate option names and throw exception if any unknown found.
*
* @param options
* options given
*/
def validateOptionNames(options: Map[String, String]): Unit = {
val allOptions = OptionName.values.map(_.toString)
val invalidOptions = options.keys.filterNot(allOptions.contains)

require(invalidOptions.isEmpty, s"option name ${invalidOptions.mkString(",")} is invalid")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ case class FlintSparkCoveringIndex(
}

private def getIndexOptions: String = {
Serialization.write(options.options)
Serialization.write(options.optionsWithDefault)
}

private def getIndexProperties: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class FlintSparkSkippingIndex(
}

private def getIndexOptions: String = {
Serialization.write(options.options)
Serialization.write(options.optionsWithDefault)
}

private def getIndexProperties: String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, INDEX_SETTINGS, REFRESH_INTERVAL}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite

class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {

test("should return lowercase name as option name") {
AUTO_REFRESH.toString shouldBe "auto_refresh"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
INDEX_SETTINGS.toString shouldBe "index_settings"
}

test("should return specified option value") {
val options = FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"refresh_interval" -> "1 Minute",
"checkpoint_location" -> "s3://test/",
"index_settings" -> """{"number_of_shards": 3}"""))

options.autoRefresh() shouldBe true
options.refreshInterval() shouldBe Some("1 Minute")
options.checkpointLocation() shouldBe Some("s3://test/")
options.indexSettings() shouldBe Some("""{"number_of_shards": 3}""")
}

test("should return default option value if unspecified") {
val options = FlintSparkIndexOptions(Map.empty)

options.autoRefresh() shouldBe false
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.indexSettings() shouldBe empty
options.optionsWithDefault should contain("auto_refresh" -> "false")
}

test("should return default option value if unspecified with specified value") {
val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 Minute"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "false",
"refresh_interval" -> "1 Minute")
}

test("should report error if any unknown option name") {
the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("autoRefresh" -> "true"))

the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("AUTO_REFRESH" -> "true"))

the[IllegalArgumentException] thrownBy {
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test"))
} should have message "requirement failed: option name indexSetting is invalid"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.ci_test",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row

Expand Down Expand Up @@ -58,7 +58,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index with streaming job options") {
test("create covering index with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE INDEX $testIndex ON $testTable ( name )
Expand All @@ -77,7 +77,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create skipping index with index settings") {
test("create covering index with index settings") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable ( name )
| WITH (
Expand All @@ -94,6 +94,15 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}

test("create covering index with invalid option") {
the[IllegalArgumentException] thrownBy
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (autoRefresh = true)
| """.stripMargin)
}

test("create covering index with manual refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.test",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand All @@ -105,7 +105,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| }
|""".stripMargin)

index.get.options shouldBe FlintSparkIndexOptions.empty
index.get.options shouldBe FlintSparkIndexOptions(Map("auto_refresh" -> "false"))
}

test("create skipping index with index options successfully") {
Expand Down Expand Up @@ -522,7 +522,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
| "source": "$testTable",
| "options": {},
| "options": { "auto_refresh": "false" },
| "properties": {}
| },
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
Expand Down Expand Up @@ -99,6 +99,15 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create skipping index with invalid option") {
the[IllegalArgumentException] thrownBy
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| ( year PARTITION )
| WITH (autoRefresh = true)
| """.stripMargin)
}

test("create skipping index with manual refresh") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
Expand Down

0 comments on commit 2c184ca

Please sign in to comment.