From 88ed8dde06ac6011a88cb88a5d3621ffc371eedf Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Thu, 1 Aug 2024 18:27:06 -0700 Subject: [PATCH] move get metadata functions to new service * Remove getIndexMetadata and getAllIndexMetadata from FlintClient * Implement the two for OpenSearch * TODO: sanitize index name * Add builder for FlintIndexMetadataService and options * Refactor caller of FlintClient.get(All)IndexMetadata with FlintIndexMetadataService * TODO: test suite for getIndexMetadata and getAllIndexMetadata (might overlap with FlintOpenSearchClientSuite) Signed-off-by: Sean Kao --- .../flint/common/metadata/FlintMetadata.scala | 5 +- .../opensearch/flint/core/FlintClient.java | 17 ------ .../opensearch/flint/core/FlintOptions.java | 6 ++ .../FlintIndexMetadataServiceBuilder.java | 37 ++++++++++++ .../core/storage/FlintOpenSearchClient.java | 41 +------------ .../FlintOpenSearchIndexMetadataService.scala | 60 +++++++++++++++---- .../opensearch/table/OpenSearchTable.scala | 12 ++-- .../spark/sql/flint/FlintReadOnlyTable.scala | 2 +- .../opensearch/flint/spark/FlintSpark.scala | 16 ++++- .../ApplyFlintSparkCoveringIndexSuite.scala | 24 ++++++-- .../core/FlintOpenSearchClientSuite.scala | 23 +++---- .../FlintSparkCoveringIndexSqlITSuite.scala | 10 ++-- ...FlintSparkMaterializedViewSqlITSuite.scala | 8 ++- .../FlintSparkSkippingIndexSqlITSuite.scala | 7 ++- 14 files changed, 163 insertions(+), 105 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala index a2d2c091a..91500b1d2 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/metadata/FlintMetadata.scala @@ -35,8 +35,9 @@ case class FlintMetadata( /** Optional latest metadata log entry id */ latestId: Option[String] = None, /** - * Optional latest metadata log entry. TODO: remove. Now describeIndex uses metadata log - * service to fetch log entry. + * Optional latest metadata log entry. TODO: remove. This was added for SHOW command to be + * fetched during get(All)IndexMetadata. Now describeIndex uses metadata log service to fetch + * log entry. */ latestLogEntry: Option[FlintMetadataLogEntry] = None, /** Optional Flint index settings. TODO: move elsewhere? */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java index 31896dd06..977afda8a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintClient.java @@ -33,23 +33,6 @@ public interface FlintClient { */ boolean exists(String indexName); - /** - * Retrieve all metadata for Flint index whose name matches the given pattern. - * - * @param indexNamePattern index name pattern - * @return map where the keys are the matched index names, and the values are - * corresponding index metadata - */ - Map getAllIndexMetadata(String... indexNamePattern); - - /** - * Retrieve metadata in a Flint index. - * - * @param indexName index name - * @return index metadata - */ - FlintMetadata getIndexMetadata(String indexName); - /** * Update a Flint index with the metadata given. * diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index c49247f37..748842930 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -92,6 +92,8 @@ public class FlintOptions implements Serializable { public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass"; + public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "spark.datasource.flint.customFlintIndexMetadataServiceClass"; + public FlintOptions(Map options) { this.options = options; this.retryOptions = new FlintRetryOptions(options); @@ -168,4 +170,8 @@ public int getBatchBytes() { public String getCustomFlintMetadataLogServiceClass() { return options.getOrDefault(CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS, ""); } + + public String getCustomFlintIndexMetadataServiceClass() { + return options.getOrDefault(CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS, ""); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java new file mode 100644 index 000000000..d65e180de --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintIndexMetadataServiceBuilder.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.metadata; + +import java.lang.reflect.Constructor; +import org.apache.spark.SparkConf; +import org.opensearch.flint.common.metadata.FlintIndexMetadataService; +import org.opensearch.flint.core.FlintOptions; +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService; + +/** + * {@link FlintIndexMetadataService} builder. + *

+ * Custom implementations of {@link FlintIndexMetadataService} are expected to provide a public + * constructor with the signature {@code public MyCustomService(SparkConf sparkConf)} to be + * instantiated by this builder. + */ +public class FlintIndexMetadataServiceBuilder { + public static FlintIndexMetadataService build(FlintOptions options, SparkConf sparkConf) { + String className = options.getCustomFlintIndexMetadataServiceClass(); + if (className.isEmpty()) { + return new FlintOpenSearchIndexMetadataService(options); + } + + // Attempts to instantiate Flint index metadata service with sparkConf using reflection + try { + Class flintIndexMetadataServiceClass = Class.forName(className); + Constructor constructor = flintIndexMetadataServiceClass.getConstructor(SparkConf.class); + return (FlintIndexMetadataService) constructor.newInstance(sparkConf); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate FlintIndexMetadataService: " + className, e); + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index b3f34ab26..0a5f4796a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -105,52 +105,13 @@ public boolean exists(String indexName) { } } - @Override - public Map getAllIndexMetadata(String... indexNamePattern) { - LOG.info("Fetching all Flint index metadata for pattern " + String.join(",", indexNamePattern)); - String[] indexNames = - Arrays.stream(indexNamePattern).map(this::sanitizeIndexName).toArray(String[]::new); - try (IRestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(indexNames); - GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); - - return Arrays.stream(response.getIndices()) - .collect(Collectors.toMap( - index -> index, - index -> FlintOpenSearchIndexMetadataService.deserialize( - response.getMappings().get(index).source().toString(), - response.getSettings().get(index).toString() - ) - )); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + - String.join(",", indexNames), e); - } - } - - @Override - public FlintMetadata getIndexMetadata(String indexName) { - LOG.info("Fetching Flint index metadata for " + indexName); - String osIndexName = sanitizeIndexName(indexName); - try (IRestHighLevelClient client = createClient()) { - GetIndexRequest request = new GetIndexRequest(osIndexName); - GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); - - MappingMetadata mapping = response.getMappings().get(osIndexName); - Settings settings = response.getSettings().get(osIndexName); - return FlintOpenSearchIndexMetadataService.deserialize(mapping.source().string(), settings.toString()); - } catch (Exception e) { - throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); - } - } - + // TODO: remove this interface? update index should be handled by index metadata service @Override public void updateIndex(String indexName, FlintMetadata metadata) { LOG.info("Updating Flint index " + indexName + " with metadata " + metadata); String osIndexName = sanitizeIndexName(indexName); try (IRestHighLevelClient client = createClient()) { PutMappingRequest request = new PutMappingRequest(osIndexName); - // TODO: use generic index metadata service request.source(FlintOpenSearchIndexMetadataService.serialize(metadata), XContentType.JSON); client.updateIndexMapping(request, RequestOptions.DEFAULT); } catch (Exception e) { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala index d353b800d..8220d8276 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchIndexMetadataService.scala @@ -7,6 +7,10 @@ package org.opensearch.flint.core.storage import java.util +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.{GetIndexRequest, GetIndexResponse} import org.opensearch.flint.common.FlintVersion import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} import org.opensearch.flint.core.FlintOptions @@ -19,22 +23,52 @@ class FlintOpenSearchIndexMetadataService(options: FlintOptions) with Logging { override def getIndexMetadata(indexName: String): FlintMetadata = { - FlintMetadata( - FlintVersion.current, - "", - "", - "", - Array(), - util.Map.of(), - util.Map.of(), - util.Map.of(), - None, - None, - None) + logInfo(s"Fetching Flint index metadata for $indexName") + // TODO: sanitize + // val osIndexName = sanitizeIndexName(indexName) + val osIndexName = indexName + val client = OpenSearchClientUtils.createClient(options) + try { + val request = new GetIndexRequest(osIndexName) + val response = client.getIndex(request, RequestOptions.DEFAULT) + val mapping = response.getMappings.get(osIndexName) + val settings = response.getSettings.get(osIndexName) + FlintOpenSearchIndexMetadataService.deserialize(mapping.source.string, settings.toString) + } catch { + case e: Exception => + throw new IllegalStateException( + "Failed to get Flint index metadata for " + osIndexName, + e) + } finally { + client.close() + } } override def getAllIndexMetadata(indexNamePattern: String*): util.Map[String, FlintMetadata] = { - util.Map.of() + logInfo(s"Fetching all Flint index metadata for pattern ${indexNamePattern.mkString(",")}"); + // TODO: sanitize + // val indexNames = indexNamePattern.map(sanitizeIndexName) + val indexNames = indexNamePattern + val client = OpenSearchClientUtils.createClient(options) + try { + val request = new GetIndexRequest(indexNames: _*) + val response: GetIndexResponse = client.getIndex(request, RequestOptions.DEFAULT) + + response.getIndices + .map(index => + index -> FlintOpenSearchIndexMetadataService.deserialize( + response.getMappings.get(index).source().string(), + response.getSettings.get(index).toString)) + .toMap + .asJava + } catch { + case e: Exception => + throw new IllegalStateException( + s"Failed to get Flint index metadata for ${indexNames.mkString(",")}", + e) + } finally { + client.close() + } } override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit = {} diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala index d20a6ff79..697f215ca 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/opensearch/table/OpenSearchTable.scala @@ -8,9 +8,11 @@ package org.apache.spark.opensearch.table import scala.collection.JavaConverters._ import org.opensearch.flint.common.metadata.FlintMetadata -import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions} +import org.opensearch.flint.core.FlintOptions +import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.apache.spark.SparkConf import org.apache.spark.sql.flint.datatype.FlintDataType import org.apache.spark.sql.types.StructType @@ -48,14 +50,16 @@ object OpenSearchTable { * tableName support (1) single index name. (2) wildcard index name. (3) comma sep index name. * @param options * The options for Flint. + * @param conf + * Configurations for Spark application. * @return * An instance of OpenSearchTable. */ - def apply(tableName: String, options: FlintOptions): OpenSearchTable = { + def apply(tableName: String, options: FlintOptions, conf: SparkConf): OpenSearchTable = { OpenSearchTable( tableName, - FlintClientBuilder - .build(options) + FlintIndexMetadataServiceBuilder + .build(options, conf) .getAllIndexMetadata(tableName.split(","): _*) .asScala .toMap) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala index b1ec83cae..6bad46946 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintReadOnlyTable.scala @@ -43,7 +43,7 @@ class FlintReadOnlyTable( lazy val name: String = flintSparkConf.tableName() lazy val openSearchTable: OpenSearchTable = - OpenSearchTable.apply(name, flintSparkConf.flintOptions()) + OpenSearchTable.apply(name, flintSparkConf.flintOptions(), sparkSession.sparkContext.getConf) lazy val schema: StructType = { userSpecifiedSchema.getOrElse { openSearchTable.schema } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 87af33fe4..2c8a67297 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -9,13 +9,15 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization -import org.opensearch.flint.common.metadata.FlintMetadata +import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.common.metadata.log.FlintMetadataLogService import org.opensearch.flint.common.metadata.log.OptimisticTransaction import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} +import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -47,6 +49,12 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w /** Flint client for low-level index operation */ private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) + private val flintIndexMetadataService: FlintIndexMetadataService = { + FlintIndexMetadataServiceBuilder.build( + flintSparkConf.flintOptions(), + spark.sparkContext.getConf) + } + override protected val flintMetadataLogService: FlintMetadataLogService = { FlintMetadataLogServiceBuilder.build( flintSparkConf.flintOptions(), @@ -112,6 +120,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) .commit(latest => + // TODO: update index metadata if (latest == null) { // in case transaction capability is disabled flintClient.createIndex(indexName, metadata) } else { @@ -163,7 +172,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = { logInfo(s"Describing indexes with pattern $indexNamePattern") if (flintClient.exists(indexNamePattern)) { - flintClient + flintIndexMetadataService .getAllIndexMetadata(indexNamePattern) .asScala .map { case (indexName, metadata) => @@ -187,7 +196,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w def describeIndex(indexName: String): Option[FlintSparkIndex] = { logInfo(s"Describing index name $indexName") if (flintClient.exists(indexName)) { - val metadata = flintClient.getIndexMetadata(indexName) + val metadata = flintIndexMetadataService.getIndexMetadata(indexName) val metadataWithEntry = attachLatestLogEntry(indexName, metadata) FlintSparkIndexFactory.create(metadataWithEntry) } else { @@ -267,6 +276,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { flintClient.deleteIndex(indexName) + // TODO: delete index metadata true }) } else { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala index 8ede40f86..0f0e0113d 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/covering/ApplyFlintSparkCoveringIndexSuite.scala @@ -9,16 +9,18 @@ import scala.collection.JavaConverters._ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mockStatic, when, RETURNS_DEEP_STUBS} +import org.opensearch.flint.common.metadata.FlintIndexMetadataService import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{ACTIVE, DELETED, IndexState} import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} +import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.{Matcher, MatchResult} import org.scalatest.matchers.should.Matchers import org.scalatestplus.mockito.MockitoSugar.mock -import org.apache.spark.FlintSuite +import org.apache.spark.{FlintSuite, SparkConf} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -29,9 +31,13 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { private val testTable = "spark_catalog.default.apply_covering_index_test" private val testTable2 = "spark_catalog.default.apply_covering_index_test_2" - /** Mock FlintClient to avoid looking for real OpenSearch cluster */ + /** + * Mock FlintClient and FlintIndexMetadataService to avoid looking for real OpenSearch cluster + */ private val clientBuilder = mockStatic(classOf[FlintClientBuilder]) private val client = mock[FlintClient](RETURNS_DEEP_STUBS) + private val indexMetadataServiceBuilder = mockStatic(classOf[FlintIndexMetadataServiceBuilder]) + private val indexMetadataService = mock[FlintIndexMetadataService](RETURNS_DEEP_STUBS) /** Mock FlintSpark which is required by the rule. Deep stub required to replace spark val. */ private val flint = mock[FlintSpark](RETURNS_DEEP_STUBS) @@ -50,16 +56,24 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { | ('F', 35), ('G', 40), ('H', 45), ('I', 50), ('J', 55) | """.stripMargin) - // Mock static create method in FlintClientBuilder used by Flint data source + // Mock static create method in FlintClientBuilder and FlintIndexMetadataServiceBuilder used by Flint data source clientBuilder - .when(() => FlintClientBuilder.build(any(classOf[FlintOptions]))) + .when(() => + FlintClientBuilder + .build(any(classOf[FlintOptions]))) .thenReturn(client) + indexMetadataServiceBuilder + .when(() => + FlintIndexMetadataServiceBuilder + .build(any(classOf[FlintOptions]), any(classOf[SparkConf]))) + .thenReturn(indexMetadataService) when(flint.spark).thenReturn(spark) } override protected def afterAll(): Unit = { sql(s"DROP TABLE $testTable") clientBuilder.close() + indexMetadataServiceBuilder.close() super.afterAll() } @@ -265,7 +279,7 @@ class ApplyFlintSparkCoveringIndexSuite extends FlintSuite with Matchers { }) indexes.foreach { index => - when(client.getAllIndexMetadata(index.name())) + when(indexMetadataService.getAllIndexMetadata(index.name())) .thenReturn(Map.apply(index.name() -> index.metadata()).asJava) } rule.apply(plan) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index f0d100b0b..aaac3da0c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -23,7 +23,9 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{REFRESH_POLICY, SCROLL_ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with Matchers { /** Lazy initialize after container started. */ - lazy val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + lazy val options = new FlintOptions(openSearchOptions.asJava) + lazy val flintClient = new FlintOpenSearchClient(options) + lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options) behavior of "Flint OpenSearch client" @@ -46,7 +48,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.createIndex(indexName, metadata) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" + flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind" } it should "create index with settings" in { @@ -59,7 +61,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M // OS uses full setting name ("index" prefix) and store as string implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(indexName).indexSettings.get) + val settings = parse(flintIndexMetadataService.getIndexMetadata(indexName).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } @@ -100,8 +102,9 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.updateIndex(indexName, newMetadata) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName).kind shouldBe "test_kind" - flintClient.getIndexMetadata(indexName).name shouldBe "test_name" + val checkMetadata = flintIndexMetadataService.getIndexMetadata(indexName) + checkMetadata.kind shouldBe "test_kind" + checkMetadata.name shouldBe "test_name" } it should "get all index metadata with the given index name pattern" in { @@ -109,7 +112,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M flintClient.createIndex("flint_test_1_index", metadata) flintClient.createIndex("flint_test_2_index", metadata) - val allMetadata = flintClient.getAllIndexMetadata("flint_*_index") + val allMetadata = flintIndexMetadataService.getAllIndexMetadata("flint_*_index") allMetadata should have size 2 allMetadata.values.forEach(metadata => FlintOpenSearchIndexMetadataService.serialize(metadata) should not be empty) @@ -124,8 +127,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M """{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName) should not be null - flintClient.getAllIndexMetadata("flint_ELB_*") should not be empty + flintIndexMetadataService.getIndexMetadata(indexName) should not be null + flintIndexMetadataService.getAllIndexMetadata("flint_ELB_*") should not be empty // Read write test val writer = flintClient.createWriter(indexName) @@ -149,8 +152,8 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M """{"properties": {"test": { "type": "integer" } } }""")) flintClient.exists(indexName) shouldBe true - flintClient.getIndexMetadata(indexName) should not be null - flintClient.getAllIndexMetadata("test *") should not be empty + flintIndexMetadataService.getIndexMetadata(indexName) should not be null + flintIndexMetadataService.getAllIndexMetadata("test *") should not be empty // Read write test val writer = flintClient.createWriter(indexName) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index ffd956b1c..ac1adefa8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -6,13 +6,13 @@ package org.opensearch.flint.spark import scala.Option.empty -import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter} +import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined @@ -94,10 +94,12 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + val settings = + parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "2" (settings \ "index.number_of_replicas").extract[String] shouldBe "3" } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index fc4cdbeac..11581e731 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -14,7 +14,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.parse import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.FlintOpenSearchClient +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.{defined, have} import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -152,10 +152,12 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get) + val settings = + parse(flintIndexMetadataService.getIndexMetadata(testFlintIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 66ec833eb..ebefa4e89 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -13,7 +13,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.JsonMethods.{compact, parse, render} import org.json4s.native.Serialization import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} +import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} @@ -188,10 +188,11 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit |""".stripMargin) // Check if the index setting option is set to OS index setting - val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val flintIndexMetadataService = + new FlintOpenSearchIndexMetadataService(new FlintOptions(openSearchOptions.asJava)) implicit val formats: Formats = Serialization.formats(NoTypeHints) - val settings = parse(flintClient.getIndexMetadata(testIndex).indexSettings.get) + val settings = parse(flintIndexMetadataService.getIndexMetadata(testIndex).indexSettings.get) (settings \ "index.number_of_shards").extract[String] shouldBe "3" (settings \ "index.number_of_replicas").extract[String] shouldBe "2" }