Abstract service for accessing Flint index metadata (#495) (#576)
* fix missed renames

* rename for log entry properties
* correct planTransformer typo



* add FlintIndexMetadataService

* interface class for FlintIndexMetadataService
  * move FlintMetadata and FlintVersion to flint-commons
* remove ser/de from FlintMetadata; move to OS impl for
  FlintIndexMetadataService
* remove schema parser from the FlintMetadata builder to drop the dependency
  on OpenSearch (see the sketch below)
* FlintSparkIndex now generates a schema map in addition to the schema JSON
* FlintMetadataSuite split in two: one suite for the builder and one for
  ser/de, the latter merged into FlintOpenSearchIndexMetadataServiceSuite
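A minimal sketch of what the builder change means for callers, mirroring the values used in the new FlintMetadataSuite further down; the JSON-string `schema` overload, which needed an OpenSearch parser, is gone, so the schema is passed as a structured map:

```scala
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.metadata.FlintMetadata

// The JSON-string schema setter was removed from the builder; callers now
// hand over the schema as a map, exactly as the new test suite does.
val builder = new FlintMetadata.Builder
builder.name("test_index")
builder.kind("test_kind")
builder.source("test_source_table")
builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
val metadata = builder.build()
```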



* move get metadata functions to new service

* Remove getIndexMetadata and getAllIndexMetadata from FlintClient
* Implement the two for OpenSearch
  * TODO: sanitize index name
* Add builder for FlintIndexMetadataService and options
* Refactor callers of FlintClient.get(All)IndexMetadata to use
  FlintIndexMetadataService (see the sketch below)
* TODO: test suite for getIndexMetadata and getAllIndexMetadata (might
  overlap with FlintOpenSearchClientSuite)
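A hedged sketch of the caller-side refactor; the surrounding method and variable names are illustrative, only `getIndexMetadata` comes from the new interface:

```scala
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}

// Before: metadata was fetched from the storage client itself, e.g.
//   flintClient.getIndexMetadata(indexName)
// After: metadata access goes through the dedicated service, so a
// non-OpenSearch metadata store can sit behind the same call.
def describeIndexMetadata(
    indexMetadataService: FlintIndexMetadataService,
    indexName: String): FlintMetadata =
  indexMetadataService.getIndexMetadata(indexName)
```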



* update index metadata

* remove updateIndex from FlintClient
* implement updateIndexMetadata for FlintOpenSearchIndexMetadataService
* call updateIndexMetadata upon index creation in FlintSpark (see the sketch
  below)
  * with the OS client plus the OS index metadata service, the update call is
    redundant
  * it matters when some other index metadata service implementation is
    provided
* TODO: Suite for updateIndexMetadata (now shared with
  FlintOpenSearchClientSuite)
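A rough sketch of the create flow these bullets describe; FlintSpark's real code is not shown in this diff, and `FlintClient.createIndex(indexName, metadata)` is assumed to remain on the client (the client still imports FlintMetadata below):

```scala
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.core.FlintClient

// Illustrative flow only, not FlintSpark's actual createIndex.
def createIndex(flintClient: FlintClient,
                indexMetadataService: FlintIndexMetadataService,
                indexName: String,
                metadata: FlintMetadata): Unit = {
  // Storage-level index creation (assumed to still live on FlintClient).
  flintClient.createIndex(indexName, metadata)
  // Redundant when both client and metadata service are OpenSearch-backed,
  // but required to keep any other metadata service implementation in sync.
  indexMetadataService.updateIndexMetadata(indexName, metadata)
}
```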



* empty implementation for OS deleteIndexMetadata



* sanitize index name



* fix new FlintOptions entry missing from FlintSparkConf



* fix FlintOpenSearchClientSuite



* delete file (missed in resolving conflict)



* Use service builder in OpenSearchCluster



* fix service builder class

* fix FlintOptions handling of custom class Spark properties
* remove SparkConf from builder argument



* fix IT



* add test suites



* sanitize index name for opensearch metadata log



* remove spark-warehouse files



* exclude _meta field for createIndex in OpenSearch



* catch client creation exception



* Fetch metadata for OpenSearch table

* move OpenSearchCluster to a Java file because a Scala object cannot be
  mocked with Mockito



* update doc with spark config
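For reference, a hedged example of wiring a custom implementation through the new Spark configs documented below in docs/index.md; the implementation class names are hypothetical, and both options default to empty, which selects the built-in OpenSearch-backed services:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flint-custom-metadata")
  // Hypothetical implementation classes, shown only to illustrate the keys.
  .config("spark.datasource.flint.customFlintIndexMetadataServiceClass",
          "com.example.MyIndexMetadataService")
  .config("spark.datasource.flint.customFlintMetadataLogServiceClass",
          "com.example.MyMetadataLogService")
  .getOrCreate()
```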



---------


(cherry picked from commit f5ad574)

Signed-off-by: Sean Kao <[email protected]>
seankao-az authored Aug 21, 2024
1 parent ab46239 commit f470ff5
Showing 41 changed files with 815 additions and 467 deletions.
2 changes: 2 additions & 0 deletions docs/index.md
@@ -522,6 +522,8 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema i
- `spark.datasource.flint.auth.password`: basic auth password.
- `spark.datasource.flint.region`: default is us-west-2. only been used when auth=sigv4
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty.
- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core
package org.opensearch.flint.common

/**
* Flint version.
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.metadata;

import java.util.Map;

/**
* Flint index metadata service provides API for index metadata related operations on a Flint index
* regardless of underlying storage.
* <p>
* Custom implementations of this interface are expected to provide a public constructor with
* the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by
* the FlintIndexMetadataServiceBuilder.
*/
public interface FlintIndexMetadataService {

/**
* Retrieve metadata for a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Update metadata for a Flint index.
*
* @param indexName index name
* @param metadata index metadata to update
*/
void updateIndexMetadata(String indexName, FlintMetadata metadata);

/**
* Delete metadata for a Flint index.
*
* @param indexName index name
*/
void deleteIndexMetadata(String indexName);
}
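A skeleton of a custom implementation following the constructor contract in the javadoc above; the method bodies are placeholders, not a working metadata store:

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

import org.apache.spark.SparkConf
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}

// Skeleton only: the SparkConf constructor follows the contract stated in the
// interface javadoc; a real implementation would read and write metadata in
// whatever store it fronts.
class MyIndexMetadataService(sparkConf: SparkConf) extends FlintIndexMetadataService {

  override def getIndexMetadata(indexName: String): FlintMetadata =
    throw new UnsupportedOperationException(s"no metadata stored for $indexName")

  override def getAllIndexMetadata(indexNamePattern: String*): JMap[String, FlintMetadata] =
    new JHashMap[String, FlintMetadata]()

  override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = ()

  override def deleteIndexMetadata(indexName: String): Unit = ()
}
```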
@@ -3,14 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata
package org.opensearch.flint.common.metadata

import java.util

import org.opensearch.flint.common.FlintVersion
import org.opensearch.flint.common.FlintVersion.current
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
@@ -35,7 +34,11 @@ case class FlintMetadata(
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry id */
latestId: Option[String] = None,
/** Optional latest metadata log entry */
/**
* Optional latest metadata log entry. TODO: remove? This was added for SHOW command to be
* fetched during get(All)IndexMetadata. Now describeIndex uses metadata log service to fetch
* log entry after get(All)IndexMetadata so this doesn't need to be part of FlintMetadata.
*/
latestLogEntry: Option[FlintMetadataLogEntry] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {
@@ -44,124 +47,10 @@ case class FlintMetadata(
require(name != null, "name is required")
require(kind != null, "kind is required")
require(source != null, "source is required")

/**
* Generate JSON content as index metadata.
*
* @return
* JSON content
*/
def getContent: String = {
try {
buildJson(builder => {
// Add _meta field
objectField(builder, "_meta") {
builder
.field("version", version.version)
.field("name", name)
.field("kind", kind)
.field("source", source)
.field("indexedColumns", indexedColumns)

if (latestId.isDefined) {
builder.field("latestId", latestId.get)
}
optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}

// Add properties (schema) field
builder.field("properties", schema)
})
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to jsonify Flint metadata", e)
}
}
}

object FlintMetadata {

/**
* Construct Flint metadata with JSON content, index settings, and latest log entry.
*
* @param content
* JSON content
* @param settings
* index settings
* @param latestLogEntry
* latest metadata log entry
* @return
* Flint metadata
*/
def apply(
content: String,
settings: String,
latestLogEntry: FlintMetadataLogEntry): FlintMetadata = {
val metadata = FlintMetadata(content, settings)
metadata.copy(latestLogEntry = Option(latestLogEntry))
}

/**
* Construct Flint metadata with JSON content and index settings.
*
* @param content
* JSON content
* @param settings
* index settings
* @return
* Flint metadata
*/
def apply(content: String, settings: String): FlintMetadata = {
val metadata = FlintMetadata(content)
metadata.copy(indexSettings = Option(settings))
}

/**
* Parse the given JSON content and construct Flint metadata class.
*
* @param content
* JSON content
* @return
* Flint metadata
*/
def apply(content: String): FlintMetadata = {
try {
val builder = new FlintMetadata.Builder()
parseJson(content) { (parser, fieldName) =>
{
fieldName match {
case "_meta" =>
parseObjectField(parser) { (parser, innerFieldName) =>
{
innerFieldName match {
case "version" => builder.version(FlintVersion.apply(parser.text()))
case "name" => builder.name(parser.text())
case "kind" => builder.kind(parser.text())
case "source" => builder.source(parser.text())
case "indexedColumns" =>
parseArrayField(parser) {
builder.addIndexedColumn(parser.map())
}
case "options" => builder.options(parser.map())
case "properties" => builder.properties(parser.map())
case _ => // Handle other fields as needed
}
}
}
case "properties" =>
builder.schema(parser.map())
case _ => // Ignore other fields, for instance, dynamic.
}
}
}
builder.build()
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to parse metadata JSON", e)
}
}

def builder(): FlintMetadata.Builder = new Builder

/**
@@ -231,16 +120,6 @@ object FlintMetadata {
this
}

def schema(schema: String): this.type = {
parseJson(schema) { (parser, fieldName) =>
fieldName match {
case "properties" => this.schema = parser.map()
case _ => // do nothing
}
}
this
}

def latestLogEntry(entry: FlintMetadataLogEntry): this.type = {
this.latestId = Option(entry.id)
this.latestLogEntry = Option(entry)
@@ -43,8 +43,8 @@ case class FlintMetadataLogEntry(
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext.asScala.toMap)
properties: JMap[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties.asScala.toMap)
}

def this(
Expand All @@ -53,8 +53,8 @@ case class FlintMetadataLogEntry(
state: IndexState,
entryVersion: JMap[String, Any],
error: String,
storageContext: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, storageContext)
properties: Map[String, Any]) = {
this(id, createTime, state, entryVersion.asScala.toMap, error, properties)
}
}

@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.metadata

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.common.FlintVersion.current
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class FlintMetadataSuite extends AnyFlatSpec with Matchers {
"builder" should "build FlintMetadata with provided fields" in {
val builder = new FlintMetadata.Builder
builder.name("test_index")
builder.kind("test_kind")
builder.source("test_source_table")
builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)

val metadata = builder.build()

metadata.version shouldBe current()
metadata.name shouldBe "test_index"
metadata.kind shouldBe "test_kind"
metadata.source shouldBe "test_source_table"
metadata.indexedColumns shouldBe Array(Map("test_field" -> "spark_type").asJava)
metadata.schema shouldBe Map("test_field" -> Map("type" -> "os_type").asJava).asJava
}
}
@@ -5,9 +5,7 @@

package org.opensearch.flint.core;

import java.util.Map;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.common.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintWriter;

/**
@@ -32,31 +30,6 @@ public interface FlintClient {
*/
boolean exists(String indexName);

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Retrieve metadata in a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Update a Flint index with the metadata given.
*
* @param indexName index name
* @param metadata index metadata
*/
void updateIndex(String indexName, FlintMetadata metadata);

/**
* Delete a Flint index.
*
@@ -95,7 +95,9 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_BATCH_BYTES = "1mb";

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";
public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "customFlintMetadataLogServiceClass";

public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "customFlintIndexMetadataServiceClass";

public static final String SUPPORT_SHARD = "read.support_shard";

@@ -189,6 +191,10 @@ public String getCustomFlintMetadataLogServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, "");
}

public String getCustomFlintIndexMetadataServiceClass() {
return options.getOrDefault(CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, "");
}

/**
* FIXME, This is workaround for AWS OpenSearch Serverless (AOSS). AOSS does not support shard
* operation, but shard info is exposed in index settings. Remove this setting when AOSS fix
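The commit message mentions a FlintIndexMetadataServiceBuilder that consumes the new option shown above; the builder itself is not in this excerpt, and the commit notes SparkConf was removed from its argument, so the following is only an assumed sketch of the reflection idea behind it:

```scala
import org.apache.spark.SparkConf
import org.opensearch.flint.common.metadata.FlintIndexMetadataService
import org.opensearch.flint.core.FlintOptions

// Assumed sketch: an empty option means "use the built-in OpenSearch-backed
// implementation"; that fallback is elided here.
def buildIndexMetadataService(options: FlintOptions, sparkConf: SparkConf): FlintIndexMetadataService = {
  val className = options.getCustomFlintIndexMetadataServiceClass
  require(className.nonEmpty, "sketch covers only the custom-class path")
  Class.forName(className)
    .getConstructor(classOf[SparkConf])
    .newInstance(sparkConf)
    .asInstanceOf[FlintIndexMetadataService]
}
```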
10 changes: 0 additions & 10 deletions flint-core/src/main/scala/org/opensearch/flint/core/MetaData.scala
@@ -5,8 +5,6 @@

package org.opensearch.flint.core

import org.opensearch.flint.core.metadata.FlintMetadata

/**
* OpenSearch Table metadata.
*
@@ -18,11 +16,3 @@ import org.opensearch.flint.core.metadata.FlintMetadata
* setting
*/
case class MetaData(name: String, properties: String, setting: String)

object MetaData {
def apply(name: String, flintMetadata: FlintMetadata): MetaData = {
val properties = flintMetadata.getContent
val setting = flintMetadata.indexSettings.getOrElse("")
MetaData(name, properties, setting)
}
}
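With the `apply(name, flintMetadata)` overload removed, turning FlintMetadata into the `properties` JSON is no longer FlintMetadata's job; per the commit message it moved to the OpenSearch implementation of FlintIndexMetadataService. A hedged sketch of a caller, with the serializer injected rather than named, since the replacement API is not shown in this excerpt:

```scala
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.MetaData

// Sketch only: this diff shows that ser/de left FlintMetadata, not the exact
// API that replaced it, so the serializer is passed in by the caller.
def toMetaData(name: String,
               flintMetadata: FlintMetadata,
               serialize: FlintMetadata => String): MetaData =
  MetaData(
    name,
    serialize(flintMetadata), // e.g. the OpenSearch index metadata service's ser/de
    flintMetadata.indexSettings.getOrElse(""))
```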