Skip to content

Commit

Permalink
preserve index mapping content when updating cache
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Dec 12, 2024
1 parent 81dab56 commit 2825d87
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.flint.spark.metadatacache

import java.util
import java.util.{HashMap, Map => JMap}

import scala.collection.JavaConverters._

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.core.storage.OpenSearchClientUtils

import org.apache.spark.internal.Logging

Expand All @@ -27,26 +27,20 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
extends FlintMetadataCacheWriter
with Logging {

/**
 * Whether index metadata fields should be written alongside the metadata cache.
 *
 * The metadata cache lives in the same index mapping `_meta` field that the OpenSearch index
 * metadata storage uses. The flag is true exactly when the metadata service built from
 * `options` is a [[FlintOpenSearchIndexMetadataService]], so that metadata already kept in
 * `_meta` can be preserved when the cache is updated.
 */
private val includeSpec: Boolean =
  FlintIndexMetadataServiceBuilder.build(options) match {
    case _: FlintOpenSearchIndexMetadataService => true
    case _ => false
  }

override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
logInfo(s"Updating metadata cache for $indexName");
val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
var client: IRestHighLevelClient = null
try {
client = OpenSearchClientUtils.createClient(options)
val request = new PutMappingRequest(osIndexName)
val serialized = serialize(metadata)
request.source(serialized, XContentType.JSON)
client.updateIndexMapping(request, RequestOptions.DEFAULT)
val existingMapping = getIndexMapping(client, osIndexName)
val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava
val mergedMapping = mergeMapping(existingMapping, metadataCacheProperties)
val serialized = buildJson(builder => {
builder.field("_meta", mergedMapping.get("_meta"))
builder.field("properties", mergedMapping.get("properties"))
})
updateIndexMapping(client, osIndexName, serialized)
} catch {
case e: Exception =>
throw new IllegalStateException(
Expand All @@ -58,50 +52,36 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
}
}

/**
* Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
* FlintOpenSearchIndexMetadataService}
*/
private[metadatacache] def serialize(metadata: FlintMetadata): String = {
try {
buildJson(builder => {
objectField(builder, "_meta") {
// If _meta is used as index metadata storage, preserve them.
if (includeSpec) {
builder
.field("version", metadata.version.version)
.field("name", metadata.name)
.field("kind", metadata.kind)
.field("source", metadata.source)
.field("indexedColumns", metadata.indexedColumns)

if (metadata.latestId.isDefined) {
builder.field("latestId", metadata.latestId.get)
}
optionalObjectField(builder, "options", metadata.options)
}

optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
}
builder.field("properties", metadata.schema)
})
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to jsonify cache metadata", e)
}
/**
 * Fetch the current mapping of an index as a map.
 *
 * @param client
 *   client used to issue the get-index request
 * @param osIndexName
 *   index name the request is issued for and the response mappings are looked up under
 * @return
 *   the mapping source of the index as a map
 * @throws IllegalStateException
 *   if the response contains no mapping entry under `osIndexName`
 */
private[metadatacache] def getIndexMapping(
    client: IRestHighLevelClient,
    osIndexName: String): JMap[String, AnyRef] = {
  val response = client.getIndex(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT)
  // java.util.Map#get returns null for an absent key; fail with a descriptive error instead of
  // letting the subsequent sourceAsMap() call blow up with a bare NullPointerException.
  Option(response.getMappings.get(osIndexName))
    .map(_.sourceAsMap())
    .getOrElse(
      throw new IllegalStateException(s"No mapping found in response for index $osIndexName"))
}

/**
* Since _meta.properties is shared by both index metadata and metadata cache, here we merge the
* two maps.
*/
private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
val metadataCacheProperties = FlintMetadataCache(metadata).toMap
/**
 * Merge metadata cache properties into the `_meta.properties` section of an existing index
 * mapping.
 *
 * All other entries of the existing mapping and of its `_meta` object are carried over
 * unchanged; on key collisions inside `_meta.properties` the metadata cache value wins. Neither
 * input map is mutated: the result and the nested `_meta` / `properties` maps are fresh copies.
 *
 * @param existingMapping
 *   current index mapping, possibly without a `_meta` entry
 * @param metadataCacheProperties
 *   cache entries to write into `_meta.properties`
 * @return
 *   a new mapping whose `_meta.properties` contains the merged entries
 */
private def mergeMapping(
    existingMapping: JMap[String, AnyRef],
    metadataCacheProperties: JMap[String, AnyRef]): JMap[String, AnyRef] = {
  // Option(...) treats both an absent key and a key explicitly mapped to null as "no nested
  // map". getOrDefault would return the null in the latter case and the HashMap copy
  // constructor below would then throw a NullPointerException.
  def nestedMap(parent: JMap[String, AnyRef], key: String): JMap[String, AnyRef] =
    Option(parent.get(key))
      .map(_.asInstanceOf[JMap[String, AnyRef]])
      .getOrElse(new HashMap[String, AnyRef]())

  val meta = nestedMap(existingMapping, "_meta")
  val properties = nestedMap(meta, "properties")

  val updatedProperties = new HashMap[String, AnyRef](properties)
  updatedProperties.putAll(metadataCacheProperties)

  val updatedMeta = new HashMap[String, AnyRef](meta)
  updatedMeta.put("properties", updatedProperties)

  val result = new HashMap[String, AnyRef](existingMapping)
  result.put("_meta", updatedMeta)
  result
}

if (includeSpec) {
(metadataCacheProperties ++ metadata.properties.asScala).asJava
} else {
metadataCacheProperties.asJava
}
/**
 * Send a put-mapping request for the given index.
 *
 * @param client
 *   client used to issue the request
 * @param osIndexName
 *   index name the put-mapping request is created for
 * @param mappingSource
 *   mapping body, passed to the request as JSON
 */
private[metadatacache] def updateIndexMapping(
    client: IRestHighLevelClient,
    osIndexName: String,
    mappingSource: String): Unit = {
  val putMappingRequest = new PutMappingRequest(osIndexName)
  putMappingRequest.source(mappingSource, XContentType.JSON)
  client.updateIndexMapping(putMappingRequest, RequestOptions.DEFAULT)
}
}
Loading

0 comments on commit 2825d87

Please sign in to comment.