Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flint client and index management API #1634

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions flint/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ lazy val root = (project in file("."))

lazy val flintCore = (project in file("flint-core"))
.disablePlugins(AssemblyPlugin)
.settings(name := "flint-core", scalaVersion := scala212)
.settings(
name := "flint-core",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion)
)

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
Expand All @@ -64,8 +70,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
name := "flint-spark-integration",
scalaVersion := scala212,
libraryDependencies ++= Seq(
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion,
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.scalactic" %% "scalactic" % "3.2.15",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core;

import org.opensearch.flint.core.metadata.FlintMetadata;

/**
* Flint index client that provides API for metadata and data operations
* on a Flint index regardless of concrete storage.
*/
public interface FlintClient {

/**
* Create a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void createIndex(String indexName, FlintMetadata metadata);

/**
* Does Flint index with the given name exist
*
* @param indexName index name
* @return true if the index exists, otherwise false
*/
boolean exists(String indexName);

/**
* Retrieve metadata in a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata;

import java.util.Map;

/**
* Flint metadata follows Flint index specification and defines metadata
* for a Flint index regardless of query engine integration and storage.
*/
public class FlintMetadata {

/** Field name and type for each field in a Flint index. */
private final Map<String, String> schema;

/** Meta info for the Flint index. */
private final Map<String, Object> meta;

public FlintMetadata(Map<String, String> schema,
Map<String, Object> meta) {
this.schema = schema;
this.meta = meta;
}

public Map<String, String> getSchema() {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
return schema;
}

public Map<String, Object> getMeta() {
return meta;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.GetMappingsResponse;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.metadata.FlintMetadata;

/**
* Flint client implementation for OpenSearch storage.
*/
public class FlintOpenSearchClient implements FlintClient {

/** OpenSearch host name. */
private final String host;

/** OpenSearch port number. */
private final int port;

public FlintOpenSearchClient(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public void createIndex(String indexName, FlintMetadata metadata) {
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.mapping(buildIndexMapping(metadata));

client.indices().create(request, RequestOptions.DEFAULT);
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index", e);
}
}

@Override
public boolean exists(String indexName) {
try (RestHighLevelClient client = createClient()) {
return client.indices()
.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if Flint index exists", e);
}
}

@Override
public FlintMetadata getIndexMetadata(String indexName) {
try (RestHighLevelClient client = createClient()) {
GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
GetMappingsResponse response =
client.indices().getMapping(request, RequestOptions.DEFAULT);

return parseIndexMapping(response.mappings().get(indexName));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata", e);
}
}

private RestHighLevelClient createClient() {
return new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, "http")));
}

private Map<String, Object> buildIndexMapping(FlintMetadata metadata) {
// Convert from {"field": "int"} to {"field": {"type": "int"}}
Map<String, Object> fieldTypes =
metadata.getSchema().entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> Map.of("type", entry.getValue())));

return Map.of(
"properties", fieldTypes,
"_meta", metadata.getMeta());
}

@SuppressWarnings("unchecked")
private FlintMetadata parseIndexMapping(MappingMetadata mapping) {
Map<String, Object> source = mapping.getSourceAsMap();

// Parse {"field": {"type": "int"}} to {"field": "int"}
Map<String, String> schema =
((Map<String, Object>) source.get("properties"))
.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> ((Map<String, String>) entry.getValue()).get("type")));
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

return new FlintMetadata(schema, (Map<String, Object>) source.get("_meta"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint

import org.apache.http.HttpHost
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.opensearch.testcontainers.OpenSearchContainer
import org.scalatest.{BeforeAndAfterAll, Suite}

/**
* Test required OpenSearch domain should extend OpenSearchSuite.
*/
trait OpenSearchSuite extends BeforeAndAfterAll {
self: Suite =>

protected lazy val container = new OpenSearchContainer()

protected lazy val openSearchPort: Int = container.port()

protected lazy val openSearchHost: String = container.getHost

protected lazy val openSearchClient = new RestHighLevelClient(
RestClient.builder(new HttpHost(openSearchHost, openSearchPort, "http")))

protected lazy val openSearchOptions =
Map("host" -> openSearchHost, "port" -> s"$openSearchPort")

override def beforeAll(): Unit = {
container.start()
super.beforeAll()
}

override def afterAll(): Unit = {
container.close()
super.afterAll()
}

/**
* Delete index `indexNames` after calling `f`.
*/
protected def withIndexName(indexNames: String*)(f: => Unit): Unit = {
try {
f
} finally {
indexNames.foreach { indexName =>
openSearchClient
.indices()
.delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core

import org.opensearch.flint.OpenSearchSuite
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.collection.JavaConverters._

class FlintOpenSearchClientSuite
extends AnyFlatSpec
with OpenSearchSuite
with Matchers {

/** Lazy initialize after container started. */
lazy val flintClient = new FlintOpenSearchClient(openSearchHost, openSearchPort)

behavior of "Flint OpenSearch client"

it should "create index successfully" in {
val indexName = "test"
val schema = Map("age" -> "integer").asJava
val meta =
Map("index" ->
Map("kind" -> "SkippingIndex").asJava.asInstanceOf[Object]
).asJava
flintClient.createIndex(indexName, new FlintMetadata(schema, meta))

flintClient.exists(indexName) shouldBe true
flintClient.getIndexMetadata(indexName) should have (
'schema (schema),
'meta (meta))
}

it should "return false if index not exist" in {
flintClient.exists("non-exist-index") shouldBe false
}
}