Skip to content

Commit

Permalink
Add sourceQuery in metadata cache (opensearch-project#988)
Browse files Browse the repository at this point in the history
* add sourceQuery in metadata cache

Signed-off-by: Sean Kao <[email protected]>

* preserve index mapping content when updating cache

Signed-off-by: Sean Kao <[email protected]>

* syntax and comment

Signed-off-by: Sean Kao <[email protected]>

* merge index mapping in place

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Dec 17, 2024
1 parent 72e1d08 commit f8f04ab
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ case class FlintMetadataCache(
refreshInterval: Option[Int],
/** Source table names for building the Flint index. */
sourceTables: Array[String],
/** Source query for MV */
sourceQuery: Option[String],
/** Timestamp when Flint index is last refreshed. Unit: milliseconds */
lastRefreshTime: Option[Long]) {

Expand Down Expand Up @@ -64,13 +66,22 @@ object FlintMetadataCache {
case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
case _ => Array(metadata.source)
}
val sourceQuery = metadata.kind match {
case MV_INDEX_TYPE => Some(metadata.source)
case _ => None
}
val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
entry.lastRefreshCompleteTime match {
case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None
case timestamp => Some(timestamp)
}
}

FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime)
FlintMetadataCache(
metadataCacheVersion,
refreshInterval,
sourceTables,
sourceQuery,
lastRefreshTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package org.opensearch.flint.spark.metadatacache

import java.util
import java.util.{HashMap, Map => JMap}

import scala.collection.JavaConverters._

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
import org.opensearch.flint.core.storage.OpenSearchClientUtils

import org.apache.spark.internal.Logging

Expand All @@ -27,27 +27,20 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
extends FlintMetadataCacheWriter
with Logging {

/**
* Since metadata cache shares the index mappings _meta field with OpenSearch index metadata
* storage, this flag is to allow for preserving index metadata that is already stored in _meta
* when updating metadata cache.
*/
private val includeSpec: Boolean =
FlintIndexMetadataServiceBuilder
.build(options)
.isInstanceOf[FlintOpenSearchIndexMetadataService]

override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
logInfo(s"Updating metadata cache for $indexName with $metadata");
logInfo(s"Updating metadata cache for $indexName");
val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
var client: IRestHighLevelClient = null
try {
client = OpenSearchClientUtils.createClient(options)
val request = new PutMappingRequest(osIndexName)
val serialized = serialize(metadata)
logInfo(s"Serialized: $serialized")
request.source(serialized, XContentType.JSON)
client.updateIndexMapping(request, RequestOptions.DEFAULT)
val indexMapping = getIndexMapping(client, osIndexName)
val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava
mergeMetadataCacheProperties(indexMapping, metadataCacheProperties)
val serialized = buildJson(builder => {
builder.field("_meta", indexMapping.get("_meta"))
builder.field("properties", indexMapping.get("properties"))
})
updateIndexMapping(client, osIndexName, serialized)
} catch {
case e: Exception =>
throw new IllegalStateException(
Expand All @@ -59,50 +52,35 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
}
}

/**
* Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
* FlintOpenSearchIndexMetadataService}
*/
private[metadatacache] def serialize(metadata: FlintMetadata): String = {
try {
buildJson(builder => {
objectField(builder, "_meta") {
// If _meta is used as index metadata storage, preserve them.
if (includeSpec) {
builder
.field("version", metadata.version.version)
.field("name", metadata.name)
.field("kind", metadata.kind)
.field("source", metadata.source)
.field("indexedColumns", metadata.indexedColumns)

if (metadata.latestId.isDefined) {
builder.field("latestId", metadata.latestId.get)
}
optionalObjectField(builder, "options", metadata.options)
}

optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
}
builder.field("properties", metadata.schema)
})
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to jsonify cache metadata", e)
}
private[metadatacache] def getIndexMapping(
client: IRestHighLevelClient,
osIndexName: String): JMap[String, AnyRef] = {
val request = new GetIndexRequest(osIndexName)
val response = client.getIndex(request, RequestOptions.DEFAULT)
response.getMappings.get(osIndexName).sourceAsMap()
}

/**
* Since _meta.properties is shared by both index metadata and metadata cache, here we merge the
* two maps.
* Merge metadata cache properties into index mapping in place. Metadata cache is written into
* _meta.properties field of index mapping.
*/
private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
val metadataCacheProperties = FlintMetadataCache(metadata).toMap
private def mergeMetadataCacheProperties(
indexMapping: JMap[String, AnyRef],
metadataCacheProperties: JMap[String, AnyRef]): Unit = {
indexMapping
.computeIfAbsent("_meta", _ => new HashMap[String, AnyRef]())
.asInstanceOf[JMap[String, AnyRef]]
.computeIfAbsent("properties", _ => new HashMap[String, AnyRef]())
.asInstanceOf[JMap[String, AnyRef]]
.putAll(metadataCacheProperties)
}

if (includeSpec) {
(metadataCacheProperties ++ metadata.properties.asScala).asJava
} else {
metadataCacheProperties.asJava
}
private[metadatacache] def updateIndexMapping(
client: IRestHighLevelClient,
osIndexName: String,
mappingSource: String): Unit = {
val request = new PutMappingRequest(osIndexName)
request.source(mappingSource, XContentType.JSON)
client.updateIndexMapping(request, RequestOptions.DEFAULT)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
}

it should "construct from materialized view FlintMetadata" in {
val testQuery =
"SELECT 1 FROM spark_catalog.default.test_table UNION SELECT 1 FROM spark_catalog.default.another_table"
val content =
s""" {
| "_meta": {
| "kind": "$MV_INDEX_TYPE",
| "source": "spark_catalog.default.wrong_table",
| "source": "$testQuery",
| "options": {
| "auto_refresh": "true",
| "refresh_interval": "10 Minutes"
Expand Down Expand Up @@ -116,6 +118,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
metadataCache.sourceTables shouldBe Array(
"spark_catalog.default.test_table",
"spark_catalog.default.another_table")
metadataCache.sourceQuery.get shouldBe testQuery
metadataCache.lastRefreshTime.get shouldBe 1234567890123L
}

Expand Down Expand Up @@ -145,6 +148,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers {
metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion
metadataCache.refreshInterval shouldBe empty
metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table")
metadataCache.sourceQuery shouldBe empty
metadataCache.lastRefreshTime shouldBe empty
}
}
Loading

0 comments on commit f8f04ab

Please sign in to comment.