diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
index ca659550d..3a12b63fe 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
@@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
@@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging {
   }
 
   private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
-    val sourceTables = getArrayString(metadata.properties, "sourceTables")
+    val sourceTables = getSourceTablesFromMetadata(metadata)
     if (sourceTables.isEmpty) {
-      FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
+      FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source)
     } else {
       sourceTables
     }
@@ -161,12 +161,4 @@ object FlintSparkIndexFactory extends Logging {
       Some(value.asInstanceOf[String])
     }
   }
-
-  private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
-    map.get(key) match {
-      case list: java.util.ArrayList[_] =>
-        list.toArray.map(_.toString)
-      case _ => Array.empty[String]
-    }
-  }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala
index e1c0f318c..86267c881 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintMetadataCache.scala
@@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
 import org.opensearch.flint.common.metadata.FlintMetadata
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
 import org.opensearch.flint.spark.FlintSparkIndexOptions
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser
 
 /**
@@ -61,12 +61,7 @@ object FlintMetadataCache {
       None
     }
     val sourceTables = metadata.kind match {
-      case MV_INDEX_TYPE =>
-        metadata.properties.get("sourceTables") match {
-          case list: java.util.ArrayList[_] =>
-            list.toArray.map(_.toString)
-          case _ => Array.empty[String]
-        }
+      case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
       case _ => Array(metadata.source)
     }
     val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
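Note on the refactor above: the deleted `getArrayString` helper (and the identical inline match removed from `FlintMetadataCache`) only recognized `java.util.ArrayList`, so a `sourceTables` property built in-process as a Scala `Array[String]` fell through to the empty case. A minimal standalone sketch of that mismatch (hypothetical names, not part of this patch):

```scala
object SourceTablesShapeDemo extends App {
  // Two shapes the "sourceTables" property value can take: built in-process
  // as a Scala Array vs. deserialized from JSON as a java.util.ArrayList.
  val builtInProcess: AnyRef = Array("spark_catalog.default.t1")
  val parsedFromJson: AnyRef = {
    val list = new java.util.ArrayList[String]()
    list.add("spark_catalog.default.t1")
    list
  }

  // The old helper's pattern match, reduced to its essence.
  def getArrayString(value: AnyRef): Array[String] = value match {
    case list: java.util.ArrayList[_] => list.toArray.map(_.toString)
    case _ => Array.empty[String]
  }

  println(getArrayString(builtInProcess).length) // 0: Array[String] falls through
  println(getArrayString(parsedFromJson).length) // 1
}
```

The shared `getSourceTablesFromMetadata` plus the ArrayList conversion at build time (below) make both shapes agree.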
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
index 2bc373792..f6fc0ba6f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriter.scala
@@ -38,13 +38,15 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
     .isInstanceOf[FlintOpenSearchIndexMetadataService]
 
   override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
-    logInfo(s"Updating metadata cache for $indexName");
+    logInfo(s"Updating metadata cache for $indexName with $metadata");
     val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
     var client: IRestHighLevelClient = null
     try {
       client = OpenSearchClientUtils.createClient(options)
       val request = new PutMappingRequest(osIndexName)
-      request.source(serialize(metadata), XContentType.JSON)
+      val serialized = serialize(metadata)
+      logInfo(s"Serialized: $serialized")
+      request.source(serialized, XContentType.JSON)
       client.updateIndexMapping(request, RequestOptions.DEFAULT)
     } catch {
       case e: Exception =>
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index aecfc99df..81f996d83 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.mv
 
 import java.util.Locale
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`map AsScala`
 
 import org.opensearch.flint.common.metadata.FlintMetadata
@@ -18,6 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.function.TumbleFunction
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
@@ -64,10 +65,14 @@ case class FlintSparkMaterializedView(
     }.toArray
     val schema = generateSchema(outputSchema).asJava
 
+    // Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing.
+    // OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists.
+    val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava)
+
     metadataBuilder(this)
       .name(mvName)
       .source(query)
-      .addProperty("sourceTables", sourceTables)
+      .addProperty("sourceTables", sourceTablesProperty)
       .indexedColumns(indexColumnMaps)
       .schema(schema)
       .build()
@@ -147,7 +152,7 @@ case class FlintSparkMaterializedView(
   }
 }
 
-object FlintSparkMaterializedView {
+object FlintSparkMaterializedView extends Logging {
 
   /** MV index type name */
   val MV_INDEX_TYPE = "mv"
@@ -179,13 +184,40 @@ object FlintSparkMaterializedView {
    * @return
    *   source table names
    */
-  def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = {
-    spark.sessionState.sqlParser
+  def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = {
+    logInfo(s"Extracting source tables from query $query")
+    val sourceTables = spark.sessionState.sqlParser
       .parsePlan(query)
       .collect { case relation: UnresolvedRelation =>
         qualifyTableName(spark, relation.tableName)
       }
       .toArray
+    logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]")
+    sourceTables
+  }
+
+  /**
+   * Get source tables from Flint metadata properties field.
+   *
+   * @param metadata
+   *   Flint metadata
+   * @return
+   *   source table names
+   */
+  def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = {
+    logInfo(s"Getting source tables from metadata $metadata")
+    val sourceTables = metadata.properties.get("sourceTables")
+    sourceTables match {
+      case list: java.util.ArrayList[_] =>
+        logInfo(s"sourceTables is [${list.asScala.mkString(", ")}]")
+        list.toArray.map(_.toString)
+      case null =>
+        logInfo("sourceTables property does not exist")
+        Array.empty[String]
+      case _ =>
+        logInfo(s"sourceTables has unexpected type: ${sourceTables.getClass.getName}")
+        Array.empty[String]
+    }
   }
 
   /** Builder class for MV build */
@@ -217,7 +249,7 @@ object FlintSparkMaterializedView {
      */
     def query(query: String): Builder = {
       this.query = query
-      this.sourceTables = extractSourceTableNames(flint.spark, query)
+      this.sourceTables = extractSourceTablesFromQuery(flint.spark, query)
       this
     }
 
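The new comment in `metadata()` is the crux of the patch: OpenSearch parses index mappings with Jackson, and Jackson's default binding for a JSON array read into an untyped container is `java.util.ArrayList`. A standalone sketch of that binding (assumes `jackson-databind` on the classpath; not Flint code):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object JacksonArrayBindingDemo extends App {
  val mapper = new ObjectMapper()
  // Bind a JSON object to an untyped Map: with no declared element type,
  // Jackson materializes the JSON array as its default List implementation.
  val props = mapper.readValue(
    """{"sourceTables": ["spark_catalog.default.t1"]}""",
    classOf[java.util.Map[String, AnyRef]])
  println(props.get("sourceTables").getClass) // class java.util.ArrayList
}
```

Storing the property as an `ArrayList` at build time means `getSourceTablesFromMetadata` sees the same type whether the metadata was just built or round-tripped through OpenSearch.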
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
index 1c9a9e83c..78d2eb09e 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala
@@ -64,7 +64,9 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
     metadata.kind shouldBe MV_INDEX_TYPE
     metadata.source shouldBe "SELECT 1"
     metadata.properties should contain key "sourceTables"
-    metadata.properties.get("sourceTables").asInstanceOf[Array[String]] should have size 0
+    metadata.properties
+      .get("sourceTables")
+      .asInstanceOf[java.util.ArrayList[String]] should have size 0
     metadata.indexedColumns shouldBe Array(
       Map("columnName" -> "test_col", "columnType" -> "integer").asJava)
     metadata.schema shouldBe Map("test_col" -> Map("type" -> "integer").asJava).asJava
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index fc77faaea..cf0347820 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils}
 import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTableNames, getFlintIndexName}
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName, getSourceTablesFromMetadata, MV_INDEX_TYPE}
 import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler
 import org.scalatest.matchers.must.Matchers._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
@@ -65,14 +65,76 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         | FROM spark_catalog.default.`table/3`
         | INNER JOIN spark_catalog.default.`table.4`
         |""".stripMargin
-    extractSourceTableNames(flint.spark, testComplexQuery) should contain theSameElementsAs
+    extractSourceTablesFromQuery(flint.spark, testComplexQuery) should contain theSameElementsAs
       Array(
         "spark_catalog.default.table1",
         "spark_catalog.default.table2",
         "spark_catalog.default.`table/3`",
         "spark_catalog.default.`table.4`")
 
-    extractSourceTableNames(flint.spark, "SELECT 1") should have size 0
+    extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0
+  }
+
+  test("get source table names from index metadata successfully") {
+    val mv = FlintSparkMaterializedView(
+      "spark_catalog.default.mv",
+      s"SELECT 1 FROM $testTable",
+      Array(testTable),
+      Map("1" -> "integer"))
+    val metadata = mv.metadata()
+    getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
+  }
+
+  test("get source table names from deserialized metadata successfully") {
+    val metadata = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": {
+        |       "sourceTables": [
+        |         "$testTable"
+        |       ]
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+    getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable)
+  }
+
+  test("get empty source tables from invalid field in metadata") {
+    val metadataWrongType = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": {
+        |       "sourceTables": "$testTable"
+        |     }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+    val metadataMissingField = FlintOpenSearchIndexMetadataService.deserialize(s""" {
+        |   "_meta": {
+        |     "kind": "$MV_INDEX_TYPE",
+        |     "properties": { }
+        |   },
+        |   "properties": {
+        |     "age": {
+        |       "type": "integer"
+        |     }
+        |   }
+        | }
+        |""".stripMargin)
+
+    getSourceTablesFromMetadata(metadataWrongType) shouldBe empty
+    getSourceTablesFromMetadata(metadataMissingField) shouldBe empty
   }
 
   test("create materialized view with metadata successfully") {
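For context on the renamed `extractSourceTablesFromQuery` exercised above: it walks the parsed (unresolved) plan, so the collected relations are still `UnresolvedRelation` nodes and the tables need not exist in any catalog. A minimal Spark-only sketch (local session, made-up table names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

object ExtractTablesDemo extends App {
  val spark = SparkSession.builder().appName("demo").master("local[1]").getOrCreate()
  // parsePlan only parses; it never resolves t1/t2 against a catalog,
  // which is why the relations are still UnresolvedRelation nodes.
  val plan = spark.sessionState.sqlParser
    .parsePlan("SELECT a FROM t1 INNER JOIN t2 ON t1.id = t2.id")
  val tables = plan.collect { case r: UnresolvedRelation => r.tableName }
  println(tables.mkString(", ")) // t1, t2
  spark.stop()
}
```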
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
index c0d253fd3..5b4dd0208 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala
@@ -18,6 +18,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService}
 import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 import org.scalatest.Entry
@@ -161,12 +162,29 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
       val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
       properties
         .get("sourceTables")
-        .asInstanceOf[List[String]]
-        .toArray should contain theSameElementsAs Array(testTable)
+        .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
+        testTable)
     }
   }
 
-  test(s"write metadata cache to materialized view index mappings with source tables") {
+  test("write metadata cache with source tables from index metadata") {
+    val mv = FlintSparkMaterializedView(
+      "spark_catalog.default.mv",
+      s"SELECT 1 FROM $testTable",
+      Array(testTable),
+      Map("1" -> "integer"))
+    val metadata = mv.metadata().copy(latestLogEntry = Some(flintMetadataLogEntry))
+
+    flintClient.createIndex(testFlintIndex, metadata)
+    flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata)
+
+    val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
+    properties
+      .get("sourceTables")
+      .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(testTable)
+  }
+
+  test("write metadata cache with source tables from deserialized metadata") {
     val testTable2 = "spark_catalog.default.metadatacache_test2"
     val content =
       s""" {
@@ -194,8 +212,9 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat
     val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties
     properties
       .get("sourceTables")
-      .asInstanceOf[List[String]]
-      .toArray should contain theSameElementsAs Array(testTable, testTable2)
+      .asInstanceOf[java.util.ArrayList[String]] should contain theSameElementsAs Array(
+      testTable,
+      testTable2)
   }
 
   test("write metadata cache to index mappings with refresh interval") {