Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add index settings option in create statement #44

Merged
merged 8 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ User can provide the following options in `WITH` clause of create statement:
+ `auto_refresh`: if true, triggers Incremental Refresh immediately after index creation completes. Otherwise, the user has to trigger Full Refresh manually using the `REFRESH` statement.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro batch will be generated as soon as the previous one completes processing.
+ `checkpoint_location`: a string as the location path for the incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory will be used, which may result in checkpoint data loss upon restart.
+ `index_settings`: a JSON string as index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.

```sql
WITH (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@ public class FlintMetadata {
// TODO: define metadata format and create strong-typed class
private final String content;

// TODO: piggyback optional index settings and will refactor as above
private String indexSettings;

/**
 * Construct metadata with mapping content only (no index settings).
 *
 * @param content metadata JSON content
 */
public FlintMetadata(String content) {
  this(content, null);
}

/**
 * Construct metadata with mapping content and index settings.
 *
 * @param content metadata JSON content
 * @param indexSettings index settings in JSON format
 */
public FlintMetadata(String content, String indexSettings) {
  this.content = content;
  this.indexSettings = indexSettings;
}

/**
 * @return metadata JSON content
 */
public String getContent() {
  return content;
}

/**
 * @return index settings in JSON format, or null if none were provided
 */
public String getIndexSettings() {
  return indexSettings;
}

/**
 * Set optional index settings (piggybacked here until the TODO refactoring above is done).
 *
 * @param indexSettings index settings in JSON format
 */
public void setIndexSettings(String indexSettings) {
  this.indexSettings = indexSettings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -70,6 +72,9 @@ public FlintOpenSearchClient(FlintOptions options) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.mapping(metadata.getContent(), XContentType.JSON);

if (metadata.getIndexSettings() != null) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
request.settings(metadata.getIndexSettings(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index " + indexName, e);
Expand All @@ -86,11 +91,13 @@ public FlintOpenSearchClient(FlintOptions options) {

@Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
try (RestHighLevelClient client = createClient()) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexNamePattern);
GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
GetIndexRequest request = new GetIndexRequest(indexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

return response.mappings().values().stream()
.map(mapping -> new FlintMetadata(mapping.source().string()))
return Arrays.stream(response.getIndices())
.map(index -> new FlintMetadata(
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e);
Expand All @@ -99,11 +106,12 @@ public FlintOpenSearchClient(FlintOptions options) {

@Override public FlintMetadata getIndexMetadata(String indexName) {
try (RestHighLevelClient client = createClient()) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
GetIndexRequest request = new GetIndexRequest(indexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);

MappingMetadata mapping = response.mappings().get(indexName);
return new FlintMetadata(mapping.source().string());
MappingMetadata mapping = response.getMappings().get(indexName);
Settings settings = response.getSettings().get(indexName);
return new FlintMetadata(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + indexName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ class FlintSpark(val spark: SparkSession) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
flintClient.createIndex(indexName, index.metadata())
val metadata = index.metadata()
index.options.indexSettings().foreach(metadata.setIndexSettings)
flintClient.createIndex(indexName, metadata)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")

/**
 * The index settings for the OpenSearch index to be created.
 *
 * @return
 *   index settings in JSON format, if the "index_settings" option was given
 */
def indexSettings(): Option[String] = options.get("index_settings")
}

object FlintSparkIndexOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package org.opensearch.flint.core
import scala.collection.JavaConverters._

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.client.json.jackson.JacksonJsonpMapper
import org.opensearch.client.opensearch.OpenSearchClient
import org.opensearch.client.transport.rest_client.RestClientTransport
Expand Down Expand Up @@ -46,11 +49,28 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
flintClient.getIndexMetadata(indexName).getContent should matchJson(content)
}

it should "create index with settings" in {
  val testIndexName = "flint_test_with_settings"
  val settingsJson = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
  flintClient.createIndex(testIndexName, new FlintMetadata("{}", settingsJson))

  flintClient.exists(testIndexName) shouldBe true

  // OpenSearch returns settings under their full names ("index." prefix) with string values
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  val actualSettings = parse(flintClient.getIndexMetadata(testIndexName).getIndexSettings)
  (actualSettings \ "index.number_of_shards").extract[String] shouldBe "3"
  (actualSettings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

it should "get all index metadata with the given index name pattern" in {
  // Two indexes that both match the wildcard pattern below
  flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}"))
  flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}"))

  val metadataList = flintClient.getAllIndexMetadata("flint_*_index")
  metadataList should have size 2
  // Every match carries the mapping content and non-empty index settings
  metadataList.asScala.foreach(_.getContent shouldBe "{}")
  metadataList.asScala.foreach(_.getIndexSettings should not be empty)
}

it should "return false if index not exist" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
package org.opensearch.flint.spark

import scala.Option.empty
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
Expand Down Expand Up @@ -52,6 +58,42 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

// NOTE(review): renamed from "create skipping index ..." — this suite creates a
// covering index (CREATE INDEX ... ON), not a skipping index; the old name was a
// copy-paste from the skipping index suite.
test("create covering index with streaming job options") {
  withTempDir { checkpointDir =>
    sql(s"""
           | CREATE INDEX $testIndex ON $testTable ( name )
           | WITH (
           |   auto_refresh = true,
           |   refresh_interval = '5 Seconds',
           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
           | )
           | """.stripMargin)

    // All streaming options given in WITH clause should round-trip via describeIndex
    val index = flint.describeIndex(testFlintIndex)
    index shouldBe defined
    index.get.options.autoRefresh() shouldBe true
    index.get.options.refreshInterval() shouldBe Some("5 Seconds")
    index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
  }
}

// NOTE(review): renamed from "create skipping index ..." — this suite creates a
// covering index (CREATE INDEX ... ON); the old name was a copy-paste and also
// collided with the identically named test in the skipping index suite.
test("create covering index with index settings") {
  sql(s"""
         | CREATE INDEX $testIndex ON $testTable ( name )
         | WITH (
         |   index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}'
         | )
         |""".stripMargin)

  // Check if the index setting option is set to OS index setting
  val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

  // OpenSearch returns settings under full names ("index." prefix) with string values
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  val settings = parse(flintClient.getIndexMetadata(testFlintIndex).getIndexSettings)
  (settings \ "index.number_of_shards").extract[String] shouldBe "2"
  (settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}

test("create covering index with manual refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.spark

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.native.JsonMethods._
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
Expand Down Expand Up @@ -114,15 +115,29 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
.options(FlintSparkIndexOptions(Map(
"auto_refresh" -> "true",
"refresh_interval" -> "1 Minute",
"checkpoint_location" -> "s3a://test/"
)))
"checkpoint_location" -> "s3a://test/",
"index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")))
.create()

val index = flint.describeIndex(testIndex)
index shouldBe defined
val optionJson = compact(render(
parse(index.get.metadata().getContent) \ "_meta" \ "options"))
optionJson should matchJson("""
| {
| "auto_refresh": "true",
| "refresh_interval": "1 Minute",
| "checkpoint_location": "s3a://test/",
| "index_settings": "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
| }
|""".stripMargin)

// Load index options from index mapping (verify OS index setting in SQL IT)
index.get.options.autoRefresh() shouldBe true
index.get.options.refreshInterval() shouldBe Some("1 Minute")
index.get.options.checkpointLocation() shouldBe Some("s3a://test/")
index.get.options.indexSettings() shouldBe
Some("{\"number_of_shards\": 3,\"number_of_replicas\": 2}")
}

test("should not have ID column in index data") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
package org.opensearch.flint.spark

import scala.Option.empty
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
Expand Down Expand Up @@ -75,6 +81,24 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
}
}

test("create skipping index with index settings") {
  sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | ( year PARTITION )
         | WITH (
         |   index_settings = '{"number_of_shards": 3, "number_of_replicas": 2}'
         | )
         |""".stripMargin)

  // Verify the index_settings option was applied to the actual OpenSearch index
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  val osClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))
  val appliedSettings = parse(osClient.getIndexMetadata(testIndex).getIndexSettings)
  (appliedSettings \ "index.number_of_shards").extract[String] shouldBe "3"
  (appliedSettings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

test("create skipping index with manual refresh") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
Expand Down
Loading