diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadataCache.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadataCache.scala
deleted file mode 100644
index e5cc43e1b..000000000
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadataCache.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.flint.core.metadata
-
-/**
- * Flint metadata cache defines metadata required to store in read cache for frontend user to
- * access.
- */
-case class FlintMetadataCache(
-    metadataCacheVersion: String,
-    /** Refresh interval for Flint index with auto refresh. Unit: seconds */
-    refreshInterval: Option[Int],
-    /** Source table names for building the Flint index. */
-    sourceTables: Array[String],
-    /** Timestamp when Flint index is last refreshed. Unit: milliseconds */
-    lastRefreshTime: Option[Long]) {
-
-  /**
-   * Convert FlintMetadataCache to a map. Skips a field if its value is not defined.
-   */
-  def toMap: Map[String, AnyRef] = {
-    val fieldNames = getClass.getDeclaredFields.map(_.getName)
-    val fieldValues = productIterator.toList
-
-    fieldNames
-      .zip(fieldValues)
-      .flatMap {
-        case (_, None) => List.empty
-        case (name, Some(value)) => List((name, value))
-        case (name, value) => List((name, value))
-      }
-      .toMap
-      .mapValues(_.asInstanceOf[AnyRef])
-  }
-}
-
-object FlintMetadataCache {
-  // TODO: construct FlintMetadataCache from FlintMetadata
-
-  def mock: FlintMetadataCache = {
-    // Fixed dummy data
-    FlintMetadataCache(
-      "1.0",
-      Some(900),
-      Array(
-        "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])"),
-      Some(1727395328283L))
-  }
-}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataCacheWriter.scala b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataCacheWriter.scala
deleted file mode 100644
index 07c8fcaa4..000000000
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataCacheWriter.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.flint.core.storage
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.opensearch.client.RequestOptions
-import org.opensearch.client.indices.PutMappingRequest
-import org.opensearch.common.xcontent.XContentType
-import org.opensearch.flint.common.metadata.FlintMetadata
-import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
-import org.opensearch.flint.core.metadata.{FlintIndexMetadataServiceBuilder, FlintMetadataCache}
-import org.opensearch.flint.core.metadata.FlintJsonHelper._
-
-import org.apache.spark.internal.Logging
-
-/**
- * Writes {@link FlintMetadataCache} to index mappings `_meta` for frontend user to access. This
- * is different from {@link FlintIndexMetadataService} which persists the full index metadata to a
- * storage for single source of truth.
- */
-class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends Logging {
-
-  /**
-   * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata
-   * storage, this flag is to allow for preserving index metadata that is already stored in _meta
-   * when updating metadata cache.
-   */
-  private val includeSpec: Boolean =
-    FlintIndexMetadataServiceBuilder
-      .build(options)
-      .isInstanceOf[FlintOpenSearchIndexMetadataService]
-
-  /**
-   * Update metadata cache for a Flint index.
-   *
-   * @param indexName
-   *   index name
-   * @param metadata
-   *   index metadata to update the cache
-   */
-  def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
-    logInfo(s"Updating metadata cache for $indexName");
-    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
-    var client: IRestHighLevelClient = null
-    try {
-      client = OpenSearchClientUtils.createClient(options)
-      val request = new PutMappingRequest(osIndexName)
-      // TODO: make sure to preserve existing lastRefreshTime
-      // Note that currently lastUpdateTime isn't used to construct FlintMetadataLogEntry
-      request.source(serialize(metadata), XContentType.JSON)
-      client.updateIndexMapping(request, RequestOptions.DEFAULT)
-    } catch {
-      case e: Exception =>
-        throw new IllegalStateException(
-          s"Failed to update metadata cache for Flint index $osIndexName",
-          e)
-    } finally
-      if (client != null) {
-        client.close()
-      }
-  }
-
-  /**
-   * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
-   * FlintOpenSearchIndexMetadataService}
-   */
-  private def serialize(metadata: FlintMetadata): String = {
-    try {
-      buildJson(builder => {
-        objectField(builder, "_meta") {
-          // If _meta is used as index metadata storage, preserve them.
-          if (includeSpec) {
-            builder
-              .field("version", metadata.version.version)
-              .field("name", metadata.name)
-              .field("kind", metadata.kind)
-              .field("source", metadata.source)
-              .field("indexedColumns", metadata.indexedColumns)
-
-            if (metadata.latestId.isDefined) {
-              builder.field("latestId", metadata.latestId.get)
-            }
-            optionalObjectField(builder, "options", metadata.options)
-          }
-
-          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
-        }
-        builder.field("properties", metadata.schema)
-      })
-    } catch {
-      case e: Exception =>
-        throw new IllegalStateException("Failed to jsonify cache metadata", e)
-    }
-  }
-
-  /**
-   * Since _meta.properties is shared by both index metadata and metadata cache, here we merge the
-   * two maps.
-   */
-  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
-    val metadataCacheProperties = FlintMetadataCache.mock.toMap
-
-    if (includeSpec) {
-      (metadataCacheProperties ++ metadata.properties.asScala).asJava
-    } else {
-      metadataCacheProperties.asJava
-    }
-  }
-}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 3f355da1d..68721d235 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -253,10 +253,6 @@ object FlintSparkConf {
     FlintConfig("spark.metadata.accessAWSCredentialsProvider")
       .doc("AWS credentials provider for metadata access permission")
       .createOptional()
-  val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
-    .doc("Enable Flint metadata cache write to Flint index mappings")
-    .createWithDefault("true")
-
   val CUSTOM_SESSION_MANAGER =
     FlintConfig("spark.flint.job.customSessionManager")
       .createOptional()
@@ -313,8 +309,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt
 
-  def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean
-
   /**
    * spark.sql.session.timeZone
    */
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 3d9f41b0f..779b7e013 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -17,7 +17,6 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
-import org.opensearch.flint.core.storage.FlintOpenSearchMetadataCacheWriter
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -55,9 +54,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
     FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
   }
 
-  private val flintMetadataCacheWriteService = new FlintOpenSearchMetadataCacheWriter(
-    flintSparkConf.flintOptions())
-
   private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
     AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
   }
@@ -119,6 +115,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
           throw new IllegalStateException(s"Flint index $indexName already exists")
         }
       } else {
+        val metadata = index.metadata()
         val jobSchedulingService = FlintSparkJobSchedulingService.create(
           index,
           spark,
@@ -130,17 +127,14 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
           .transientLog(latest => latest.copy(state = CREATING))
           .finalLog(latest => latest.copy(state = ACTIVE))
           .commit(latest => {
-            val metadata = latest match {
-              case null => // in case transaction capability is disabled
-                index.metadata()
-              case latestEntry =>
-                logInfo(s"Creating index with metadata log entry ID ${latestEntry.id}")
-                index.metadata().copy(latestId = Some(latestEntry.id))
-            }
-            flintClient.createIndex(indexName, metadata)
-            flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
-            if (isMetadataCacheWriteEnabled) {
-              flintMetadataCacheWriteService.updateMetadataCache(indexName, metadata)
+            if (latest == null) { // in case transaction capability is disabled
+              flintClient.createIndex(indexName, metadata)
+              flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
+            } else {
+              logInfo(s"Creating index with metadata log entry ID ${latest.id}")
+              flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
+              flintIndexMetadataService
+                .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id)))
             }
             jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
           })
@@ -405,10 +399,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
       new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
   }
 
-  private def isMetadataCacheWriteEnabled: Boolean = {
-    flintSparkConf.isMetadataCacheWriteEnabled
-  }
-
   private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
     if (flintIndexMetadataService.supportsGetByIndexPattern) {
       flintIndexMetadataService
@@ -578,9 +568,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
         })
         .commit(_ => {
           flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
-          if (isMetadataCacheWriteEnabled) {
-            flintMetadataCacheWriteService.updateMetadataCache(indexName, index.metadata)
-          }
           logInfo("Update index options complete")
           jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
         })
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchMetadataCacheWriterITSuite.scala
deleted file mode 100644
index 454b0e493..000000000
--- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchMetadataCacheWriterITSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.flint.core
-
-import java.util.List
-
-import scala.collection.JavaConverters._
-
-import org.opensearch.flint.core.metadata.FlintMetadataCache
-import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter}
-import org.opensearch.flint.spark.FlintSparkSuite
-import org.scalatest.Entry
-import org.scalatest.matchers.should.Matchers
-
-import org.apache.spark.sql.flint.config.FlintSparkConf
-
-class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Matchers {
-
-  /** Lazy initialize after container started. */
-  lazy val options = new FlintOptions(openSearchOptions.asJava)
-  lazy val flintClient = new FlintOpenSearchClient(options)
-  lazy val flintMetadataCacheWriter = new FlintOpenSearchMetadataCacheWriter(options)
-  lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options)
-
-  private val mockMetadataCacheData = FlintMetadataCache.mock
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    // TODO: unset if default is false
-    // conf.unsetConf(FlintSparkConf.METADATA_CACHE_WRITE.key)
-  }
-
-  test("write metadata cache to index mappings") {
-    val indexName = "flint_test_index"
-    val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
-    flintClient.createIndex(indexName, metadata)
-    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)
-
-    val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
-    properties should have size 4
-    properties should contain allOf (Entry(
-      "metadataCacheVersion",
-      mockMetadataCacheData.metadataCacheVersion),
-    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
-    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
-    properties
-      .get("sourceTables")
-      .asInstanceOf[List[String]]
-      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
-  }
-
-  test("write metadata cache to index mappings and preserve other index metadata") {
-    val indexName = "test_update"
-    val content =
-      """ {
-        |   "_meta": {
-        |     "kind": "test_kind"
-        |   },
-        |   "properties": {
-        |     "age": {
-        |       "type": "integer"
-        |     }
-        |   }
-        | }
-        |""".stripMargin
-
-    val metadata = FlintOpenSearchIndexMetadataService.deserialize(content)
-    flintClient.createIndex(indexName, metadata)
-
-    flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
-    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)
-
-    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
-    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe empty
-    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
-    var properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
-    properties should have size 4
-    properties should contain allOf (Entry(
-      "metadataCacheVersion",
-      mockMetadataCacheData.metadataCacheVersion),
-    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
-    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
-    properties
-      .get("sourceTables")
-      .asInstanceOf[List[String]]
-      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
-
-    val newContent =
-      """ {
-        |   "_meta": {
-        |     "kind": "test_kind",
-        |     "name": "test_name"
-        |   },
-        |   "properties": {
-        |     "age": {
-        |       "type": "integer"
-        |     }
-        |   }
-        | }
-        |""".stripMargin
-
-    val newMetadata = FlintOpenSearchIndexMetadataService.deserialize(newContent)
-    flintIndexMetadataService.updateIndexMetadata(indexName, newMetadata)
-    flintMetadataCacheWriter.updateMetadataCache(indexName, newMetadata)
-
-    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
-    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe "test_name"
-    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
-    properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
-    properties should have size 4
-    properties should contain allOf (Entry(
-      "metadataCacheVersion",
-      mockMetadataCacheData.metadataCacheVersion),
-    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
-    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
-    properties
-      .get("sourceTables")
-      .asInstanceOf[List[String]]
-      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
-  }
-}