-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
write mock metadata cache data to mappings _meta
Signed-off-by: Sean Kao <[email protected]>
- Loading branch information
1 parent
ab13935
commit bddb2bf
Showing
5 changed files
with
306 additions
and
0 deletions.
There are no files selected for viewing
52 changes: 52 additions & 0 deletions
52
flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadataCache.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.metadata | ||
|
||
/**
 * Flint metadata cache defines the metadata stored in the read cache for frontend users to
 * access.
 *
 * @param metadataCacheVersion
 *   version string of the metadata cache
 * @param refreshInterval
 *   refresh interval for a Flint index with auto refresh. Unit: seconds
 * @param sourceTables
 *   source table names for building the Flint index
 * @param lastRefreshTime
 *   timestamp when the Flint index was last refreshed. Unit: milliseconds
 */
case class FlintMetadataCache(
    metadataCacheVersion: String,
    refreshInterval: Option[Int],
    sourceTables: Array[String],
    lastRefreshTime: Option[Long]) {

  /**
   * Convert FlintMetadataCache to a map for writing. A field whose value is not defined
   * (`None`) is skipped; a defined optional field is written unwrapped.
   *
   * Keys are spelled out explicitly instead of being derived by zipping
   * `getClass.getDeclaredFields` with `productIterator`: the order of reflected fields is not
   * guaranteed, so that pairing could silently attach a value to the wrong key. The result is a
   * strict map (not a `mapValues` view).
   *
   * @return
   *   map from field name to boxed field value, in declaration order
   */
  def toMap: Map[String, AnyRef] = {
    val entries: Seq[Option[(String, AnyRef)]] = Seq(
      Some(("metadataCacheVersion", metadataCacheVersion)),
      refreshInterval.map(seconds => ("refreshInterval", Int.box(seconds))),
      Some(("sourceTables", sourceTables)),
      lastRefreshTime.map(millis => ("lastRefreshTime", Long.box(millis))))

    entries.flatten.toMap
  }
}
|
||
object FlintMetadataCache {
  // TODO: construct FlintMetadataCache from FlintMetadata

  /**
   * Build a FlintMetadataCache filled with fixed dummy data. A new instance is created on every
   * call.
   */
  def mock: FlintMetadataCache = {
    val dummySourceTables = Array(
      "dataSourceName.default.logGroups(logGroupIdentifier:['arn:aws:logs:us-east-1:123456:test-llt-xa', 'arn:aws:logs:us-east-1:123456:sample-lg-1'])")

    FlintMetadataCache(
      metadataCacheVersion = "1.0",
      refreshInterval = Some(900),
      sourceTables = dummySourceTables,
      lastRefreshTime = Some(1727395328283L))
  }
}
114 changes: 114 additions & 0 deletions
114
...src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataCacheWriter.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage | ||
|
||
import java.util | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.opensearch.client.RequestOptions | ||
import org.opensearch.client.indices.PutMappingRequest | ||
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.flint.common.metadata.FlintMetadata | ||
import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient} | ||
import org.opensearch.flint.core.metadata.{FlintIndexMetadataServiceBuilder, FlintMetadataCache} | ||
import org.opensearch.flint.core.metadata.FlintJsonHelper._ | ||
|
||
import org.apache.spark.internal.Logging | ||
|
||
/**
 * Dual-writes metadata cache data to the index mappings `_meta` field for frontend users to
 * access. This is different from `FlintIndexMetadataService`, which persists the full index
 * metadata to a storage as the single source of truth.
 */
class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends Logging {

  /**
   * The metadata cache shares the index mappings `_meta` field with the OpenSearch index
   * metadata storage. When that storage is the configured index metadata service, this flag is
   * true so that index metadata already kept in `_meta` is written back alongside the cache.
   */
  private val includeSpec: Boolean =
    FlintIndexMetadataServiceBuilder.build(options) match {
      case _: FlintOpenSearchIndexMetadataService => true
      case _ => false
    }

  /**
   * Update metadata cache for a Flint index by putting a new index mapping.
   *
   * @param indexName
   *   index name
   * @param metadata
   *   index metadata to update the cache
   * @throws IllegalStateException
   *   if building or sending the mapping request fails
   */
  def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
    logInfo(s"Updating metadata cache for $indexName")
    val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
    // Kept as a nullable var so the finally clause can tell whether a client was ever created.
    var client: IRestHighLevelClient = null
    try {
      client = OpenSearchClientUtils.createClient(options)
      val request = new PutMappingRequest(osIndexName)
      // TODO: make sure to preserve existing lastRefreshTime
      val mappingSource = serialize(metadata)
      request.source(mappingSource, XContentType.JSON)
      client.updateIndexMapping(request, RequestOptions.DEFAULT)
    } catch {
      case e: Exception =>
        throw new IllegalStateException(
          s"Failed to update metadata cache for Flint index $osIndexName",
          e)
    } finally {
      Option(client).foreach(_.close())
    }
  }

  /**
   * Serialize the mapping source (the `_meta` object plus the schema `properties`) from
   * FlintMetadata. Modified from `FlintOpenSearchIndexMetadataService`.
   *
   * @param metadata
   *   index metadata providing the spec fields and schema
   * @return
   *   JSON string used as the put-mapping request source
   * @throws IllegalStateException
   *   if JSON generation fails
   */
  private def serialize(metadata: FlintMetadata): String = {
    try {
      buildJson { builder =>
        objectField(builder, "_meta") {
          // If _meta is used as index metadata storage, preserve them.
          if (includeSpec) {
            builder
              .field("version", metadata.version.version)
              .field("name", metadata.name)
              .field("kind", metadata.kind)
              .field("source", metadata.source)
              .field("indexedColumns", metadata.indexedColumns)

            metadata.latestId.foreach(id => builder.field("latestId", id))
            optionalObjectField(builder, "options", metadata.options)
          }

          optionalObjectField(builder, "properties", buildPropertiesMap(metadata))
        }
        builder.field("properties", metadata.schema)
      }
    } catch {
      case e: Exception =>
        throw new IllegalStateException("Failed to jsonify cache metadata", e)
    }
  }

  /**
   * Build the `_meta.properties` map. Since `_meta.properties` is shared by both index metadata
   * and metadata cache, the two maps are merged when index metadata is stored in `_meta`; on a
   * key collision the entry from the index metadata properties wins.
   *
   * @param metadata
   *   index metadata whose properties are merged in when `includeSpec` is set
   * @return
   *   properties to write under `_meta.properties`
   */
  private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = {
    val cacheProperties = FlintMetadataCache.mock.toMap

    val merged: Map[String, AnyRef] =
      if (includeSpec) cacheProperties ++ metadata.properties.asScala
      else cacheProperties

    merged.asJava
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
120 changes: 120 additions & 0 deletions
120
...tegration/scala/org/opensearch/flint/core/FlintOpenSearchMetadataCacheWriterITSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core | ||
|
||
import java.util.List | ||
|
||
import scala.collection.JavaConverters._ | ||
|
||
import org.opensearch.flint.core.metadata.FlintMetadataCache | ||
import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, FlintOpenSearchMetadataCacheWriter} | ||
import org.opensearch.flint.spark.FlintSparkSuite | ||
import org.scalatest.Entry | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
import org.apache.spark.sql.flint.config.FlintSparkConf | ||
|
||
class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Matchers {

  /** Lazy initialize after container started. */
  lazy val options = new FlintOptions(openSearchOptions.asJava)
  lazy val flintClient = new FlintOpenSearchClient(options)
  lazy val flintMetadataCacheWriter = new FlintOpenSearchMetadataCacheWriter(options)
  lazy val flintIndexMetadataService = new FlintOpenSearchIndexMetadataService(options)

  private val mockMetadataCacheData = FlintMetadataCache.mock

  /**
   * Read back the stored properties of the given index and check that they consist of exactly
   * the four mock metadata cache entries.
   */
  private def assertMockCachePropertiesStored(indexName: String): Unit = {
    val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties
    properties should have size 4
    properties should contain allOf (
      Entry("metadataCacheVersion", mockMetadataCacheData.metadataCacheVersion),
      Entry("refreshInterval", mockMetadataCacheData.refreshInterval.get),
      Entry("lastRefreshTime", mockMetadataCacheData.lastRefreshTime.get))
    // sourceTables is read back as a java.util.List, so compare element-wise.
    properties
      .get("sourceTables")
      .asInstanceOf[List[String]]
      .toArray should contain theSameElementsAs mockMetadataCacheData.sourceTables
  }

  test("write metadata cache to index mappings") {
    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
    val indexName = "flint_test_index"
    val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
    flintClient.createIndex(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    assertMockCachePropertiesStored(indexName)
  }

  test("write metadata cache to index mappings and preserve other index metadata") {
    setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true")
    val indexName = "test_update"
    // Re-reads the index metadata on every use, so each assertion sees the stored state.
    def stored = flintIndexMetadataService.getIndexMetadata(indexName)

    val content =
      """ {
        | "_meta": {
        | "kind": "test_kind"
        | },
        | "properties": {
        | "age": {
        | "type": "integer"
        | }
        | }
        | }
        |""".stripMargin

    val metadata = FlintOpenSearchIndexMetadataService.deserialize(content)
    flintClient.createIndex(indexName, metadata)

    flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, metadata)

    stored.kind shouldBe "test_kind"
    stored.name shouldBe empty
    stored.schema should have size 1
    assertMockCachePropertiesStored(indexName)

    // Second round: add a name to the index metadata and write the cache again.
    val newContent =
      """ {
        | "_meta": {
        | "kind": "test_kind",
        | "name": "test_name"
        | },
        | "properties": {
        | "age": {
        | "type": "integer"
        | }
        | }
        | }
        |""".stripMargin

    val newMetadata = FlintOpenSearchIndexMetadataService.deserialize(newContent)
    flintIndexMetadataService.updateIndexMetadata(indexName, newMetadata)
    flintMetadataCacheWriter.updateMetadataCache(indexName, newMetadata)

    stored.kind shouldBe "test_kind"
    stored.name shouldBe "test_name"
    stored.schema should have size 1
    assertMockCachePropertiesStored(indexName)
  }
}