diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
index d505b84d4..f5346ca67 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
@@ -5,18 +5,18 @@

 package org.opensearch.flint.spark.metadatacache

-import java.util
+import java.util.{HashMap, Map => JMap}

 import scala.collection.JavaConverters._

 import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.GetIndexRequest
 import org.opensearch.client.indices.PutMappingRequest
 import org.opensearch.common.xcontent.XContentType
-import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
+import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
-import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
 import org.opensearch.flint.core.metadata.FlintJsonHelper._
-import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
+import org.opensearch.flint.core.storage.OpenSearchClientUtils

 import org.apache.spark.internal.Logging

@@ -27,26 +27,22 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     extends FlintMetadataCacheWriter
     with Logging {

-  /**
-   * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata
-   * storage, this flag is to allow for preserving index metadata that is already stored in _meta
-   * when updating metadata cache.
-   */
-  private val includeSpec: Boolean =
-    FlintIndexMetadataServiceBuilder
-      .build(options)
-      .isInstanceOf[FlintOpenSearchIndexMetadataService]
-
   override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
     logInfo(s"Updating metadata cache for $indexName");
     val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
     var client: IRestHighLevelClient = null
     try {
       client = OpenSearchClientUtils.createClient(options)
-      val request = new PutMappingRequest(osIndexName)
-      val serialized = serialize(metadata)
-      request.source(serialized, XContentType.JSON)
-      client.updateIndexMapping(request, RequestOptions.DEFAULT)
+      // Read the existing mapping and merge the metadata cache properties into its
+      // _meta section, so index metadata already stored there is preserved.
+      val existingMapping = getIndexMapping(client, osIndexName)
+      val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava
+      val mergedMapping = mergeMapping(existingMapping, metadataCacheProperties)
+      val serialized = buildJson(builder => {
+        builder.field("_meta", mergedMapping.get("_meta"))
+        builder.field("properties", mergedMapping.get("properties"))
+      })
+      updateIndexMapping(client, osIndexName, serialized)
     } catch {
       case e: Exception =>
         throw new IllegalStateException(
@@ -58,50 +52,42 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     }
   }

-  /**
-   * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
-   * FlintOpenSearchIndexMetadataService}
-   */
-  private[metadatacache] def serialize(metadata: FlintMetadata): String = {
-    try {
-      buildJson(builder => {
-        objectField(builder, "_meta") {
-          // If _meta is used as index metadata storage, preserve them.
-          if (includeSpec) {
-            builder
-              .field("version", metadata.version.version)
-              .field("name", metadata.name)
-              .field("kind", metadata.kind)
-              .field("source", metadata.source)
-              .field("indexedColumns", metadata.indexedColumns)
-
-            if (metadata.latestId.isDefined) {
-              builder.field("latestId", metadata.latestId.get)
-            }
-            optionalObjectField(builder, "options", metadata.options)
-          }
-
-          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
-        }
-        builder.field("properties", metadata.schema)
-      })
-    } catch {
-      case e: Exception =>
-        throw new IllegalStateException("Failed to jsonify cache metadata", e)
-    }
+  /** Fetch the index's current mapping, including _meta and schema, as a nested map. */
+  private[metadatacache] def getIndexMapping(
+      client: IRestHighLevelClient,
+      osIndexName: String): JMap[String, AnyRef] = {
+    val request = new GetIndexRequest(osIndexName)
+    val response = client.getIndex(request, RequestOptions.DEFAULT)
+    response.getMappings.get(osIndexName).sourceAsMap()
   }

-  /**
-   * Since _meta.properties is shared by both index metadata and metadata cache, here we merge the
-   * two maps.
-   */
-  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
-    val metadataCacheProperties = FlintMetadataCache(metadata).toMap
+  /**
+   * Since _meta.properties is shared by both index metadata and metadata cache, merge the
+   * metadata cache properties into the existing _meta.properties instead of overwriting them.
+   */
+  private def mergeMapping(
+      existingMapping: JMap[String, AnyRef],
+      metadataCacheProperties: JMap[String, AnyRef]): JMap[String, AnyRef] = {
+    val meta =
+      existingMapping.getOrDefault("_meta", Map.empty.asJava).asInstanceOf[JMap[String, AnyRef]]
+    val properties =
+      meta.getOrDefault("properties", Map.empty.asJava).asInstanceOf[JMap[String, AnyRef]]
+    val updatedProperties = new HashMap[String, AnyRef](properties)
+    updatedProperties.putAll(metadataCacheProperties)
+    val updatedMeta = new HashMap[String, AnyRef](meta)
+    updatedMeta.put("properties", updatedProperties)
+    val result = new HashMap[String, AnyRef](existingMapping)
+    result.put("_meta", updatedMeta)
+    result
+  }

-    if (includeSpec) {
-      (metadataCacheProperties ++ metadata.properties.asScala).asJava
-    } else {
-      metadataCacheProperties.asJava
-    }
+  /** Apply the given mapping source to the index with a PutMapping request. */
+  private[metadatacache] def updateIndexMapping(
+      client: IRestHighLevelClient,
+      osIndexName: String,
+      mappingSource: String): Unit = {
+    val request = new PutMappingRequest(osIndexName)
+    request.source(mappingSource, XContentType.JSON)
+    client.updateIndexMapping(request, RequestOptions.DEFAULT)
   }
 }
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
index fce71af8e..1a3d2df7d 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
@@ -5,18 +5,17 @@

 package org.opensearch.flint.spark.metadatacache

-import java.util.Base64
+import java.util.{Base64, Map => JMap}

 import scala.collection.JavaConverters._

 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
+import org.json4s.{DefaultFormats, Extraction, JValue}
 import org.json4s.native.JsonMethods._
-import org.opensearch.flint.common.FlintVersion.current
-import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.core.FlintOptions
-import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
-import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
+import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions, FlintSparkSuite}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
@@ -69,63 +68,24 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
   }

   test("build opensearch metadata cache writer") {
-    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
     withMetadataCacheWriteEnabled {
       FlintMetadataCacheWriterBuilder
         .build(FlintSparkConf()) shouldBe a[FlintOpenSearchMetadataCacheWriter]
     }
   }

-  test("serialize metadata cache to JSON") {
-    val expectedMetadataJson: String = s"""
-      | {
-      |   "_meta": {
-      |     "version": "${current()}",
-      |     "name": "$testFlintIndex",
-      |     "kind": "test_kind",
-      |     "source": "$testTable",
-      |     "indexedColumns": [
-      |     {
-      |       "test_field": "spark_type"
-      |     }],
-      |     "options": {
-      |       "auto_refresh": "true",
-      |       "refresh_interval": "10 Minutes"
-      |     },
-      |     "properties": {
-      |       "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
-      |       "refreshInterval": 600,
-      |       "sourceTables": ["$testTable"],
-      |       "lastRefreshTime": $testLastRefreshCompleteTime
-      |     },
-      |     "latestId": "$testLatestId"
-      |   },
-      |   "properties": {
-      |     "test_field": {
-      |       "type": "os_type"
-      |     }
-      |   }
-      | }
-      |""".stripMargin
-    val builder = new FlintMetadata.Builder
-    builder.name(testFlintIndex)
-    builder.kind("test_kind")
-    builder.source(testTable)
-    builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava)
-    builder.options(
-      Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes")
-        .mapValues(_.asInstanceOf[AnyRef])
-        .asJava)
-    builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava)
-    builder.latestLogEntry(flintMetadataLogEntry)
-
-    val metadata = builder.build()
-    flintMetadataCacheWriter.serialize(metadata) should matchJson(expectedMetadataJson)
-  }
-
   test("write metadata cache to index mappings") {
+    val content =
+      s""" {
+         |   "properties": {
+         |     "age": {
+         |       "type": "integer"
+         |     }
+         |   }
+         | }
+         |""".stripMargin
     val metadata = FlintOpenSearchIndexMetadataService
-      .deserialize("{}")
+      .deserialize(content)
       .copy(latestLogEntry = Some(flintMetadataLogEntry))
     flintClient.createIndex(testFlintIndex, metadata)
     flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
@@ -280,8 +240,17 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
   }

   test("exclude last refresh time in metadata cache when index has not been refreshed") {
+    val content =
+      s""" {
+         |   "properties": {
+         |     "age": {
+         |       "type": "integer"
+         |     }
+         |   }
+         | }
+         |""".stripMargin
     val metadata = FlintOpenSearchIndexMetadataService
-      .deserialize("{}")
+      .deserialize(content)
       .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L)))
     flintClient.createIndex(testFlintIndex, metadata)
     flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
@@ -294,7 +263,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
     val content =
       """ {
         |   "_meta": {
-        |     "kind": "test_kind"
+        |     "kind": "test_kind",
+        |     "name": "test_name",
+        |     "custom": "test_custom",
+        |     "properties": {
+        |       "custom_in_properties": "test_custom"
"custom_in_properties": "test_custom" + | } | }, | "properties": { | "age": { @@ -309,48 +283,31 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .copy(latestLogEntry = Some(flintMetadataLogEntry)) flintClient.createIndex(testFlintIndex, metadata) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, metadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + // Simulates index mapping updated by custom implementation of FlintIndexMetadataService + // because our standard FlintOpenSearchIndexMetadataService ignores the "custom" field. + val client = OpenSearchClientUtils.createClient(options) + flintMetadataCacheWriter.updateIndexMapping(client, testFlintIndex, content) - flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" - flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe empty - flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - var properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 - properties should contain allOf (Entry( - "metadataCacheVersion", - FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) - - val newContent = - """ { - | "_meta": { - | "kind": "test_kind", - | "name": "test_name" - | }, - | "properties": { - | "age": { - | "type": "integer" - | } - | } - | } - |""".stripMargin - - val newMetadata = FlintOpenSearchIndexMetadataService - .deserialize(newContent) - .copy(latestLogEntry = Some(flintMetadataLogEntry)) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, newMetadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, newMetadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe "test_name" flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties + properties should have size 4 properties should contain allOf (Entry( "metadataCacheVersion", FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) + Entry("lastRefreshTime", testLastRefreshCompleteTime), Entry( + "custom_in_properties", + "test_custom")) + + // Directly get the index mapping and verify custom field is preserved + flintMetadataCacheWriter + .getIndexMapping(client, testFlintIndex) + .get("_meta") + .asInstanceOf[JMap[String, AnyRef]] + .get("custom") shouldBe "test_custom" } Seq( @@ -389,6 +346,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat test(s"write metadata cache for $refreshMode") { withExternalSchedulerEnabled { withMetadataCacheWriteEnabled { + val flint: FlintSpark = new FlintSpark(spark) withTempDir { checkpointDir => // update checkpoint_location if available in optionsMap val indexOptions = FlintSparkIndexOptions( @@ -405,25 +363,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .options(indexOptions, testFlintIndex) .create() - var index = flint.describeIndex(testFlintIndex) - index shouldBe defined - val propertiesJson = - compact( - render( - parse( - 
-                    flintMetadataCacheWriter.serialize(
-                      index.get.metadata())) \ "_meta" \ "properties"))
+            val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
             propertiesJson should matchJson(expectedJson)

             flint.refreshIndex(testFlintIndex)
-            index = flint.describeIndex(testFlintIndex)
-            index shouldBe defined
             val lastRefreshTime =
-              compact(
-                render(
-                  parse(
-                    flintMetadataCacheWriter.serialize(
-                      index.get.metadata())) \ "_meta" \ "properties" \ "lastRefreshTime")).toLong
+              compact(render(getPropertiesJValue(testFlintIndex) \ "lastRefreshTime")).toLong
             lastRefreshTime should be > 0L
           }
         }
@@ -433,6 +378,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat

   test("write metadata cache for auto refresh index with internal scheduler") {
     withMetadataCacheWriteEnabled {
+      val flint: FlintSpark = new FlintSpark(spark)
       withTempDir { checkpointDir =>
         flint
           .skippingIndex()
@@ -448,27 +394,25 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
             testFlintIndex)
           .create()

-        var index = flint.describeIndex(testFlintIndex)
-        index shouldBe defined
-        val propertiesJson =
-          compact(
-            render(parse(
-              flintMetadataCacheWriter.serialize(index.get.metadata())) \ "_meta" \ "properties"))
+        val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex)))
         propertiesJson should matchJson(s"""
-        | {
-        |   "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
-        |   "refreshInterval": 600,
-        |   "sourceTables": ["$testTable"]
-        | }
-        |""".stripMargin)
+          | {
+          |   "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}",
+          |   "refreshInterval": 600,
+          |   "sourceTables": ["$testTable"]
+          | }
+          |""".stripMargin)

         flint.refreshIndex(testFlintIndex)
-        index = flint.describeIndex(testFlintIndex)
-        index shouldBe defined
-        compact(render(parse(
-          flintMetadataCacheWriter.serialize(
-            index.get.metadata())) \ "_meta" \ "properties")) should not include "lastRefreshTime"
+        compact(render(getPropertiesJValue(testFlintIndex))) should not include "lastRefreshTime"
       }
     }
   }
+
+  private def getPropertiesJValue(indexName: String): JValue = {
+    // Convert to a Scala map because json4s converts java.util.Map into an empty JObject
+    // https://github.com/json4s/json4s/issues/392
+    val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties.asScala
+    Extraction.decompose(properties)(DefaultFormats)
+  }
 }
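For review purposes, here is a minimal, self-contained sketch of the merge semantics this patch introduces. It copies the `mergeMapping` logic onto plain Java maps instead of going through the OpenSearch client; the object name and all sample `_meta` values and cache properties are made up for illustration, not taken from the production code path.

```scala
import java.util.{HashMap, Map => JMap}

import scala.collection.JavaConverters._

object MergeMappingSketch {

  // Same shape as the patch's mergeMapping: deep-merge cache properties into
  // _meta.properties while leaving other _meta fields and the schema untouched.
  def mergeMapping(
      existingMapping: JMap[String, AnyRef],
      metadataCacheProperties: JMap[String, AnyRef]): JMap[String, AnyRef] = {
    val meta =
      existingMapping.getOrDefault("_meta", Map.empty.asJava).asInstanceOf[JMap[String, AnyRef]]
    val properties =
      meta.getOrDefault("properties", Map.empty.asJava).asInstanceOf[JMap[String, AnyRef]]
    val updatedProperties = new HashMap[String, AnyRef](properties)
    updatedProperties.putAll(metadataCacheProperties)
    val updatedMeta = new HashMap[String, AnyRef](meta)
    updatedMeta.put("properties", updatedProperties)
    val result = new HashMap[String, AnyRef](existingMapping)
    result.put("_meta", updatedMeta)
    result
  }

  def main(args: Array[String]): Unit = {
    // A mapping as GetIndex might return it: custom _meta entries plus the schema.
    // All values here are hypothetical test data.
    val existing: JMap[String, AnyRef] = Map[String, AnyRef](
      "_meta" -> Map[String, AnyRef](
        "kind" -> "test_kind",
        "custom" -> "test_custom",
        "properties" -> Map[String, AnyRef]("custom_in_properties" -> "test_custom").asJava).asJava,
      "properties" -> Map[String, AnyRef](
        "age" -> Map[String, AnyRef]("type" -> "integer").asJava).asJava).asJava

    val cacheProperties: JMap[String, AnyRef] = Map[String, AnyRef](
      "metadataCacheVersion" -> "1.0",
      "lastRefreshTime" -> Long.box(1704067200000L)).asJava

    val merged = mergeMapping(existing, cacheProperties)
    // _meta.custom and _meta.properties.custom_in_properties survive the update;
    // the cache keys sit next to them, and the top-level schema is unchanged.
    println(merged.get("_meta"))
    println(merged.get("properties"))
  }
}
```

This is why the updated integration test can assert both `custom_in_properties` and the cache entries in `_meta.properties` after `updateMetadataCache` runs: the writer no longer rebuilds `_meta` from `FlintMetadata` but edits the existing mapping in place.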