Add support for serializing MapType
Signed-off-by: Chase Engelbrecht <[email protected]>
engechas committed Nov 19, 2024
1 parent 6bc5533 commit caeac1c
Showing 2 changed files with 82 additions and 1 deletion.
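The two changed files exercise the feature end to end: the first adds a fixture table with a MAP<STRING, STRING> column, and the second asserts that a materialized view over that column can be created, refreshed, and queried back from the Flint index. As a rough illustration of what serializing a map type into an OpenSearch mapping involves — a hedged sketch with a hypothetical function and type choices, not this commit's implementation — a Spark MapType with string keys can be rendered as an open-ended object field:

  import org.apache.spark.sql.types._

  // Hypothetical sketch: render a Spark field as an OpenSearch mapping snippet.
  // Flint's actual serializer and its type choices may differ.
  def toOpenSearchType(dt: DataType): String = dt match {
    case StringType => """{"type": "keyword"}"""
    case IntegerType => """{"type": "integer"}"""
    case TimestampType => """{"type": "date"}"""
    // A MAP<STRING, V> has no fixed key set, so it maps to a plain "object";
    // concrete subfields appear in the index mapping as documents are ingested.
    case MapType(StringType, _, _) => """{"type": "object"}"""
    case other => throw new IllegalArgumentException(s"unsupported type: $other")
  }

  println(toOpenSearchType(MapType(StringType, StringType)))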
@@ -445,6 +445,26 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
   }
 
+  protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
+    sql(s"""
+         | CREATE TABLE $testTable
+         | (
+         |   time TIMESTAMP,
+         |   name STRING,
+         |   age INT,
+         |   address STRING,
+         |   mymap MAP<STRING, STRING>
+         | )
+         | USING ICEBERG
+         |""".stripMargin)
+
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 'Seattle', Map('mapkey1', 'mapvalue1'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 'Seattle', Map('mapkey2', 'mapvalue2'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 'Portland', Map('mapkey3', 'mapvalue3'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland', Map('mapkey4', 'mapvalue4'))")
+    sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver', Map('mapkey5', 'mapvalue5'))")
+  }
+
   protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
     sql(s"""
          | CREATE TABLE $testTable
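For context, a suite mixing in FlintSparkSuite could exercise the new helper as in the following sketch; the test name, the map_type_sketch table, and the availability of catalogName (which the Iceberg suite below provides) are illustrative assumptions, not part of this commit:

  test("query a MAP<STRING, STRING> column on the Iceberg table") {
    // Hypothetical table name; createIcebergTimeSeriesTable is the helper added above.
    val mapTable = s"$catalogName.default.map_type_sketch"
    createIcebergTimeSeriesTable(mapTable)

    // Each fixture row holds a single-entry map, so element access by key
    // returns the value for exactly one matching row.
    val rows = sql(s"SELECT mymap['mapkey1'] FROM $mapTable WHERE name = 'A'").collect()
    assert(rows.head.getString(0) == "mapvalue1")
  }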
@@ -6,7 +6,68 @@
 package org.opensearch.flint.spark.iceberg
 
 import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
+import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIcebergMaterializedViewITSuite
     extends FlintSparkMaterializedViewSqlITSuite
-    with FlintSparkIcebergSuite {}
+    with FlintSparkIcebergSuite {
+
+  private val testTable = s"$catalogName.default.mv_test_iceberg"
+  private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
+  private val testFlintIndex = getFlintIndexName(testMvName)
+  private val testQuery =
+    s"""
+       | SELECT
+       |   mymap
+       | FROM $testTable
+       |""".stripMargin
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    createIcebergTimeSeriesTable(testTable)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    deleteTestIndex(testFlintIndex)
+    sql(s"DROP TABLE $testTable")
+    conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
+    conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
+  }
+
test("create materialized view with map type") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(testFlintIndex).select("mymap"),
Seq(
Row(Row("mapvalue2", null, null, null, null)),
Row(Row(null, "mapvalue5", null, null, null)),
Row(Row(null, null, "mapvalue3", null, null)),
Row(Row(null, null, null, "mapvalue4", null)),
Row(Row(null, null, null, null, "mapvalue1"))
))
}
}
}
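The expected rows above show what the test actually asserts about MapType serialization: each map key becomes a field of an OpenSearch object in the Flint index, so the column reads back as a struct over the union of all keys seen across documents, with null for the keys a given row lacks (the field order is determined by the index mapping, not by insertion order). The following standalone sketch reproduces that struct-with-nulls shape in plain Spark; the object name and the hard-coded key list stand in for the mapping-derived schema and are assumptions, not part of this commit:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, struct}

  object MapToStructSketch extends App {
    // Local session for the sketch only.
    val spark = SparkSession.builder().master("local[1]").appName("map-to-struct").getOrCreate()
    import spark.implicits._

    // Single-entry maps, mirroring the fixture rows inserted above.
    val df = Seq(
      Map("mapkey1" -> "mapvalue1"),
      Map("mapkey2" -> "mapvalue2")).toDF("mymap")

    // Union of all keys across rows, hard-coded here for clarity; the Flint
    // index derives the equivalent set from the OpenSearch mapping.
    val keys = Seq("mapkey1", "mapkey2")

    // One struct field per key; a row missing a key yields null, matching the
    // struct-with-nulls rows asserted in the test above.
    val asStruct = df.select(struct(keys.map(k => col("mymap").getItem(k).as(k)): _*).as("mymap"))
    asStruct.show(false)

    spark.stop()
  }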
