diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala index 86267c881..c2007c124 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala @@ -23,6 +23,8 @@ case class FlintMetadataCache( refreshInterval: Option[Int], /** Source table names for building the Flint index. */ sourceTables: Array[String], + /** Source query for MV */ + sourceQuery: Option[String], /** Timestamp when Flint index is last refreshed. Unit: milliseconds */ lastRefreshTime: Option[Long]) { @@ -64,6 +66,10 @@ object FlintMetadataCache { case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata) case _ => Array(metadata.source) } + val sourceQuery = metadata.kind match { + case MV_INDEX_TYPE => Some(metadata.source) + case _ => None + } val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry => entry.lastRefreshCompleteTime match { case FlintMetadataLogEntry.EMPTY_TIMESTAMP => None @@ -71,6 +77,11 @@ object FlintMetadataCache { } } - FlintMetadataCache(metadataCacheVersion, refreshInterval, sourceTables, lastRefreshTime) + FlintMetadataCache( + metadataCacheVersion, + refreshInterval, + sourceTables, + sourceQuery, + lastRefreshTime) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala index f6fc0ba6f..de3e051fb 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala @@ -5,18 +5,18 @@ package org.opensearch.flint.spark.metadatacache -import java.util +import java.util.{HashMap, Map => JMap} import scala.collection.JavaConverters._ import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.client.indices.PutMappingRequest import org.opensearch.common.xcontent.XContentType -import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.core.{FlintOptions, IRestHighLevelClient} -import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder import org.opensearch.flint.core.metadata.FlintJsonHelper._ -import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} +import org.opensearch.flint.core.storage.OpenSearchClientUtils import org.apache.spark.internal.Logging @@ -27,27 +27,20 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) extends FlintMetadataCacheWriter with Logging { - /** - * Since metadata cache shares the index mappings _meta field with OpenSearch index metadata - * storage, this flag is to allow for preserving index metadata that is already stored in _meta - * when updating metadata cache. - */ - private val includeSpec: Boolean = - FlintIndexMetadataServiceBuilder - .build(options) - .isInstanceOf[FlintOpenSearchIndexMetadataService] - override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = { - logInfo(s"Updating metadata cache for $indexName with $metadata"); + logInfo(s"Updating metadata cache for $indexName"); val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName) var client: IRestHighLevelClient = null try { client = OpenSearchClientUtils.createClient(options) - val request = new PutMappingRequest(osIndexName) - val serialized = serialize(metadata) - logInfo(s"Serialized: $serialized") - request.source(serialized, XContentType.JSON) - client.updateIndexMapping(request, RequestOptions.DEFAULT) + val indexMapping = getIndexMapping(client, osIndexName) + val metadataCacheProperties = FlintMetadataCache(metadata).toMap.asJava + mergeMetadataCacheProperties(indexMapping, metadataCacheProperties) + val serialized = buildJson(builder => { + builder.field("_meta", indexMapping.get("_meta")) + builder.field("properties", indexMapping.get("properties")) + }) + updateIndexMapping(client, osIndexName, serialized) } catch { case e: Exception => throw new IllegalStateException( @@ -59,50 +52,35 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions) } } - /** - * Serialize FlintMetadataCache from FlintMetadata. Modified from {@link - * FlintOpenSearchIndexMetadataService} - */ - private[metadatacache] def serialize(metadata: FlintMetadata): String = { - try { - buildJson(builder => { - objectField(builder, "_meta") { - // If _meta is used as index metadata storage, preserve them. - if (includeSpec) { - builder - .field("version", metadata.version.version) - .field("name", metadata.name) - .field("kind", metadata.kind) - .field("source", metadata.source) - .field("indexedColumns", metadata.indexedColumns) - - if (metadata.latestId.isDefined) { - builder.field("latestId", metadata.latestId.get) - } - optionalObjectField(builder, "options", metadata.options) - } - - optionalObjectField(builder, "properties", buildPropertiesMap(metadata)) - } - builder.field("properties", metadata.schema) - }) - } catch { - case e: Exception => - throw new IllegalStateException("Failed to jsonify cache metadata", e) - } + private[metadatacache] def getIndexMapping( + client: IRestHighLevelClient, + osIndexName: String): JMap[String, AnyRef] = { + val request = new GetIndexRequest(osIndexName) + val response = client.getIndex(request, RequestOptions.DEFAULT) + response.getMappings.get(osIndexName).sourceAsMap() } /** - * Since _meta.properties is shared by both index metadata and metadata cache, here we merge the - * two maps. + * Merge metadata cache properties into index mapping in place. Metadata cache is written into + * _meta.properties field of index mapping. */ - private def buildPropertiesMap(metadata: FlintMetadata): util.Map[String, AnyRef] = { - val metadataCacheProperties = FlintMetadataCache(metadata).toMap + private def mergeMetadataCacheProperties( + indexMapping: JMap[String, AnyRef], + metadataCacheProperties: JMap[String, AnyRef]): Unit = { + indexMapping + .computeIfAbsent("_meta", _ => new HashMap[String, AnyRef]()) + .asInstanceOf[JMap[String, AnyRef]] + .computeIfAbsent("properties", _ => new HashMap[String, AnyRef]()) + .asInstanceOf[JMap[String, AnyRef]] + .putAll(metadataCacheProperties) + } - if (includeSpec) { - (metadataCacheProperties ++ metadata.properties.asScala).asJava - } else { - metadataCacheProperties.asJava - } + private[metadatacache] def updateIndexMapping( + client: IRestHighLevelClient, + osIndexName: String, + mappingSource: String): Unit = { + val request = new PutMappingRequest(osIndexName) + request.source(mappingSource, XContentType.JSON) + client.updateIndexMapping(request, RequestOptions.DEFAULT) } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala index 6ec6cf696..971c18857 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCacheSuite.scala @@ -83,11 +83,13 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { } it should "construct from materialized view FlintMetadata" in { + val testQuery = + "SELECT 1 FROM spark_catalog.default.test_table UNION SELECT 1 FROM spark_catalog.default.another_table" val content = s""" { | "_meta": { | "kind": "$MV_INDEX_TYPE", - | "source": "spark_catalog.default.wrong_table", + | "source": "$testQuery", | "options": { | "auto_refresh": "true", | "refresh_interval": "10 Minutes" @@ -116,6 +118,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { metadataCache.sourceTables shouldBe Array( "spark_catalog.default.test_table", "spark_catalog.default.another_table") + metadataCache.sourceQuery.get shouldBe testQuery metadataCache.lastRefreshTime.get shouldBe 1234567890123L } @@ -145,6 +148,7 @@ class FlintMetadataCacheSuite extends AnyFlatSpec with Matchers { metadataCache.metadataCacheVersion shouldBe FlintMetadataCache.metadataCacheVersion metadataCache.refreshInterval shouldBe empty metadataCache.sourceTables shouldBe Array("spark_catalog.default.test_table") + metadataCache.sourceQuery shouldBe empty metadataCache.lastRefreshTime shouldBe empty } } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala index 5b4dd0208..692a0c2ff 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala @@ -5,18 +5,17 @@ package org.opensearch.flint.spark.metadatacache -import java.util.{Base64, List} +import java.util.{Base64, Map => JMap} import scala.collection.JavaConverters._ import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson +import org.json4s.{DefaultFormats, Extraction, JValue} import org.json4s.native.JsonMethods._ -import org.opensearch.flint.common.FlintVersion.current -import org.opensearch.flint.common.metadata.FlintMetadata import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} -import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite} +import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} +import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndexOptions, FlintSparkSuite} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE @@ -69,63 +68,24 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat } test("build opensearch metadata cache writer") { - setFlintSparkConf(FlintSparkConf.METADATA_CACHE_WRITE, "true") withMetadataCacheWriteEnabled { FlintMetadataCacheWriterBuilder .build(FlintSparkConf()) shouldBe a[FlintOpenSearchMetadataCacheWriter] } } - test("serialize metadata cache to JSON") { - val expectedMetadataJson: String = s""" - | { - | "_meta": { - | "version": "${current()}", - | "name": "$testFlintIndex", - | "kind": "test_kind", - | "source": "$testTable", - | "indexedColumns": [ - | { - | "test_field": "spark_type" - | }], - | "options": { - | "auto_refresh": "true", - | "refresh_interval": "10 Minutes" - | }, - | "properties": { - | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", - | "refreshInterval": 600, - | "sourceTables": ["$testTable"], - | "lastRefreshTime": $testLastRefreshCompleteTime - | }, - | "latestId": "$testLatestId" - | }, - | "properties": { - | "test_field": { - | "type": "os_type" - | } - | } - | } - |""".stripMargin - val builder = new FlintMetadata.Builder - builder.name(testFlintIndex) - builder.kind("test_kind") - builder.source(testTable) - builder.addIndexedColumn(Map[String, AnyRef]("test_field" -> "spark_type").asJava) - builder.options( - Map("auto_refresh" -> "true", "refresh_interval" -> "10 Minutes") - .mapValues(_.asInstanceOf[AnyRef]) - .asJava) - builder.schema(Map[String, AnyRef]("test_field" -> Map("type" -> "os_type").asJava).asJava) - builder.latestLogEntry(flintMetadataLogEntry) - - val metadata = builder.build() - flintMetadataCacheWriter.serialize(metadata) should matchJson(expectedMetadataJson) - } - test("write metadata cache to index mappings") { + val content = + s""" { + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin val metadata = FlintOpenSearchIndexMetadataService - .deserialize("{}") + .deserialize(content) .copy(latestLogEntry = Some(flintMetadataLogEntry)) flintClient.createIndex(testFlintIndex, metadata) flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) @@ -139,7 +99,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat } Seq(SKIPPING_INDEX_TYPE, COVERING_INDEX_TYPE).foreach { case kind => - test(s"write metadata cache to $kind index mappings with source tables") { + test(s"write metadata cache to $kind index mappings with source tables for non mv index") { val content = s""" { | "_meta": { @@ -164,10 +124,11 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .get("sourceTables") .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array( testTable) + properties should not contain key("sourceQuery") } } - test("write metadata cache with source tables from index metadata") { + test("write metadata cache with source tables and query from mv index metadata") { val mv = FlintSparkMaterializedView( "spark_catalog.default.mv", s"SELECT 1 FROM $testTable", @@ -182,9 +143,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat properties .get("sourceTables") .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(testTable) + properties + .get("sourceQuery") + .asInstanceOf[String] shouldBe s"SELECT 1 FROM $testTable" } - test("write metadata cache with source tables from deserialized metadata") { + test("write metadata cache with source tables and query from deserialized mv metadata") { val testTable2 = "spark_catalog.default.metadatacache_test2" val content = s""" { @@ -272,31 +236,39 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 - properties should contain allOf (Entry( - "metadataCacheVersion", - FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) + properties should not contain key("refreshInterval") } test("exclude last refresh time in metadata cache when index has not been refreshed") { + val content = + s""" { + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin val metadata = FlintOpenSearchIndexMetadataService - .deserialize("{}") + .deserialize(content) .copy(latestLogEntry = Some(flintMetadataLogEntry.copy(lastRefreshCompleteTime = 0L))) flintClient.createIndex(testFlintIndex, metadata) flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 2 - properties should contain( - Entry("metadataCacheVersion", FlintMetadataCache.metadataCacheVersion)) + properties should not contain key("lastRefreshTime") } test("write metadata cache to index mappings and preserve other index metadata") { val content = """ { | "_meta": { - | "kind": "test_kind" + | "kind": "test_kind", + | "name": "test_name", + | "custom": "test_custom", + | "properties": { + | "custom_in_properties": "test_custom" + | } | }, | "properties": { | "age": { @@ -311,48 +283,31 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .copy(latestLogEntry = Some(flintMetadataLogEntry)) flintClient.createIndex(testFlintIndex, metadata) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, metadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + // Simulates index mapping updated by custom implementation of FlintIndexMetadataService + // with the extra "custom" field. + val client = OpenSearchClientUtils.createClient(options) + flintMetadataCacheWriter.updateIndexMapping(client, testFlintIndex, content) - flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" - flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe empty - flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - var properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 - properties should contain allOf (Entry( - "metadataCacheVersion", - FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) - - val newContent = - """ { - | "_meta": { - | "kind": "test_kind", - | "name": "test_name" - | }, - | "properties": { - | "age": { - | "type": "integer" - | } - | } - | } - |""".stripMargin - - val newMetadata = FlintOpenSearchIndexMetadataService - .deserialize(newContent) - .copy(latestLogEntry = Some(flintMetadataLogEntry)) - flintIndexMetadataService.updateIndexMetadata(testFlintIndex, newMetadata) - flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, newMetadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) flintIndexMetadataService.getIndexMetadata(testFlintIndex).kind shouldBe "test_kind" flintIndexMetadataService.getIndexMetadata(testFlintIndex).name shouldBe "test_name" flintIndexMetadataService.getIndexMetadata(testFlintIndex).schema should have size 1 - properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties - properties should have size 3 + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties + properties should have size 4 properties should contain allOf (Entry( "metadataCacheVersion", FlintMetadataCache.metadataCacheVersion), - Entry("lastRefreshTime", testLastRefreshCompleteTime)) + Entry("lastRefreshTime", testLastRefreshCompleteTime), Entry( + "custom_in_properties", + "test_custom")) + + // Directly get the index mapping and verify custom field is preserved + flintMetadataCacheWriter + .getIndexMapping(client, testFlintIndex) + .get("_meta") + .asInstanceOf[JMap[String, AnyRef]] + .get("custom") shouldBe "test_custom" } Seq( @@ -391,6 +346,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat test(s"write metadata cache for $refreshMode") { withExternalSchedulerEnabled { withMetadataCacheWriteEnabled { + val flint: FlintSpark = new FlintSpark(spark) withTempDir { checkpointDir => // update checkpoint_location if available in optionsMap val indexOptions = FlintSparkIndexOptions( @@ -407,25 +363,12 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat .options(indexOptions, testFlintIndex) .create() - var index = flint.describeIndex(testFlintIndex) - index shouldBe defined - val propertiesJson = - compact( - render( - parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties")) + val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex))) propertiesJson should matchJson(expectedJson) flint.refreshIndex(testFlintIndex) - index = flint.describeIndex(testFlintIndex) - index shouldBe defined val lastRefreshTime = - compact( - render( - parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties" \ "lastRefreshTime")).toLong + compact(render(getPropertiesJValue(testFlintIndex) \ "lastRefreshTime")).toLong lastRefreshTime should be > 0L } } @@ -435,6 +378,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat test("write metadata cache for auto refresh index with internal scheduler") { withMetadataCacheWriteEnabled { + val flint: FlintSpark = new FlintSpark(spark) withTempDir { checkpointDir => flint .skippingIndex() @@ -450,12 +394,7 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat testFlintIndex) .create() - var index = flint.describeIndex(testFlintIndex) - index shouldBe defined - val propertiesJson = - compact( - render(parse( - flintMetadataCacheWriter.serialize(index.get.metadata())) \ "_meta" \ "properties")) + val propertiesJson = compact(render(getPropertiesJValue(testFlintIndex))) propertiesJson should matchJson(s""" | { | "metadataCacheVersion": "${FlintMetadataCache.metadataCacheVersion}", @@ -465,12 +404,15 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat |""".stripMargin) flint.refreshIndex(testFlintIndex) - index = flint.describeIndex(testFlintIndex) - index shouldBe defined - compact(render(parse( - flintMetadataCacheWriter.serialize( - index.get.metadata())) \ "_meta" \ "properties")) should not include "lastRefreshTime" + compact(render(getPropertiesJValue(testFlintIndex))) should not include "lastRefreshTime" } } } + + private def getPropertiesJValue(indexName: String): JValue = { + // Convert to scala map because json4s converts java.util.Map into an empty JObject + // https://github.com/json4s/json4s/issues/392 + val properties = flintIndexMetadataService.getIndexMetadata(indexName).properties.asScala + Extraction.decompose(properties)(DefaultFormats) + } }