From e2ba9d030542538fbea19c376d2d41cf1da10673 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 May 2023 15:51:59 -0700 Subject: [PATCH 1/4] Add flint client and create index API Signed-off-by: Chen Dai --- flint/build.sbt | 10 ++- .../opensearch/flint/core/FlintClient.java | 31 +++++++ .../opensearch/flint/core/FlintIndex.scala | 8 -- .../flint/core/metadata/FlintMetadata.java | 35 ++++++++ .../core/storage/FlintOpenSearchClient.java | 90 +++++++++++++++++++ .../opensearch/flint/OpenSearchSuite.scala | 56 ++++++++++++ .../core/FlintOpenSearchClientSuite.scala | 34 +++++++ 7 files changed, 253 insertions(+), 11 deletions(-) create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java delete mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java create mode 100644 flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java create mode 100644 flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala create mode 100644 flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala diff --git a/flint/build.sbt b/flint/build.sbt index 000557a9c1..9b88c35629 100644 --- a/flint/build.sbt +++ b/flint/build.sbt @@ -54,7 +54,13 @@ lazy val root = (project in file(".")) lazy val flintCore = (project in file("flint-core")) .disablePlugins(AssemblyPlugin) - .settings(name := "flint-core", scalaVersion := scala212) + .settings( + name := "flint-core", + scalaVersion := scala212, + libraryDependencies ++= Seq( + "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion, + "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion) + ) lazy val flintSparkIntegration = (project in file("flint-spark-integration")) .dependsOn(flintCore) @@ -64,8 +70,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) name := "flint-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( - "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion, - "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion, "org.testcontainers" % "testcontainers" % "1.18.0" % "test", "org.scalactic" %% "scalactic" % "3.2.15", "org.scalatest" %% "scalatest" % "3.2.15" % "test", diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java new file mode 100644 index 0000000000..3917f2855f --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core; + +import org.opensearch.flint.core.metadata.FlintMetadata; + +/** + * Flint index client that provides API for metadata and data operations + * on a Flint index regardless of concrete storage. + */ +public interface FlintClient { + + /** + * Create a Flint index with the metadata given. + * + * @param indexName index name + * @param metadata index metadata + */ + void createIndex(String indexName, FlintMetadata metadata); + + /** + * Retrieve metadata in a Flint index. + * + * @param indexName index name + * @return index metadata + */ + FlintMetadata getIndexMetadata(String indexName); +} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala deleted file mode 100644 index 778705b606..0000000000 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core - -class FlintIndex {} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java new file mode 100644 index 0000000000..d068a7c9b7 --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata; + +import java.util.Map; + +/** + * Flint metadata follows Flint index specification and defines metadata + * for a Flint index regardless of query engine integration and storage. + */ +public class FlintMetadata { + + /** Field name and type for each field in a Flint index. */ + private final Map schema; + + /** Meta info for the Flint index. */ + private final Map meta; + + public FlintMetadata(Map schema, + Map meta) { + this.schema = schema; + this.meta = meta; + } + + public Map getSchema() { + return schema; + } + + public Map getMeta() { + return meta; + } +} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java new file mode 100644 index 0000000000..851c1bfb48 --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import java.io.IOException; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.http.HttpHost; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetMappingsRequest; +import org.opensearch.client.indices.GetMappingsResponse; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metadata.FlintMetadata; + +/** + * Flint client implementation for OpenSearch storage. + */ +public class FlintOpenSearchClient implements FlintClient { + + /** OpenSearch host name. */ + private final String host; + + /** OpenSearch port number. */ + private final int port; + + public FlintOpenSearchClient(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public void createIndex(String indexName, FlintMetadata metadata) { + try (RestHighLevelClient client = createClient()) { + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.mapping(buildIndexMapping(metadata)); + + client.indices().create(request, RequestOptions.DEFAULT); + } catch (IOException e) { + } + } + + @Override + public FlintMetadata getIndexMetadata(String indexName) { + try (RestHighLevelClient client = createClient()) { + GetMappingsRequest request = new GetMappingsRequest().indices(indexName); + GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); + + return parseIndexMapping(response.mappings().get(indexName)); + } catch (IOException e) { + return null; + } + } + + private RestHighLevelClient createClient() { + return new RestHighLevelClient( + RestClient.builder(new HttpHost(host, port, "http"))); + } + + private Map buildIndexMapping(FlintMetadata metadata) { + Map fieldTypes = + metadata.getSchema().entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> Map.of("type", entry.getValue()))); + return Map.of( + "properties", fieldTypes, + "_meta", metadata.getMeta()); + } + + @SuppressWarnings("unchecked") + private FlintMetadata parseIndexMapping(MappingMetadata mapping) { + Map source = mapping.getSourceAsMap(); + Map schema = + ((Map) source.get("properties")) + .entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> ((Map) entry.getValue()).get("type"))); + + return new FlintMetadata(schema, + (Map) source.get("_meta")); + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala new file mode 100644 index 0000000000..f0e16ef562 --- /dev/null +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint + +import org.apache.http.HttpHost +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient} +import org.opensearch.testcontainers.OpenSearchContainer +import org.scalatest.{BeforeAndAfterAll, Suite} + +/** + * Test required OpenSearch domain should extend OpenSearchSuite. + */ +trait OpenSearchSuite extends BeforeAndAfterAll { + self: Suite => + + protected lazy val container = new OpenSearchContainer() + + protected lazy val openSearchPort: Int = container.port() + + protected lazy val openSearchHost: String = container.getHost + + protected lazy val openSearchClient = new RestHighLevelClient( + RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http"))) + + protected lazy val openSearchOptions = + Map("host" -> openSearchHost, "port" -> s"$openSearchPort") + + override def beforeAll(): Unit = { + container.start() + super.beforeAll() + } + + override def afterAll(): Unit = { + container.close() + super.afterAll() + } + + /** + * Delete index `indexNames` after calling `f`. + */ + protected def withIndexName(indexNames: String*)(f: => Unit): Unit = { + try { + f + } finally { + indexNames.foreach { indexName => + openSearchClient + .indices() + .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT) + } + } + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala new file mode 100644 index 0000000000..18e4863699 --- /dev/null +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -0,0 +1,34 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import scala.collection.JavaConverters._ + +class FlintOpenSearchClientSuite + extends AnyFlatSpec + with OpenSearchSuite { + + lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort) + + it should "create index successfully" in { + val schema = Map("age" -> "integer").asJava + val meta = + Map("index" -> + Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object] + ).asJava + flintClient.createIndex("test", new FlintMetadata(schema, meta)) + + val metadata = flintClient.getIndexMetadata("test") + metadata.getSchema shouldBe schema + metadata.getMeta shouldBe meta + } +} From c0ca5f90ee4889081a74520d816a5f7110106110 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 May 2023 16:42:33 -0700 Subject: [PATCH 2/4] Handle exception in Flint client Signed-off-by: Chen Dai --- .../opensearch/flint/core/FlintClient.java | 8 ++++++ .../core/storage/FlintOpenSearchClient.java | 26 +++++++++++++++---- .../core/FlintOpenSearchClientSuite.scala | 22 +++++++++++----- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 3917f2855f..0b780d0de9 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -21,6 +21,14 @@ public interface FlintClient { */ void createIndex(String indexName, FlintMetadata metadata); + /** + * Does Flint index with the given name exist + * + * @param indexName index name + * @return true if the index exists, otherwise false + */ + boolean exists(String indexName); + /** * Retrieve metadata in a Flint index. * diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 851c1bfb48..c4c53178c9 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -13,6 +13,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; @@ -42,7 +43,18 @@ public void createIndex(String indexName, FlintMetadata metadata) { request.mapping(buildIndexMapping(metadata)); client.indices().create(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to create Flint index", e); + } + } + + @Override + public boolean exists(String indexName) { + try (RestHighLevelClient client = createClient()) { + return client.indices() + .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists", e); } } @@ -50,11 +62,12 @@ public void createIndex(String indexName, FlintMetadata metadata) { public FlintMetadata getIndexMetadata(String indexName) { try (RestHighLevelClient client = createClient()) { GetMappingsRequest request = new GetMappingsRequest().indices(indexName); - GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); + GetMappingsResponse response = + client.indices().getMapping(request, RequestOptions.DEFAULT); return parseIndexMapping(response.mappings().get(indexName)); - } catch (IOException e) { - return null; + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata", e); } } @@ -64,11 +77,13 @@ private RestHighLevelClient createClient() { } private Map buildIndexMapping(FlintMetadata metadata) { + // Convert from {"field": "int"} to {"field": {"type": "int"}} Map fieldTypes = metadata.getSchema().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> Map.of("type", entry.getValue()))); + return Map.of( "properties", fieldTypes, "_meta", metadata.getMeta()); @@ -77,6 +92,8 @@ private Map buildIndexMapping(FlintMetadata metadata) { @SuppressWarnings("unchecked") private FlintMetadata parseIndexMapping(MappingMetadata mapping) { Map source = mapping.getSourceAsMap(); + + // Parse {"field": {"type": "int"}} to {"field": "int"} Map schema = ((Map) source.get("properties")) .entrySet().stream() @@ -84,7 +101,6 @@ private FlintMetadata parseIndexMapping(MappingMetadata mapping) { Map.Entry::getKey, entry -> ((Map) entry.getValue()).get("type"))); - return new FlintMetadata(schema, - (Map) source.get("_meta")); + return new FlintMetadata(schema, (Map) source.get("_meta")); } } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 18e4863699..fe2ca14f5c 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -9,26 +9,36 @@ import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper +import org.scalatest.matchers.should.Matchers import scala.collection.JavaConverters._ class FlintOpenSearchClientSuite extends AnyFlatSpec - with OpenSearchSuite { + with OpenSearchSuite + with Matchers { + /** Lazy initialize after container started. */ lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort) + behavior of "Flint OpenSearch client" + it should "create index successfully" in { + val indexName = "test" val schema = Map("age" -> "integer").asJava val meta = Map("index" -> Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object] ).asJava - flintClient.createIndex("test", new FlintMetadata(schema, meta)) + flintClient.createIndex(indexName, new FlintMetadata(schema, meta)) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName) should have ( + 'schema (schema), + 'meta (meta)) + } - val metadata = flintClient.getIndexMetadata("test") - metadata.getSchema shouldBe schema - metadata.getMeta shouldBe meta + it should "return false if index not exist" in { + flintClient.exists("non-exist-index") shouldBe false } } From 9d3c2fdbcb47d9c45ff4a66ed60e47b17be7eb4e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 May 2023 17:30:42 -0700 Subject: [PATCH 3/4] Fix scala style Signed-off-by: Chen Dai --- .../org/opensearch/flint/core/FlintOpenSearchClientSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index fe2ca14f5c..183b284579 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -10,7 +10,6 @@ import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers - import scala.collection.JavaConverters._ class FlintOpenSearchClientSuite From 66f49a752ed6d676b93932c3a1ca5c86a8fac9c4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 17 May 2023 13:27:49 -0700 Subject: [PATCH 4/4] Change metadata content to string for flexibility Signed-off-by: Chen Dai --- flint/build.sbt | 3 +- .../flint/core/metadata/FlintMetadata.java | 23 ++++-------- .../core/storage/FlintOpenSearchClient.java | 36 +++---------------- .../core/FlintOpenSearchClientSuite.scala | 25 +++++++------ 4 files changed, 27 insertions(+), 60 deletions(-) diff --git a/flint/build.sbt b/flint/build.sbt index 9b88c35629..ca4546e2e3 100644 --- a/flint/build.sbt +++ b/flint/build.sbt @@ -102,6 +102,7 @@ lazy val integtest = (project in file("integ-test")) scalaVersion := scala212, libraryDependencies ++= Seq( "org.scalactic" %% "scalactic" % "3.2.15", - "org.scalatest" %% "scalatest" % "3.2.15" % "test"), + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"), libraryDependencies ++= deps(sparkVersion), Test / fullClasspath += (flintSparkIntegration / assembly).value) diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java index d068a7c9b7..d93c178376 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java @@ -5,31 +5,20 @@ package org.opensearch.flint.core.metadata; -import java.util.Map; - /** * Flint metadata follows Flint index specification and defines metadata * for a Flint index regardless of query engine integration and storage. */ public class FlintMetadata { - /** Field name and type for each field in a Flint index. */ - private final Map schema; - - /** Meta info for the Flint index. */ - private final Map meta; - - public FlintMetadata(Map schema, - Map meta) { - this.schema = schema; - this.meta = meta; - } + // TODO: define metadata format and create strong-typed class + private final String content; - public Map getSchema() { - return schema; + public FlintMetadata(String content) { + this.content = content; } - public Map getMeta() { - return meta; + public String getContent() { + return content; } } diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index c4c53178c9..ab366d36d5 100644 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -6,8 +6,6 @@ package org.opensearch.flint.core.storage; import java.io.IOException; -import java.util.Map; -import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; @@ -17,6 +15,7 @@ import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.GetMappingsResponse; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; import org.opensearch.flint.core.metadata.FlintMetadata; @@ -40,7 +39,7 @@ public FlintOpenSearchClient(String host, int port) { public void createIndex(String indexName, FlintMetadata metadata) { try (RestHighLevelClient client = createClient()) { CreateIndexRequest request = new CreateIndexRequest(indexName); - request.mapping(buildIndexMapping(metadata)); + request.mapping(metadata.getContent(), XContentType.JSON); client.indices().create(request, RequestOptions.DEFAULT); } catch (Exception e) { @@ -65,7 +64,8 @@ public FlintMetadata getIndexMetadata(String indexName) { GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT); - return parseIndexMapping(response.mappings().get(indexName)); + MappingMetadata mapping = response.mappings().get(indexName); + return new FlintMetadata(mapping.source().string()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata", e); } @@ -75,32 +75,4 @@ private RestHighLevelClient createClient() { return new RestHighLevelClient( RestClient.builder(new HttpHost(host, port, "http"))); } - - private Map buildIndexMapping(FlintMetadata metadata) { - // Convert from {"field": "int"} to {"field": {"type": "int"}} - Map fieldTypes = - metadata.getSchema().entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> Map.of("type", entry.getValue()))); - - return Map.of( - "properties", fieldTypes, - "_meta", metadata.getMeta()); - } - - @SuppressWarnings("unchecked") - private FlintMetadata parseIndexMapping(MappingMetadata mapping) { - Map source = mapping.getSourceAsMap(); - - // Parse {"field": {"type": "int"}} to {"field": "int"} - Map schema = - ((Map) source.get("properties")) - .entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> ((Map) entry.getValue()).get("type"))); - - return new FlintMetadata(schema, (Map) source.get("_meta")); - } } diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index 183b284579..b890b7d629 100644 --- a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -5,12 +5,12 @@ package org.opensearch.flint.core +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.OpenSearchSuite import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import scala.collection.JavaConverters._ class FlintOpenSearchClientSuite extends AnyFlatSpec @@ -24,17 +24,22 @@ class FlintOpenSearchClientSuite it should "create index successfully" in { val indexName = "test" - val schema = Map("age" -> "integer").asJava - val meta = - Map("index" -> - Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object] - ).asJava - flintClient.createIndex(indexName, new FlintMetadata(schema, meta)) + val content = + """ { + | "_meta": { + | "kind": "SkippingIndex" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + flintClient.createIndex(indexName, new FlintMetadata(content)) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName) should have ( - 'schema (schema), - 'meta (meta)) + flintClient.getIndexMetadata(indexName).getContent should matchJson (content) } it should "return false if index not exist" in {