[0.5-nexus] Write mock metadata cache data to mappings _meta #744

Merged
@@ -0,0 +1,52 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core.metadata

/**
 * Flint metadata cache defines the metadata to store in the read cache for frontend users to
 * access.
 */
case class FlintMetadataCache(
    metadataCacheVersion: String,
    /** Refresh interval for a Flint index with auto refresh. Unit: seconds */
    refreshInterval: Option[Int],
    /** Source table names used to build the Flint index. */
    sourceTables: Array[String],
    /** Timestamp when the Flint index was last refreshed. Unit: milliseconds */
    lastRefreshTime: Option[Long]) {

  /**
   * Convert FlintMetadataCache to a map. Skips a field if its value is not defined.
   */
  def toMap: Map[String, AnyRef] = {
    val fieldNames = getClass.getDeclaredFields.map(_.getName)
    val fieldValues = productIterator.toList

    fieldNames
      .zip(fieldValues)
      .flatMap {
        case (_, None) => List.empty
        case (name, Some(value)) => List((name, value))
        case (name, value) => List((name, value))
      }
      .toMap
      .mapValues(_.asInstanceOf[AnyRef])
  }
}

object FlintMetadataCache {
  // TODO: construct FlintMetadataCache from FlintMetadata

  def mock: FlintMetadataCache = {
    // Fixed dummy data
    FlintMetadataCache(
      "1.0",
      Some(900),
      Array(
        "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])"),
      Some(1727395328283L))
  }
}
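A minimal sketch of the Option-skipping behavior of toMap; the cache values below are illustrative placeholders, not data from this PR:

// Sketch: Option fields left as None are omitted; Some values are unwrapped.
val cache = FlintMetadataCache("1.0", None, Array("db.table"), Some(1727395328283L))
val cacheMap = cache.toMap
assert(!cacheMap.contains("refreshInterval")) // None => field skipped
assert(cacheMap("metadataCacheVersion") == "1.0") // plain field kept as-is
assert(cacheMap("lastRefreshTime") == 1727395328283L) // Some(value) => unwrapped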
@@ -0,0 +1,115 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core.storage

import java.util

import scala.collection.JavaConverters._

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.PutMappingRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient}
import org.opensearch.flint.core.metadata.{FlintIndexMetadataServiceBuilder, FlintMetadataCache}
import org.opensearch.flint.core.metadata.FlintJsonHelper._

import org.apache.spark.internal.Logging

/**
 * Writes {@link FlintMetadataCache} to index mappings `_meta` for frontend users to access. This
 * differs from {@link FlintIndexMetadataService}, which persists the full index metadata to a
 * storage backend as the single source of truth.
 */
class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends Logging {

  /**
   * Since the metadata cache shares the index mappings _meta field with OpenSearch index
   * metadata storage, this flag allows preserving index metadata that is already stored in
   * _meta when updating the metadata cache.
   */
  private val includeSpec: Boolean =
    FlintIndexMetadataServiceBuilder
      .build(options)
      .isInstanceOf[FlintOpenSearchIndexMetadataService]

  /**
   * Update metadata cache for a Flint index.
   *
   * @param indexName
   *   index name
   * @param metadata
   *   index metadata to update the cache
   */
  def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
    logInfo(s"Updating metadata cache for $indexName")
    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
    var client: IRestHighLevelClient = null
    try {
      client = OpenSearchClientUtils.createClient(options)
      val request = new PutMappingRequest(osIndexName)
      // TODO: make sure to preserve existing lastRefreshTime
      // Note that currently lastUpdateTime isn't used to construct FlintMetadataLogEntry
      request.source(serialize(metadata), XContentType.JSON)
      client.updateIndexMapping(request, RequestOptions.DEFAULT)
    } catch {
      case e: Exception =>
        throw new IllegalStateException(
          s"Failed to update metadata cache for Flint index $osIndexName",
          e)
    } finally {
      if (client != null) {
        client.close()
      }
    }
  }

  /**
   * Serialize the metadata cache to JSON from FlintMetadata. Adapted from {@link
   * FlintOpenSearchIndexMetadataService}.
   */
  private def serialize(metadata: FlintMetadata): String = {
    try {
      buildJson(builder => {
        objectField(builder, "_meta") {
          // If _meta is used as index metadata storage, preserve the existing fields.
          if (includeSpec) {
            builder
              .field("version", metadata.version.version)
              .field("name", metadata.name)
              .field("kind", metadata.kind)
              .field("source", metadata.source)
              .field("indexedColumns", metadata.indexedColumns)

            if (metadata.latestId.isDefined) {
              builder.field("latestId", metadata.latestId.get)
            }
            optionalObjectField(builder, "options", metadata.options)
          }

          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
        }
        builder.field("properties", metadata.schema)
      })
    } catch {
      case e: Exception =>
        throw new IllegalStateException("Failed to jsonify cache metadata", e)
    }
  }

  /**
   * Since _meta.properties is shared by both index metadata and metadata cache, merge the two
   * maps here.
   */
  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
    val metadataCacheProperties = FlintMetadataCache.mock.toMap

    if (includeSpec) {
      (metadataCacheProperties ++ metadata.properties.asScala).asJava
    } else {
      metadataCacheProperties.asJava
    }
  }
}
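For orientation, a hedged usage sketch of the writer; the empty options map is a placeholder (real callers pass cluster connection settings), the index name is borrowed from the integration test below, and a reachable OpenSearch cluster is assumed:

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter}

// Placeholder options; real callers pass cluster connection settings here.
val options = new FlintOptions(Map.empty[String, String].asJava)
val writer = new FlintOpenSearchMetadataCacheWriter(options)
val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")

// Sends a PutMapping request. With includeSpec disabled, _meta.properties would carry
// only the mock cache fields, roughly:
//   { "_meta": { "properties": { "metadataCacheVersion": "1.0", "refreshInterval": 900,
//     "sourceTables": [...], "lastRefreshTime": 1727395328283 } }, "properties": { ... } }
writer.updateMetadataCache("flint_test_index", metadata)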
@@ -253,6 +253,10 @@ object FlintSparkConf {
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for metadata access permission")
.createOptional()
val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
.doc("Enable Flint metadata cache write to Flint index mappings")
.createWithDefault("true")

val CUSTOM_SESSION_MANAGER =
FlintConfig("spark.flint.job.customSessionManager")
.createOptional()
@@ -307,6 +311,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

  def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

  def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean

  /**
   * spark.sql.session.timeZone
   */
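Since METADATA_CACHE_WRITE defaults to "true", callers who want the previous behavior must opt out explicitly. A minimal sketch, assuming a live SparkSession named spark and that .key resolves to the full config name (as the commented-out unsetConf call in the test below suggests):

// Disable metadata cache write for this session.
spark.conf.set(FlintSparkConf.METADATA_CACHE_WRITE.key, "false")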
@@ -17,6 +17,7 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataCacheWriter
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
@@ -54,6 +55,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
    FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
  }

  private val flintMetadataCacheWriteService = new FlintOpenSearchMetadataCacheWriter(
    flintSparkConf.flintOptions())

  private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
    AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
  }
@@ -115,7 +119,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
        throw new IllegalStateException(s"Flint index $indexName already exists")
      }
    } else {
      val metadata = index.metadata()
      val jobSchedulingService = FlintSparkJobSchedulingService.create(
        index,
        spark,
@@ -127,14 +130,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
        .transientLog(latest => latest.copy(state = CREATING))
        .finalLog(latest => latest.copy(state = ACTIVE))
        .commit(latest => {
          if (latest == null) { // in case transaction capability is disabled
            flintClient.createIndex(indexName, metadata)
            flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
          } else {
            logInfo(s"Creating index with metadata log entry ID ${latest.id}")
            flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
            flintIndexMetadataService
              .updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id)))
          val metadata = latest match {
            case null => // in case transaction capability is disabled
              index.metadata()
            case latestEntry =>
              logInfo(s"Creating index with metadata log entry ID ${latestEntry.id}")
              index.metadata().copy(latestId = Some(latestEntry.id))
          }
          flintClient.createIndex(indexName, metadata)
          flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
          if (isMetadataCacheWriteEnabled) {
            flintMetadataCacheWriteService.updateMetadataCache(indexName, metadata)
          }
          jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
        })
@@ -384,6 +390,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
    new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
  }

  private def isMetadataCacheWriteEnabled: Boolean = {
    flintSparkConf.isMetadataCacheWriteEnabled
  }

  private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
    if (flintIndexMetadataService.supportsGetByIndexPattern) {
      flintIndexMetadataService
@@ -511,6 +521,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
        })
        .commit(_ => {
          flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
          if (isMetadataCacheWriteEnabled) {
            flintMetadataCacheWriteService.updateMetadataCache(indexName, index.metadata)
          }
logInfo("Update index options complete")
jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
})
@@ -0,0 +1,129 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.core

import java.util.List

import scala.collection.JavaConverters._

import org.opensearch.flint.core.metadata.FlintMetadataCache
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter}
import org.opensearch.flint.spark.FlintSparkSuite
import org.scalatest.Entry
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Matchers {

  /** Lazy initialize after container started. */
  lazy val options = new FlintOptions(openSearchOptions.asJava)
  lazy val flintClient = new FlintOpenSearchClient(options)
  lazy val flintMetadataCacheWriter = new FlintOpenSearchMetadataCacheWriter(options)
  lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options)

  private val mockMetadataCacheData = FlintMetadataCache.mock

  override def beforeAll(): Unit = {
    super.beforeAll()
    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
  }

  override def afterAll(): Unit = {
    super.afterAll()
    // TODO: unset if default is false
    // conf.unsetConf(FlintSparkConf.METADATA_CACHE_WRITE.key)
  }

  test("write metadata cache to index mappings") {
    val indexName = "flint_test_index"
    val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
    flintClient.createIndex(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
Comment on lines +49 to +53 (Collaborator Author): this is formatted by sbt scalafmtAll. Seems no way to get around this.
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
  }

  test("write metadata cache to index mappings and preserve other index metadata") {
    val indexName = "test_update"
    val content =
      """ {
        |   "_meta": {
        |     "kind": "test_kind"
        |   },
        |   "properties": {
        |     "age": {
        |       "type": "integer"
        |     }
        |   }
        | }
        |""".stripMargin

    val metadata = FlintOpenSearchIndexMetadataService.deserialize(content)
    flintClient.createIndex(indexName, metadata)

    flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe empty
    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
    var properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables

    val newContent =
      """ {
        |   "_meta": {
        |     "kind": "test_kind",
        |     "name": "test_name"
        |   },
        |   "properties": {
        |     "age": {
        |       "type": "integer"
        |     }
        |   }
        | }
        |""".stripMargin

    val newMetadata = FlintOpenSearchIndexMetadataService.deserialize(newContent)
    flintIndexMetadataService.updateIndexMetadata(indexName, newMetadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, newMetadata)

    flintIndexMetadataService.getIndexMetadata(indexName).kind shouldBe "test_kind"
    flintIndexMetadataService.getIndexMetadata(indexName).name shouldBe "test_name"
    flintIndexMetadataService.getIndexMetadata(indexName).schema should have size 1
    properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (Entry(
      "metadataCacheVersion",
      mockMetadataCacheData.metadataCacheVersion),
    Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
    Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
  }
}