diff --git a/docs/index.md b/docs/index.md index bd593548c..cd52051a1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -197,6 +197,7 @@ User can provide the following options in `WITH` clause of create statement: + `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually. + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. + `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. ++ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied. ```sql WITH ( diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java index d93c17837..6773c3897 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java @@ -14,11 +14,27 @@ public class FlintMetadata { // TODO: define metadata format and create strong-typed class private final String content; + // TODO: piggyback optional index settings and will refactor as above + private String indexSettings; + public FlintMetadata(String content) { this.content = content; } + public FlintMetadata(String content, String indexSettings) { + this.content = content; + this.indexSettings = indexSettings; + } + public String getContent() { return content; } + + public String getIndexSettings() { + return indexSettings; + } + + public void setIndexSettings(String indexSettings) { + this.indexSettings = indexSettings; + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 8ad13c881..b35406707 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -13,6 +13,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -28,6 +29,7 @@ import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetIndexResponse; import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; @@ -70,6 +72,9 @@ public FlintOpenSearchClient(FlintOptions options) { CreateIndexRequest request = new CreateIndexRequest(indexName); request.mapping(metadata.getContent(), XContentType.JSON); + if (metadata.getIndexSettings() != null) { + request.settings(metadata.getIndexSettings(), XContentType.JSON); + } client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { throw new IllegalStateException("Failed to create Flint index " + indexName, e); @@ -86,11 +91,13 @@ public FlintOpenSearchClient(FlintOptions options) { @Override public List getAllIndexMetadata(String indexNamePattern) { try (RestHighLevelClient client = createClient()) { - GetMappingsRequest request = new GetMappingsRequest().indices(indexNamePattern); - GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); + GetIndexRequest request = new GetIndexRequest(indexNamePattern); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - return response.mappings().values().stream() - .map(mapping -> new FlintMetadata(mapping.source().string())) + return Arrays.stream(response.getIndices()) + .map(index -> new FlintMetadata( + response.getMappings().get(index).source().toString(), + response.getSettings().get(index).toString())) .collect(Collectors.toList()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e); @@ -99,11 +106,12 @@ public FlintOpenSearchClient(FlintOptions options) { @Override public FlintMetadata getIndexMetadata(String indexName) { try (RestHighLevelClient client = createClient()) { - GetMappingsRequest request = new GetMappingsRequest().indices(indexName); - GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); + GetIndexRequest request = new GetIndexRequest(indexName); + GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); - MappingMetadata mapping = response.mappings().get(indexName); - return new FlintMetadata(mapping.source().string()); + MappingMetadata mapping = response.getMappings().get(indexName); + Settings settings = response.getSettings().get(indexName); + return new FlintMetadata(mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e); } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index b3de3c4b6..4a4885ecb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -86,7 +86,9 @@ class FlintSpark(val spark: SparkSession) { throw new IllegalStateException(s"Flint index $indexName already exists") } } else { - flintClient.createIndex(indexName, index.metadata()) + val metadata = index.metadata() + index.options.indexSettings().foreach(metadata.setIndexSettings) + flintClient.createIndex(indexName, metadata) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala index f348c12a0..c6f546605 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala @@ -36,6 +36,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) { * checkpoint location path */ def checkpointLocation(): Option[String] = options.get("checkpoint_location") + + /** + * The index settings for OpenSearch index created. + * + * @return + * index setting JSON + */ + def indexSettings(): Option[String] = options.get("index_settings") } object FlintSparkIndexOptions { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index e8fab62de..d2c7f3969 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -8,6 +8,9 @@ package org.opensearch.flint.core import scala.collection.JavaConverters._ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods.parse +import org.json4s.native.Serialization import org.opensearch.client.json.jackson.JacksonJsonpMapper import org.opensearch.client.opensearch.OpenSearchClient import org.opensearch.client.transport.rest_client.RestClientTransport @@ -46,11 +49,28 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.getIndexMetadata(indexName).getContent should matchJson(content) } + it should "create index with settings" in { + val indexName = "flint_test_with_settings" + val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" + flintClient.createIndex(indexName, new FlintMetadata("{}", indexSettings)) + + flintClient.exists(indexName) shouldBe true + + // OS uses full setting name ("index" prefix) and store as string + implicit val formats: Formats = Serialization.formats(NoTypeHints) + val settings = parse(flintClient.getIndexMetadata(indexName).getIndexSettings) + (settings \ "index.number_of_shards").extract[String] shouldBe "3" + (settings \ "index.number_of_replicas").extract[String] shouldBe "2" + } + it should "get all index metadata with the given index name pattern" in { flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}")) flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}")) - flintClient.getAllIndexMetadata("flint_*_index") should have size 2 + val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") + allMetadata should have size 2 + allMetadata.forEach(metadata => metadata.getContent shouldBe "{}") + allMetadata.forEach(metadata => metadata.getIndexSettings should not be empty) } it should "return false if index not exist" in { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index e50b17b0a..892a8faa4 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -6,7 +6,13 @@ package org.opensearch.flint.spark import scala.Option.empty +import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods.parse +import org.json4s.native.Serialization +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined @@ -52,6 +58,42 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create skipping index with streaming job options") { + withTempDir { checkpointDir => + sql(s""" + | CREATE INDEX $testIndex ON $testTable ( name ) + | WITH ( + | auto_refresh = true, + | refresh_interval = '5 Seconds', + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + index.get.options.autoRefresh() shouldBe true + index.get.options.refreshInterval() shouldBe Some("5 Seconds") + index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) + } + } + + test("create skipping index with index settings") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable ( name ) + | WITH ( + | index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + val settings = parse(flintClient.getIndexMetadata(testFlintIndex).getIndexSettings) + (settings \ "index.number_of_shards").extract[String] shouldBe "2" + (settings \ "index.number_of_replicas").extract[String] shouldBe "3" + } + test("create covering index with manual refresh") { sql(s""" | CREATE INDEX $testIndex ON $testTable diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 8f2de17f0..02dc681d7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.native.JsonMethods._ import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN @@ -115,15 +116,29 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .options(FlintSparkIndexOptions(Map( "auto_refresh" -> "true", "refresh_interval" -> "1 Minute", - "checkpoint_location" -> "s3a://test/" - ))) + "checkpoint_location" -> "s3a://test/", + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) .create() val index = flint.describeIndex(testIndex) index shouldBe defined + val optionJson = compact(render( + parse(index.get.metadata().getContent) \ "_meta" \ "options")) + optionJson should matchJson(""" + | { + | "auto_refresh": "true", + | "refresh_interval": "1 Minute", + | "checkpoint_location": "s3a://test/", + | "index_settings": "{\"number_of_shards\": 3,\"number_of_replicas\": 2}" + | } + |""".stripMargin) + + // Load index options from index mapping (verify OS index setting in SQL IT) index.get.options.autoRefresh() shouldBe true index.get.options.refreshInterval() shouldBe Some("1 Minute") index.get.options.checkpointLocation() shouldBe Some("s3a://test/") + index.get.options.indexSettings() shouldBe + Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}") } test("should not have ID column in index data") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 23a1bb542..a688b1370 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -6,7 +6,13 @@ package org.opensearch.flint.spark import scala.Option.empty +import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.JsonMethods.parse +import org.json4s.native.Serialization +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -75,6 +81,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { } } + test("create skipping index with index settings") { + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}' + | ) + |""".stripMargin) + + // Check if the index setting option is set to OS index setting + val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + + implicit val formats: Formats = Serialization.formats(NoTypeHints) + val settings = parse(flintClient.getIndexMetadata(testIndex).getIndexSettings) + (settings \ "index.number_of_shards").extract[String] shouldBe "3" + (settings \ "index.number_of_replicas").extract[String] shouldBe "2" + } + test("create skipping index with manual refresh") { sql(s""" | CREATE SKIPPING INDEX ON $testTable