Skip to content

Commit

Permalink
move get metadata functions to new service
Browse files Browse the repository at this point in the history
* Remove getIndexMetadata and getAllIndexMetadata from FlintClient
* Implement the two for OpenSearch
  * TODO: sanitize index name
* Add builder for FlintIndexMetadataService and options
* Refactor callers of FlintClient.get(All)IndexMetadata to use
  FlintIndexMetadataService
* TODO: test suite for getIndexMetadata and getAllIndexMetadata (might
  overlap with FlintOpenSearchClientSuite)

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Aug 5, 2024
1 parent addf507 commit 88ed8dd
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ case class FlintMetadata(
/** Optional latest metadata log entry id */
latestId: Option[String] = None,
/**
* Optional latest metadata log entry. TODO: remove. Now describeIndex uses metadata log
* service to fetch log entry.
* Optional latest metadata log entry. TODO: remove. This was added for SHOW command to be
* fetched during get(All)IndexMetadata. Now describeIndex uses metadata log service to fetch
* log entry.
*/
latestLogEntry: Option[FlintMetadataLogEntry] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,6 @@ public interface FlintClient {
*/
boolean exists(String indexName);

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Retrieve metadata in a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Update a Flint index with the metadata given.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";

public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "spark.datasource.flint.customFlintIndexMetadataServiceClass";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -168,4 +170,8 @@ public int getBatchBytes() {
/**
 * Reads the {@link #CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS} option.
 *
 * @return the configured class name, or an empty string when the option has no entry
 */
public String getCustomFlintMetadataLogServiceClass() {
  final String notConfigured = "";
  return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, notConfigured);
}

/**
 * Reads the {@link #CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS} option.
 *
 * @return the configured class name, or an empty string when the option has no entry
 */
public String getCustomFlintIndexMetadataServiceClass() {
  final String notConfigured = "";
  return options.getOrDefault(CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, notConfigured);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metadata;

import java.lang.reflect.Constructor;
import org.apache.spark.SparkConf;
import org.opensearch.flint.common.metadata.FlintIndexMetadataService;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService;

/**
 * Factory for {@link FlintIndexMetadataService} instances.
 * <p>
 * When {@link FlintOptions#getCustomFlintIndexMetadataServiceClass()} is empty, the OpenSearch
 * implementation is returned. Otherwise the named class is loaded reflectively; it must expose
 * a public constructor of the form {@code public MyCustomService(SparkConf sparkConf)}.
 */
public class FlintIndexMetadataServiceBuilder {

  /**
   * Creates the index metadata service selected by the given options.
   *
   * @param options   Flint options carrying the optional custom service class name
   * @param sparkConf Spark configuration handed to a custom service's constructor
   * @return the custom service if one is configured, else the OpenSearch-backed service
   * @throws RuntimeException if the custom class cannot be loaded, constructed or cast
   */
  public static FlintIndexMetadataService build(FlintOptions options, SparkConf sparkConf) {
    String customServiceClass = options.getCustomFlintIndexMetadataServiceClass();

    if (!customServiceClass.isEmpty()) {
      // A custom implementation was requested: look up its SparkConf constructor reflectively.
      try {
        Object service = Class.forName(customServiceClass)
            .getConstructor(SparkConf.class)
            .newInstance(sparkConf);
        return (FlintIndexMetadataService) service;
      } catch (Exception e) {
        throw new RuntimeException("Failed to instantiate FlintIndexMetadataService: " + customServiceClass, e);
      }
    }

    // No custom class configured: fall back to the built-in OpenSearch implementation.
    return new FlintOpenSearchIndexMetadataService(options);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,52 +105,13 @@ public boolean exists(String indexName) {
}
}

@Override
public Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern));
String[] indexNames =
Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(indexNames);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.collect(Collectors.toMap(
index -> index,
index -> FlintOpenSearchIndexMetadataService.deserialize(
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()
)
));
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " +
String.join(",", indexNames), e);
}
}

@Override
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return FlintOpenSearchIndexMetadataService.deserialize(mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
}

// TODO: remove this interface? update index should be handled by index metadata service
@Override
public void updateIndex(String indexName, FlintMetadata metadata) {
LOG.info("Updating Flint index " + indexName + " with metadata " + metadata);
String osIndexName = sanitizeIndexName(indexName);
try (IRestHighLevelClient client = createClient()) {
PutMappingRequest request = new PutMappingRequest(osIndexName);
// TODO: use generic index metadata service
request.source(FlintOpenSearchIndexMetadataService.serialize(metadata), XContentType.JSON);
client.updateIndexMapping(request, RequestOptions.DEFAULT);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ package org.opensearch.flint.core.storage

import java.util

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse}
import org.opensearch.flint.common.FlintVersion
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.core.FlintOptions
Expand All @@ -19,22 +23,52 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions)
with Logging {

/**
 * Fetch the metadata of a single Flint index from OpenSearch.
 *
 * Issues a get-index request for the given name and deserializes the returned mapping source
 * and settings into a [[FlintMetadata]]. The client created for the request is closed before
 * returning, whether or not the request succeeds.
 *
 * @param indexName
 *   index name, currently sent to OpenSearch as-is (sanitization is still a TODO)
 * @return
 *   metadata deserialized from the index mapping and settings
 * @throws IllegalStateException
 *   wrapping any exception raised while fetching or deserializing
 */
override def getIndexMetadata(indexName: String): FlintMetadata = {
// NOTE(review): this FlintMetadata(...) literal looks like the placeholder body that the
// commit replaces (the diff view shows old and new lines together); its value is not used
// by the statements that follow -- confirm against the real file.
FlintMetadata(
FlintVersion.current,
"",
"",
"",
Array(),
util.Map.of(),
util.Map.of(),
util.Map.of(),
None,
None,
None)
logInfo(s"Fetching Flint index metadata for $indexName")
// TODO: sanitize the index name before querying; until then the caller's name is used verbatim
// val osIndexName = sanitizeIndexName(indexName)
val osIndexName = indexName
val client = OpenSearchClientUtils.createClient(options)
try {
val request = new GetIndexRequest(osIndexName)
val response = client.getIndex(request, RequestOptions.DEFAULT)
// Both lookups are keyed by the requested name
val mapping = response.getMappings.get(osIndexName)
val settings = response.getSettings.get(osIndexName)
FlintOpenSearchIndexMetadataService.deserialize(mapping.source.string, settings.toString)
} catch {
// Any failure (request or deserialization) is reported as IllegalStateException with the cause
case e: Exception =>
throw new IllegalStateException(
"Failed to get Flint index metadata for " + osIndexName,
e)
} finally {
// Always release the client created above
client.close()
}
}

/**
 * Fetch the metadata of every index matched by the given name patterns.
 *
 * Issues one get-index request for all patterns and deserializes, for each index in the
 * response, its mapping source and settings into a [[FlintMetadata]]. The client created for
 * the request is closed before returning, whether or not the request succeeds.
 *
 * @param indexNamePattern
 *   index name patterns, currently sent to OpenSearch as-is (sanitization is still a TODO)
 * @return
 *   Java map from each index name in the response to its deserialized metadata
 * @throws IllegalStateException
 *   wrapping any exception raised while fetching or deserializing
 */
override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = {
// NOTE(review): this util.Map.of() looks like the placeholder body that the commit replaces
// (the diff view shows old and new lines together); its value is not used below -- confirm
// against the real file.
util.Map.of()
logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}");
// TODO: sanitize the index names before querying; until then the patterns are used verbatim
// val indexNames = indexNamePattern.map(sanitizeIndexName)
val indexNames = indexNamePattern
val client = OpenSearchClientUtils.createClient(options)
try {
val request = new GetIndexRequest(indexNames: _*)
val response: GetIndexResponse = client.getIndex(request, RequestOptions.DEFAULT)

// Pair every index in the response with metadata built from its own mapping and settings,
// then expose the result as a Java map
response.getIndices
.map(index =>
index -> FlintOpenSearchIndexMetadataService.deserialize(
response.getMappings.get(index).source().string(),
response.getSettings.get(index).toString))
.toMap
.asJava
} catch {
// Any failure (request or deserialization) is reported as IllegalStateException with the cause
case e: Exception =>
throw new IllegalStateException(
s"Failed to get Flint index metadata for ${indexNames.mkString(",")}",
e)
} finally {
// Always release the client created above
client.close()
}
}

override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ package org.apache.spark.opensearch.table
import scala.collection.JavaConverters._

import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService

import org.apache.spark.SparkConf
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -48,14 +50,16 @@ object OpenSearchTable {
* tableName support (1) single index name. (2) wildcard index name. (3) comma sep index name.
* @param options
* The options for Flint.
* @param conf
* Configurations for Spark application.
* @return
* An instance of OpenSearchTable.
*/
def apply(tableName: String, options: FlintOptions): OpenSearchTable = {
def apply(tableName: String, options: FlintOptions, conf: SparkConf): OpenSearchTable = {
OpenSearchTable(
tableName,
FlintClientBuilder
.build(options)
FlintIndexMetadataServiceBuilder
.build(options, conf)
.getAllIndexMetadata(tableName.split(","): _*)
.asScala
.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class FlintReadOnlyTable(
lazy val name: String = flintSparkConf.tableName()

lazy val openSearchTable: OpenSearchTable =
OpenSearchTable.apply(name, flintSparkConf.flintOptions())
OpenSearchTable.apply(name, flintSparkConf.flintOptions(), sparkSession.sparkContext.getConf)

lazy val schema: StructType = {
userSpecifiedSchema.getOrElse { openSearchTable.schema }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import scala.collection.JavaConverters._

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down Expand Up @@ -47,6 +49,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

/** Flint index metadata service, built from the Flint options and the Spark configuration */
private val flintIndexMetadataService: FlintIndexMetadataService =
  FlintIndexMetadataServiceBuilder
    .build(flintSparkConf.flintOptions(), spark.sparkContext.getConf)

override protected val flintMetadataLogService: FlintMetadataLogService = {
FlintMetadataLogServiceBuilder.build(
flintSparkConf.flintOptions(),
Expand Down Expand Up @@ -112,6 +120,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(latest =>
// TODO: update index metadata
if (latest == null) { // in case transaction capability is disabled
flintClient.createIndex(indexName, metadata)
} else {
Expand Down Expand Up @@ -163,7 +172,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
logInfo(s"Describing indexes with pattern $indexNamePattern")
if (flintClient.exists(indexNamePattern)) {
flintClient
flintIndexMetadataService
.getAllIndexMetadata(indexNamePattern)
.asScala
.map { case (indexName, metadata) =>
Expand All @@ -187,7 +196,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
def describeIndex(indexName: String): Option[FlintSparkIndex] = {
logInfo(s"Describing index name $indexName")
if (flintClient.exists(indexName)) {
val metadata = flintClient.getIndexMetadata(indexName)
val metadata = flintIndexMetadataService.getIndexMetadata(indexName)
val metadataWithEntry = attachLatestLogEntry(indexName, metadata)
FlintSparkIndexFactory.create(metadataWithEntry)
} else {
Expand Down Expand Up @@ -267,6 +276,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
// TODO: delete index metadata
true
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import scala.collection.JavaConverters._

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS}
import org.opensearch.flint.common.metadata.FlintIndexMetadataService
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState}
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.{Matcher, MatchResult}
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.FlintSuite
import org.apache.spark.{FlintSuite, SparkConf}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
Expand All @@ -29,9 +31,13 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
private val testTable = "spark_catalog.default.apply_covering_index_test"
private val testTable2 = "spark_catalog.default.apply_covering_index_test_2"

/** Mock FlintClient to avoid looking for real OpenSearch cluster */
/**
* Mock FlintClient and FlintIndexMetadataService to avoid looking for real OpenSearch cluster
*/
private val clientBuilder = mockStatic(classOf[FlintClientBuilder])
private val client = mock[FlintClient](RETURNS_DEEP_STUBS)
private val indexMetadataServiceBuilder = mockStatic(classOf[FlintIndexMetadataServiceBuilder])
private val indexMetadataService = mock[FlintIndexMetadataService](RETURNS_DEEP_STUBS)

/** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. */
private val flint = mock[FlintSpark](RETURNS_DEEP_STUBS)
Expand All @@ -50,16 +56,24 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
| ('F', 35), ('G', 40), ('H', 45), ('I', 50), ('J', 55)
| """.stripMargin)

// Mock static create method in FlintClientBuilder used by Flint data source
// Mock static build method in FlintClientBuilder and FlintIndexMetadataServiceBuilder used by Flint data source
clientBuilder
.when(() => FlintClientBuilder.build(any(classOf[FlintOptions])))
.when(() =>
FlintClientBuilder
.build(any(classOf[FlintOptions])))
.thenReturn(client)
indexMetadataServiceBuilder
.when(() =>
FlintIndexMetadataServiceBuilder
.build(any(classOf[FlintOptions]), any(classOf[SparkConf])))
.thenReturn(indexMetadataService)
when(flint.spark).thenReturn(spark)
}

/**
 * Drop the test table and release the static mocks registered for this suite.
 *
 * Cleanup is nested in try/finally so that a failure in an earlier step (for example the
 * DROP TABLE statement) cannot leave a static mock open or skip the parent teardown.
 * Static mocks that stay open would remain registered after this suite finishes.
 */
override protected def afterAll(): Unit = {
  try {
    sql(s"DROP TABLE $testTable")
  } finally {
    try {
      clientBuilder.close()
    } finally {
      try {
        indexMetadataServiceBuilder.close()
      } finally {
        super.afterAll()
      }
    }
  }
}

Expand Down Expand Up @@ -265,7 +279,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers {
})

indexes.foreach { index =>
when(client.getAllIndexMetadata(index.name()))
when(indexMetadataService.getAllIndexMetadata(index.name()))
.thenReturn(Map.apply(index.name() -> index.metadata()).asJava)
}
rule.apply(plan)
Expand Down
Loading

0 comments on commit 88ed8dd

Please sign in to comment.