Skip to content

Commit

Permalink
Add Flint client and index management API (#1634)
Browse files Browse the repository at this point in the history
* Add flint client and create index API

Signed-off-by: Chen Dai <[email protected]>

* Handle exception in Flint client

Signed-off-by: Chen Dai <[email protected]>

* Fix scala style

Signed-off-by: Chen Dai <[email protected]>

* Change metadata content to string for flexibility

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored May 17, 2023
1 parent b19db71 commit 84ccf88
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 12 deletions.
13 changes: 9 additions & 4 deletions flint/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ lazy val root = (project in file("."))

lazy val flintCore = (project in file("flint-core"))
.disablePlugins(AssemblyPlugin)
.settings(name := "flint-core", scalaVersion := scala212)
.settings(
name := "flint-core",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion)
)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
Expand All @@ -64,8 +70,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
name := "flint-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion,
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
Expand Down Expand Up @@ -98,6 +102,7 @@ lazy val integtest = (project in file("integ-test"))
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test"),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath += (flintSparkIntegration / assembly).value)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.flint.core.metadata.FlintMetadata;

/**
* Flint index client that provides API for metadata and data operations
* on a Flint index regardless of concrete storage.
*/
public interface FlintClient {

/**
* Create a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void createIndex(String indexName, FlintMetadata metadata);

/**
* Does Flint index with the given name exist
*
* @param indexName index name
* @return true if the index exists, otherwise false
*/
boolean exists(String indexName);

/**
* Retrieve metadata in a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata;

/**
* Flint metadata follows Flint index specification and defines metadata
* for a Flint index regardless of query engine integration and storage.
*/
public class FlintMetadata {

// TODO: define metadata format and create strong-typed class
private final String content;

public FlintMetadata(String content) {
this.content = content;
}

public String getContent() {
return content;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import java.io.IOException;
import org.apache.http.HttpHost;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metadata.FlintMetadata;

/**
* Flint client implementation for OpenSearch storage.
*/
public class FlintOpenSearchClient implements FlintClient {

/** OpenSearch host name. */
private final String host;

/** OpenSearch port number. */
private final int port;

public FlintOpenSearchClient(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public void createIndex(String indexName, FlintMetadata metadata) {
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.mapping(metadata.getContent(), XContentType.JSON);

client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index", e);
}
}

@Override
public boolean exists(String indexName) {
try (RestHighLevelClient client = createClient()) {
return client.indices()
.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists", e);
}
}

@Override
public FlintMetadata getIndexMetadata(String indexName) {
try (RestHighLevelClient client = createClient()) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
GetMappingsResponse response =
client.indices().getMapping(request, RequestOptions.DEFAULT);

MappingMetadata mapping = response.mappings().get(indexName);
return new FlintMetadata(mapping.source().string());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata", e);
}
}

private RestHighLevelClient createClient() {
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, "http")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint

import org.apache.http.HttpHost
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.opensearch.testcontainers.OpenSearchContainer
import org.scalatest.{BeforeAndAfterAll, Suite}

/**
* Test required OpenSearch domain should extend OpenSearchSuite.
*/
trait OpenSearchSuite extends BeforeAndAfterAll {
self: Suite =>

protected lazy val container = new OpenSearchContainer()

protected lazy val openSearchPort: Int = container.port()

protected lazy val openSearchHost: String = container.getHost

protected lazy val openSearchClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http")))

protected lazy val openSearchOptions =
Map("host" -> openSearchHost, "port" -> s"$openSearchPort")

override def beforeAll(): Unit = {
container.start()
super.beforeAll()
}

override def afterAll(): Unit = {
container.close()
super.afterAll()
}

/**
* Delete index `indexNames` after calling `f`.
*/
protected def withIndexName(indexNames: String*)(f: => Unit): Unit = {
try {
f
} finally {
indexNames.foreach { indexName =>
openSearchClient
.indices()
.delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class FlintOpenSearchClientSuite
extends AnyFlatSpec
with OpenSearchSuite
with Matchers {

/** Lazy initialize after container started. */
lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort)

behavior of "Flint OpenSearch client"

it should "create index successfully" in {
val indexName = "test"
val content =
""" {
| "_meta": {
| "kind": "SkippingIndex"
| },
| "properties": {
| "age": {
| "type": "integer"
| }
| }
| }
|""".stripMargin
flintClient.createIndex(indexName, new FlintMetadata(content))

flintClient.exists(indexName) shouldBe true
flintClient.getIndexMetadata(indexName).getContent should matchJson (content)
}

it should "return false if index not exist" in {
flintClient.exists("non-exist-index") shouldBe false
}
}

0 comments on commit 84ccf88

Please sign in to comment.