diff --git a/flint/build.sbt b/flint/build.sbt index 000557a9c1..ca4546e2e3 100644 --- a/flint/build.sbt +++ b/flint/build.sbt @@ -54,7 +54,13 @@ lazy val root = (project in file(".")) lazy val flintCore = (project in file("flint-core")) .disablePlugins(AssemblyPlugin) - .settings(name := "flint-core", scalaVersion := scala212) + .settings( + name := "flint-core", + scalaVersion := scala212, + libraryDependencies ++= Seq( + "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion, + "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion) + ) lazy val flintSparkIntegration = (project in file("flint-spark-integration")) .dependsOn(flintCore) @@ -64,8 +70,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration")) name := "flint-spark-integration", scalaVersion := scala212, libraryDependencies ++= Seq( - "org.opensearch.client" % "opensearch-rest-client" % opensearchVersion, - "org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion, "org.testcontainers" % "testcontainers" % "1.18.0" % "test", "org.scalactic" %% "scalactic" % "3.2.15", "org.scalatest" %% "scalatest" % "3.2.15" % "test", @@ -98,6 +102,7 @@ lazy val integtest = (project in file("integ-test")) scalaVersion := scala212, libraryDependencies ++= Seq( "org.scalactic" %% "scalactic" % "3.2.15", - "org.scalatest" %% "scalatest" % "3.2.15" % "test"), + "org.scalatest" %% "scalatest" % "3.2.15" % "test", + "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"), libraryDependencies ++= deps(sparkVersion), Test / fullClasspath += (flintSparkIntegration / assembly).value) diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java new file mode 100644 index 0000000000..0b780d0de9 --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core; + +import org.opensearch.flint.core.metadata.FlintMetadata; + +/** + * Flint index client that provides API for metadata and data operations + * on a Flint index regardless of concrete storage. + */ +public interface FlintClient { + + /** + * Create a Flint index with the metadata given. + * + * @param indexName index name + * @param metadata index metadata + */ + void createIndex(String indexName, FlintMetadata metadata); + + /** + * Does Flint index with the given name exist + * + * @param indexName index name + * @return true if the index exists, otherwise false + */ + boolean exists(String indexName); + + /** + * Retrieve metadata in a Flint index. + * + * @param indexName index name + * @return index metadata + */ + FlintMetadata getIndexMetadata(String indexName); +} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala b/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala deleted file mode 100644 index 778705b606..0000000000 --- a/flint/flint-core/src/main/scala/org/opensearch/flint/core/FlintIndex.scala +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core - -class FlintIndex {} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java new file mode 100644 index 0000000000..d93c178376 --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata; + +/** + * Flint metadata follows Flint index specification and defines metadata + * for a Flint index regardless of query engine integration and storage. + */ +public class FlintMetadata { + + // TODO: define metadata format and create strong-typed class + private final String content; + + public FlintMetadata(String content) { + this.content = content; + } + + public String getContent() { + return content; + } +} diff --git a/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java new file mode 100644 index 0000000000..ab366d36d5 --- /dev/null +++ b/flint/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.storage; + +import java.io.IOException; +import org.apache.http.HttpHost; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetIndexRequest; +import org.opensearch.client.indices.GetMappingsRequest; +import org.opensearch.client.indices.GetMappingsResponse; +import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.metadata.FlintMetadata; + +/** + * Flint client implementation for OpenSearch storage. + */ +public class FlintOpenSearchClient implements FlintClient { + + /** OpenSearch host name. */ + private final String host; + + /** OpenSearch port number. */ + private final int port; + + public FlintOpenSearchClient(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public void createIndex(String indexName, FlintMetadata metadata) { + try (RestHighLevelClient client = createClient()) { + CreateIndexRequest request = new CreateIndexRequest(indexName); + request.mapping(metadata.getContent(), XContentType.JSON); + + client.indices().create(request, RequestOptions.DEFAULT); + } catch (Exception e) { + throw new IllegalStateException("Failed to create Flint index", e); + } + } + + @Override + public boolean exists(String indexName) { + try (RestHighLevelClient client = createClient()) { + return client.indices() + .exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT); + } catch (IOException e) { + throw new IllegalStateException("Failed to check if Flint index exists", e); + } + } + + @Override + public FlintMetadata getIndexMetadata(String indexName) { + try (RestHighLevelClient client = createClient()) { + GetMappingsRequest request = new GetMappingsRequest().indices(indexName); + GetMappingsResponse response = + client.indices().getMapping(request, RequestOptions.DEFAULT); + + MappingMetadata mapping = response.mappings().get(indexName); + return new FlintMetadata(mapping.source().string()); + } catch (Exception e) { + throw new IllegalStateException("Failed to get Flint index metadata", e); + } + } + + private RestHighLevelClient createClient() { + return new RestHighLevelClient( + RestClient.builder(new HttpHost(host, port, "http"))); + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala new file mode 100644 index 0000000000..f0e16ef562 --- /dev/null +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint + +import org.apache.http.HttpHost +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient} +import org.opensearch.testcontainers.OpenSearchContainer +import org.scalatest.{BeforeAndAfterAll, Suite} + +/** + * Test required OpenSearch domain should extend OpenSearchSuite. + */ +trait OpenSearchSuite extends BeforeAndAfterAll { + self: Suite => + + protected lazy val container = new OpenSearchContainer() + + protected lazy val openSearchPort: Int = container.port() + + protected lazy val openSearchHost: String = container.getHost + + protected lazy val openSearchClient = new RestHighLevelClient( + RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http"))) + + protected lazy val openSearchOptions = + Map("host" -> openSearchHost, "port" -> s"$openSearchPort") + + override def beforeAll(): Unit = { + container.start() + super.beforeAll() + } + + override def afterAll(): Unit = { + container.close() + super.afterAll() + } + + /** + * Delete index `indexNames` after calling `f`. + */ + protected def withIndexName(indexNames: String*)(f: => Unit): Unit = { + try { + f + } finally { + indexNames.foreach { indexName => + openSearchClient + .indices() + .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT) + } + } + } +} diff --git a/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala new file mode 100644 index 0000000000..b890b7d629 --- /dev/null +++ b/flint/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -0,0 +1,48 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core + +import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.opensearch.flint.OpenSearchSuite +import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FlintOpenSearchClientSuite + extends AnyFlatSpec + with OpenSearchSuite + with Matchers { + + /** Lazy initialize after container started. */ + lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort) + + behavior of "Flint OpenSearch client" + + it should "create index successfully" in { + val indexName = "test" + val content = + """ { + | "_meta": { + | "kind": "SkippingIndex" + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin + flintClient.createIndex(indexName, new FlintMetadata(content)) + + flintClient.exists(indexName) shouldBe true + flintClient.getIndexMetadata(indexName).getContent should matchJson (content) + } + + it should "return false if index not exist" in { + flintClient.exists("non-exist-index") shouldBe false + } +}