Skip to content

Commit

Permalink
Change indexSettings to Option
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 12, 2023
1 parent ded6831 commit e15a23f
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,37 @@ package org.opensearch.flint.core.metadata
import java.util

import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.XContentBuilderHelper._

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
* regardless of query engine integration and storage.
*/
case class FlintMetadata(
version: FlintVersion = FlintVersion.current(),
/** Flint spec version */
version: FlintVersion,
/** Flint index name */
name: String,
/** Flint index kind */
kind: String,
/** Flint index source that index data derived from */
source: String,
/** Flint indexed column list */
indexedColumns: Array[util.Map[String, AnyRef]] = Array(),
/** Flint indexed options. TODO: move to properties? */
options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index properties for any custom fields */
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
indexSettings: String = null) {
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

require(version != null, "version is required")
require(name != null, "name is required")
require(kind != null, "kind is required")
require(source != null, "source is required")

/**
* Generate JSON content as index metadata.
Expand All @@ -36,7 +51,7 @@ case class FlintMetadata(
buildJson(builder => {
// Add _meta field
objectField(builder, "_meta") {
builder.field("version", versionOrDefault())
builder.field("version", version.version)
builder.field("name", name)
builder.field("kind", kind)
builder.field("source", source)
Expand All @@ -54,21 +69,13 @@ case class FlintMetadata(
throw new IllegalStateException("Failed to jsonify Flint metadata", e)
}
}

private def versionOrDefault(): String = {
if (version == null) {
FlintVersion.current().version
} else {
version.version
}
}
}

object FlintMetadata {

def apply(content: String, settings: String): FlintMetadata = {
val metadata = FlintMetadata(content)
metadata.copy(indexSettings = settings)
metadata.copy(indexSettings = Option(settings))
}

/**
Expand Down Expand Up @@ -200,15 +207,15 @@ object FlintMetadata {
// Build method to create the FlintMetadata instance
def build(): FlintMetadata = {
FlintMetadata(
version,
if (version == null) current() else version,
name,
kind,
source,
indexedColumns,
options,
properties,
schema,
indexSettings)
Option(indexSettings))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import scala.Option;

/**
* Flint client implementation for OpenSearch storage.
Expand All @@ -73,8 +74,9 @@ public FlintOpenSearchClient(FlintOptions options) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(metadata.getContent(), XContentType.JSON);

if (metadata.indexSettings() != null) {
request.settings(metadata.indexSettings(), XContentType.JSON);
Option<String> settings = metadata.indexSettings();
if (settings.isDefined()) {
request.settings(settings.get(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package org.opensearch.flint.core

import scala.collection.JavaConverters._

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
Expand Down Expand Up @@ -48,6 +47,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M

val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn(content)
when(metadata.indexSettings).thenReturn(None)
flintClient.createIndex(indexName, metadata)

flintClient.exists(indexName) shouldBe true
Expand All @@ -59,21 +59,22 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn("{}")
when(metadata.indexSettings).thenReturn(indexSettings)
when(metadata.indexSettings).thenReturn(Some(indexSettings))

flintClient.createIndex(indexName, metadata)
flintClient.exists(indexName) shouldBe true

// OS uses full setting name ("index" prefix) and store as string
implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings)
val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}

it should "get all index metadata with the given index name pattern" in {
val metadata = mock[FlintMetadata]
when(metadata.getContent).thenReturn("{}")
when(metadata.indexSettings).thenReturn(None)
flintClient.createIndex("flint_test_1_index", metadata)
flintClient.createIndex("flint_test_2_index", metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings)
val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "2"
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings)
val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "3"
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}
Expand Down

0 comments on commit e15a23f

Please sign in to comment.