From efd2207ceb2064fa01fd6c996421d08bce8a724e Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 8 Nov 2024 14:53:57 -0800 Subject: [PATCH] add test cases Signed-off-by: Sean Kao --- .../spark/mv/FlintSparkMaterializedView.scala | 2 +- .../FlintSparkMaterializedViewITSuite.scala | 66 ++++++++++++++++++- ...OpenSearchMetadataCacheWriterITSuite.scala | 21 +++++- 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 549ed049c..b8b44ea15 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -194,7 +194,7 @@ object FlintSparkMaterializedView extends Logging { } /** - * Get source tables from Flint metadata properties field. TODO: test case + * Get source tables from Flint metadata properties field. * * @param metadata * Flint metadata diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 710d5e2a3..efc6ff69c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -18,7 +18,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchIndexMetadataService, OpenSearchClientUtils} import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName import org.opensearch.flint.spark.mv.FlintSparkMaterializedView -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName} +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{extractSourceTablesFromQuery, getFlintIndexName, getSourceTablesFromMetadata, MV_INDEX_TYPE} import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler import org.scalatest.matchers.must.Matchers._ import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -75,6 +75,70 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { extractSourceTablesFromQuery(flint.spark, "SELECT 1") should have size 0 } + test("get source table names from index metadata successfully") { + val mv = FlintSparkMaterializedView( + "spark_catalog.default.mv", + s"SELECT 1 FROM $testTable", + Array(testTable), + Map("1" -> "integer")) + val metadata = mv.metadata() + metadata.properties.get("sourceTables") shouldBe a[Array[String]] + getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable) + } + + test("get source table names from deserialized metadata successfully") { + val metadata = FlintOpenSearchIndexMetadataService.deserialize(s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "properties": { + | "sourceTables": [ + | "$testTable" + | ] + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin) + metadata.properties.get("sourceTables") shouldBe a[java.util.ArrayList[String]] + getSourceTablesFromMetadata(metadata) should contain theSameElementsAs Array(testTable) + } + + test("get empty source tables from invalid field in metadata") { + val metadataWrongType = FlintOpenSearchIndexMetadataService.deserialize(s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "properties": { + | "sourceTables": "$testTable" + | } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin) + val metadataMissingField = FlintOpenSearchIndexMetadataService.deserialize(s""" { + | "_meta": { + | "kind": "$MV_INDEX_TYPE", + | "properties": { } + | }, + | "properties": { + | "age": { + | "type": "integer" + | } + | } + | } + |""".stripMargin) + + getSourceTablesFromMetadata(metadataWrongType) shouldBe empty + getSourceTablesFromMetadata(metadataMissingField) shouldBe empty + } + test("create materialized view with metadata successfully") { withTempDir { checkpointDir => val indexOptions = diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala index c0d253fd3..c8bfac216 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/metadatacache/FlintOpenSearchMetadataCacheWriterITSuite.scala @@ -18,6 +18,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.{FlintOpenSearchClient, FlintOpenSearchIndexMetadataService} import org.opensearch.flint.spark.{FlintSparkIndexOptions, FlintSparkSuite} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE} import org.scalatest.Entry @@ -166,7 +167,25 @@ class FlintOpenSearchMetadataCacheWriterITSuite extends FlintSparkSuite with Mat } } - test(s"write metadata cache to materialized view index mappings with source tables") { + test("write metadata cache with source tables from index metadata") { + val mv = FlintSparkMaterializedView( + "spark_catalog.default.mv", + s"SELECT 1 FROM $testTable", + Array(testTable), + Map("1" -> "integer")) + val metadata = mv.metadata().copy(latestLogEntry = Some(flintMetadataLogEntry)) + + flintClient.createIndex(testFlintIndex, metadata) + flintMetadataCacheWriter.updateMetadataCache(testFlintIndex, metadata) + + val properties = flintIndexMetadataService.getIndexMetadata(testFlintIndex).properties + properties + .get("sourceTables") + .asInstanceOf[List[String]] + .toArray should contain theSameElementsAs Array(testTable) + } + + test("write metadata cache with source tables from deserialized metadata") { val testTable2 = "spark_catalog.default.metadatacache_test2" val content = s""" {