
Commit

[0.5-nexus] Write mock metadata cache data to mappings _meta (#744)
* write mock metadata cache data to mappings _meta

Signed-off-by: Sean Kao <[email protected]>

* Enable write to cache by default

Signed-off-by: Sean Kao <[email protected]>

* bugfix: _meta.latestId missing when create index

Signed-off-by: Sean Kao <[email protected]>

* set and unset config in test suite

Signed-off-by: Sean Kao <[email protected]>

* fix: use member flintSparkConf

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
seankao-az authored Oct 8, 2024
1 parent ab13935 commit b3adf46
Showing 5 changed files with 324 additions and 9 deletions.
@@ -0,0 +1,52 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core.metadata

/**
 * Flint metadata cache defines the metadata stored in the read cache for the frontend user to
 * access.
 */
case class FlintMetadataCache(
    metadataCacheVersion: String,
    /** Refresh interval for Flint index with auto refresh. Unit: seconds */
    refreshInterval: Option[Int],
    /** Source table names for building the Flint index. */
    sourceTables: Array[String],
    /** Timestamp when Flint index is last refreshed. Unit: milliseconds */
    lastRefreshTime: Option[Long]) {

  /**
   * Convert FlintMetadataCache to a map. Skips a field if its value is not defined.
   */
  def toMap: Map[String, AnyRef] = {
    val fieldNames = getClass.getDeclaredFields.map(_.getName)
    val fieldValues = productIterator.toList

    fieldNames
      .zip(fieldValues)
      .flatMap {
        case (_, None) => List.empty
        case (name, Some(value)) => List((name, value))
        case (name, value) => List((name, value))
      }
      .toMap
      .mapValues(_.asInstanceOf[AnyRef])
  }
}

object FlintMetadataCache {
  // TODO: construct FlintMetadataCache from FlintMetadata

  def mock: FlintMetadataCache = {
    // Fixed dummy data
    FlintMetadataCache(
      "1.0",
      Some(900),
      Array(
        "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])"),
      Some(1727395328283L))
  }
}
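
Since toMap unwraps defined Option fields and drops undefined ones, the mock data above flattens to four plain key-value pairs. A minimal usage sketch, with expected values taken from the mock:

// Inspect the map produced from the mock cache data; None-valued fields would be omitted.
val cacheMap: Map[String, AnyRef] = FlintMetadataCache.mock.toMap
// Expected: "metadataCacheVersion" -> "1.0", "refreshInterval" -> 900,
//           "sourceTables" -> Array(<log group identifiers>), "lastRefreshTime" -> 1727395328283
assert(
  cacheMap.keySet == Set("metadataCacheVersion", "refreshInterval", "sourceTables", "lastRefreshTime"))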
@@ -0,0 +1,115 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core.storage

import java.util

import scala.collection.JavaConverters._

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
import org.opensearch.flint.core.metadata.{FlintIndexMetadataServiceBuilder, FlintMetadataCache}
import org.opensearch.flint.core.metadata.FlintJsonHelper._

import org.apache.spark.internal.Logging

/**
 * Writes {@link FlintMetadataCache} to index mappings `_meta` for the frontend user to access.
 * This is different from {@link FlintIndexMetadataService}, which persists the full index
 * metadata to storage as the single source of truth.
 */
class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends Logging {

  /**
   * Since the metadata cache shares the index mappings _meta field with OpenSearch index
   * metadata storage, this flag allows preserving index metadata that is already stored in _meta
   * when updating the metadata cache.
   */
  private val includeSpec: Boolean =
    FlintIndexMetadataServiceBuilder
      .build(options)
      .isInstanceOf[FlintOpenSearchIndexMetadataService]

  /**
   * Update metadata cache for a Flint index.
   *
   * @param indexName
   *   index name
   * @param metadata
   *   index metadata to update the cache
   */
  def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
    logInfo(s"Updating metadata cache for $indexName")
    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
    var client: IRestHighLevelClient = null
    try {
      client = OpenSearchClientUtils.createClient(options)
      val request = new PutMappingRequest(osIndexName)
      // TODO: make sure to preserve existing lastRefreshTime
      // Note that currently lastUpdateTime isn't used to construct FlintMetadataLogEntry
      request.source(serialize(metadata), XContentType.JSON)
      client.updateIndexMapping(request, RequestOptions.DEFAULT)
    } catch {
      case e: Exception =>
        throw new IllegalStateException(
          s"Failed to update metadata cache for Flint index $osIndexName",
          e)
    } finally {
      if (client != null) {
        client.close()
      }
    }
  }

  /**
   * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link
   * FlintOpenSearchIndexMetadataService}.
   */
  private def serialize(metadata: FlintMetadata): String = {
    try {
      buildJson(builder => {
        objectField(builder, "_meta") {
          // If _meta is used as index metadata storage, preserve it.
          if (includeSpec) {
            builder
              .field("version", metadata.version.version)
              .field("name", metadata.name)
              .field("kind", metadata.kind)
              .field("source", metadata.source)
              .field("indexedColumns", metadata.indexedColumns)

            if (metadata.latestId.isDefined) {
              builder.field("latestId", metadata.latestId.get)
            }
            optionalObjectField(builder, "options", metadata.options)
          }

          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
        }
        builder.field("properties", metadata.schema)
      })
    } catch {
      case e: Exception =>
        throw new IllegalStateException("Failed to jsonify cache metadata", e)
    }
  }

  /**
   * Since _meta.properties is shared by both index metadata and metadata cache, here we merge
   * the two maps.
   */
  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
    val metadataCacheProperties = FlintMetadataCache.mock.toMap

    if (includeSpec) {
      (metadataCacheProperties ++ metadata.properties.asScala).asJava
    } else {
      metadataCacheProperties.asJava
    }
  }
}
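
For orientation, here is a hedged wiring sketch of the writer against a local cluster. The "host" and "port" option keys and values are assumptions, not confirmed by this commit; FlintOpenSearchIndexMetadataService.deserialize is used the same way as in the test suite below.

import java.util.{Map => JMap}

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter}

// Hypothetical setup: options pointing at a local OpenSearch and empty index metadata.
val options = new FlintOptions(JMap.of("host", "localhost", "port", "9200"))
val cacheWriter = new FlintOpenSearchMetadataCacheWriter(options)
val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
cacheWriter.updateMetadataCache("flint_test_index", metadata)
// With includeSpec = false, the PUT mapping body reduces to roughly:
//   { "_meta": { "properties": { "metadataCacheVersion": "1.0", "refreshInterval": 900,
//                                "sourceTables": [...], "lastRefreshTime": 1727395328283 } },
//     "properties": { <index schema> } }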
@@ -253,6 +253,10 @@ object FlintSparkConf {
     FlintConfig("spark.metadata.accessAWSCredentialsProvider")
       .doc("AWS credentials provider for metadata access permission")
       .createOptional()
+  val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
+    .doc("Enable Flint metadata cache write to Flint index mappings")
+    .createWithDefault("true")
+
   val CUSTOM_SESSION_MANAGER =
     FlintConfig("spark.flint.job.customSessionManager")
       .createOptional()
@@ -307,6 +311,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

   def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

+  def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean
+
   /**
    * spark.sql.session.timeZone
    */
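
Because the new flag defaults to true, metadata cache write is on unless explicitly disabled. A sketch of toggling it from a Spark session (assumes an active SparkSession named spark; the .key accessor is the same one referenced in the test suite below):

// Disable metadata cache write for subsequent index operations, then restore the default.
// METADATA_CACHE_WRITE.key resolves to "spark.flint.metadataCacheWrite.enabled".
spark.conf.set(FlintSparkConf.METADATA_CACHE_WRITE.key, "false")
// ... create or update Flint indexes without writing cache data to mappings _meta ...
spark.conf.set(FlintSparkConf.METADATA_CACHE_WRITE.key, "true")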
@@ -17,6 +17,7 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
+import org.opensearch.flint.core.storage.FlintOpenSearchMetadataCacheWriter
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -54,6 +55,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
     FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
   }

+  private val flintMetadataCacheWriteService = new FlintOpenSearchMetadataCacheWriter(
+    flintSparkConf.flintOptions())
+
   private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
     AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
   }
@@ -115,7 +119,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
         throw new IllegalStateException(s"Flint index $indexName already exists")
       }
     } else {
-      val metadata = index.metadata()
       val jobSchedulingService = FlintSparkJobSchedulingService.create(
         index,
         spark,
@@ -127,14 +130,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
         .transientLog(latest => latest.copy(state = CREATING))
         .finalLog(latest => latest.copy(state = ACTIVE))
         .commit(latest => {
-          if (latest == null) { // in case transaction capability is disabled
-            flintClient.createIndex(indexName, metadata)
-            flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
-          } else {
-            logInfo(s"Creating index with metadata log entry ID ${latest.id}")
-            flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
-            flintIndexMetadataService
-              .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id)))
+          val metadata = latest match {
+            case null => // in case transaction capability is disabled
+              index.metadata()
+            case latestEntry =>
+              logInfo(s"Creating index with metadata log entry ID ${latestEntry.id}")
+              index.metadata().copy(latestId = Some(latestEntry.id))
           }
+          flintClient.createIndex(indexName, metadata)
+          flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
+          if (isMetadataCacheWriteEnabled) {
+            flintMetadataCacheWriteService.updateMetadataCache(indexName, metadata)
+          }
           jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
         })
@@ -384,6 +390,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
       new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
   }

+  private def isMetadataCacheWriteEnabled: Boolean = {
+    flintSparkConf.isMetadataCacheWriteEnabled
+  }
+
   private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
     if (flintIndexMetadataService.supportsGetByIndexPattern) {
       flintIndexMetadataService
@@ -511,6 +521,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
       })
       .commit(_ => {
         flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
+        if (isMetadataCacheWriteEnabled) {
+          flintMetadataCacheWriteService.updateMetadataCache(indexName, index.metadata)
+        }
         logInfo("Update index options complete")
         jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
       })
@@ -0,0 +1,129 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core

import java.util.List

import scala.collection.JavaConverters._

import org.opensearch.flint.core.metadata.FlintMetadataCache
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter}
import org.opensearch.flint.spark.FlintSparkSuite
import org.scalatest.Entry
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Matchers {

  /** Lazy initialize after container started. */
  lazy val options = new FlintOptions(openSearchOptions.asJava)
  lazy val flintClient = new FlintOpenSearchClient(options)
  lazy val flintMetadataCacheWriter = new FlintOpenSearchMetadataCacheWriter(options)
  lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options)

  private val mockMetadataCacheData = FlintMetadataCache.mock

  override def beforeAll(): Unit = {
    super.beforeAll()
    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
  }

  override def afterAll(): Unit = {
    super.afterAll()
    // TODO: unset if default is false
    // conf.unsetConf(FlintSparkConf.METADATA_CACHE_WRITE.key)
  }

  test("write metadata cache to index mappings") {
    val indexName = "flint_test_index"
    val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
    flintClient.createIndex(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
  }

  test("write metadata cache to index mappings and preserve other index metadata") {
    val indexName = "test_update"
    val content =
      """ {
        |   "_meta": {
        |     "kind": "test_kind"
        |   },
        |   "properties": {
        |     "age": {
        |       "type": "integer"
        |     }
        |   }
        | }
        |""".stripMargin

    val metadata = FlintOpenSearchIndexMetadataService.deserialize(content)
    flintClient.createIndex(indexName, metadata)

    flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe empty
    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
    var properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables

    val newContent =
      """ {
        |   "_meta": {
        |     "kind": "test_kind",
        |     "name": "test_name"
        |   },
        |   "properties": {
        |     "age": {
        |       "type": "integer"
        |     }
        |   }
        | }
        |""".stripMargin

    val newMetadata = FlintOpenSearchIndexMetadataService.deserialize(newContent)
    flintIndexMetadataService.updateIndexMetadata(indexName, newMetadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, newMetadata)

    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe "test_name"
    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
    properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
  }
}
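
Finally, the cache exists so a frontend reader can pull it straight from the index mappings without going through Flint. A hedged read-path sketch using the OpenSearch Java high-level REST client (client construction is elided; the GetMappingsRequest API is assumed from the standard client and is not part of this commit):

import org.opensearch.client.{RequestOptions, RestHighLevelClient}
import org.opensearch.client.indices.GetMappingsRequest

// Hypothetical read path: fetch _meta back out of the index mappings.
// _meta.properties holds metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime.
def readMetadataCache(client: RestHighLevelClient, indexName: String): AnyRef = {
  val request = new GetMappingsRequest().indices(indexName)
  val response = client.indices().getMapping(request, RequestOptions.DEFAULT)
  response.mappings().get(indexName).sourceAsMap().get("_meta")
}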
