From caeac1c287639843506b0e32894bb71d10db1618 Mon Sep 17 00:00:00 2001
From: Chase Engelbrecht
Date: Mon, 18 Nov 2024 17:45:01 -0800
Subject: [PATCH] Add support for serializing MapType

Signed-off-by: Chase Engelbrecht
---
 .../flint/spark/FlintSparkSuite.scala         | 20 ++++++
 ...tSparkIcebergMaterializedViewITSuite.scala | 63 ++++++++++++++++++-
 2 files changed, 82 insertions(+), 1 deletion(-)

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 68d370791..dc2c125a1 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -445,6 +445,26 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
 
+  protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
+    sql(s"""
+         | CREATE TABLE $testTable
+         | (
+         |   time TIMESTAMP,
+         |   name STRING,
+         |   age INT,
+         |   address STRING,
+         |   mymap MAP<STRING, STRING>
+         | )
+         | USING ICEBERG
+         |""".stripMargin)
+
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 'Seattle', Map('mapkey1', 'mapvalue1'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 'Seattle', Map('mapkey2', 'mapvalue2'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 'Portland', Map('mapkey3', 'mapvalue3'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland', Map('mapkey4', 'mapvalue4'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver', Map('mapkey5', 'mapvalue5'))")
+  }
+
   protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
     sql(s"""
          | CREATE TABLE $testTable
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
index ffb8a7d1b..9a2cea973 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala
@@ -6,7 +6,68 @@
 package org.opensearch.flint.spark.iceberg
 
 import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
+import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIcebergMaterializedViewITSuite
     extends FlintSparkMaterializedViewSqlITSuite
-    with FlintSparkIcebergSuite {}
+    with FlintSparkIcebergSuite {
+
+  private val testTable = s"$catalogName.default.mv_test_iceberg"
+  private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
+  private val testFlintIndex = getFlintIndexName(testMvName)
+  private val testQuery =
+    s"""
+       | SELECT
+       |   mymap
+       | FROM $testTable
+       |""".stripMargin
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    createIcebergTimeSeriesTable(testTable)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    deleteTestIndex(testFlintIndex)
+    sql(s"DROP TABLE $testTable")
+    conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
+    conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
+  }
+
+  test("create materialized view with map type") {
+    withTempDir { checkpointDir =>
+      sql(s"""
+           | CREATE MATERIALIZED VIEW $testMvName
+           | AS $testQuery
+           | WITH (
+           |   auto_refresh = true,
+           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+           | )
+           |""".stripMargin)
+
+      // Wait for the streaming job to complete the current micro batch
+      val job = spark.streams.active.find(_.name == testFlintIndex)
+      job shouldBe defined
+      failAfter(streamingTimeout) {
+        job.get.processAllAvailable()
+      }
+
+      flint.describeIndex(testFlintIndex) shouldBe defined
+      checkAnswer(
+        flint.queryIndex(testFlintIndex).select("mymap"),
+        Seq(
+          Row(Row("mapvalue2", null, null, null, null)),
+          Row(Row(null, "mapvalue5", null, null, null)),
+          Row(Row(null, null, "mapvalue3", null, null)),
+          Row(Row(null, null, null, "mapvalue4", null)),
+          Row(Row(null, null, null, null, "mapvalue1"))
+        ))
+    }
+  }
+}
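
Note on the checkAnswer expectation above: reading the Flint index back yields a single struct column in which every map key observed across the indexed rows appears as a field, so each row is non-null only for its own key (and field order is not guaranteed to match insertion order). A minimal, self-contained sketch of that read-back shape using plain Spark JSON schema inference follows; the object name, local session, and sample documents are illustrative assumptions, not part of the patch or of Flint's implementation.

// Illustrative sketch only: reproduces the struct-per-map read-back shape
// asserted by the test, via Spark's JSON schema inference.
import org.apache.spark.sql.{Encoders, SparkSession}

object MapReadbackSketch extends App {
  val spark = SparkSession.builder()
    .master("local[1]") // assumed local session for the sketch
    .appName("map-readback-sketch")
    .getOrCreate()

  // Each source row carried a single-entry map; once serialized as an object,
  // all observed keys merge into one struct, so a row holds a value only for
  // its own key and null for the others.
  val docs = Seq(
    """{"mymap": {"mapkey1": "mapvalue1"}}""",
    """{"mymap": {"mapkey2": "mapvalue2"}}""")
  val df = spark.read.json(spark.createDataset(docs)(Encoders.STRING))

  df.printSchema() // mymap: struct<mapkey1: string, mapkey2: string>
  df.show()        // one non-null field per row, mirroring the expected Rows
  spark.stop()
}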