diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index ae2e53090..bf5e6309e 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -523,5 +523,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     }
   }
 
+  test("create materialized view with decimal and map types") {
+    val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map"
+    val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser"
+    withTable(decimalAndMapTable) {
+      createMapAndDecimalTimeSeriesTable(decimalAndMapTable)
+
+      withTempDir { checkpointDir =>
+        sql(s"""
+             | CREATE MATERIALIZED VIEW $decimalAndMapMv
+             | AS
+             | SELECT
+             |   base_score, mymap
+             | FROM $decimalAndMapTable
+             | WITH (
+             |   auto_refresh = true,
+             |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+             | )
+             |""".stripMargin)
+
+        // Wait for streaming job complete current micro batch
+        val flintIndex = getFlintIndexName(decimalAndMapMv)
+        val job = spark.streams.active.find(_.name == flintIndex)
+        job shouldBe defined
+        failAfter(streamingTimeout) {
+          job.get.processAllAvailable()
+        }
+
+        flint.describeIndex(flintIndex) shouldBe defined
+        checkAnswer(
+          flint.queryIndex(flintIndex).select("base_score", "mymap"),
+          Seq(
+            Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
+            Row(4.1415926, Row("mapvalue2", null, null, null, null)),
+            Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
+            Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
+            Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
+      }
+    }
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 }
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index cafab008c..7c19cab12 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -445,7 +445,10 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
 
-  protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
+  protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = {
+    // CSV tables do not support MAP types so we use JSON instead
+    val finalTableType = if (tableType == "CSV") "JSON" else tableType
+
     sql(s"""
       | CREATE TABLE $testTable
       | (
@@ -455,7 +458,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
      | base_score DECIMAL(8, 7),
      | mymap MAP<String, String>
      | )
-     | USING ICEBERG
+     | USING $finalTableType $tableOptions
      |""".stripMargin)
 
     sql(
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
index 582f49260..ffb8a7d1b 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
@@ -6,67 +6,7 @@
 package org.opensearch.flint.spark.iceberg
 
 import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
-import org.scalatest.matchers.must.Matchers.defined
-import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIcebergMaterializedViewITSuite
     extends FlintSparkMaterializedViewSqlITSuite
-    with FlintSparkIcebergSuite {
-
-  private val testTable = s"$catalogName.default.mv_test_iceberg"
-  private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
-  private val testFlintIndex = getFlintIndexName(testMvName)
-  private val testQuery =
-    s"""
-       | SELECT
-       |   base_score, mymap
-       | FROM $testTable
-       |""".stripMargin
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    createIcebergTimeSeriesTable(testTable)
-  }
-
-  override def afterAll(): Unit = {
-    deleteTestIndex(testFlintIndex)
-    sql(s"DROP TABLE $testTable")
-    conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
-    conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
-    super.afterAll()
-  }
-
-  test("create materialized view with decimal and map types") {
-    withTempDir { checkpointDir =>
-      sql(s"""
-           | CREATE MATERIALIZED VIEW $testMvName
-           | AS $testQuery
-           | WITH (
-           |   auto_refresh = true,
-           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
-           | )
-           |""".stripMargin)
-
-      // Wait for streaming job complete current micro batch
-      val job = spark.streams.active.find(_.name == testFlintIndex)
-      job shouldBe defined
-      failAfter(streamingTimeout) {
-        job.get.processAllAvailable()
-      }
-
-      flint.describeIndex(testFlintIndex) shouldBe defined
-      checkAnswer(
-        flint.queryIndex(testFlintIndex).select("base_score", "mymap"),
-        Seq(
-          Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
-          Row(4.1415926, Row("mapvalue2", null, null, null, null)),
-          Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
-          Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
-          Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
-    }
-  }
-}
+    with FlintSparkIcebergSuite {}