Add support for serializing MapType #929

Merged: 6 commits on Nov 20, 2024

Changes from 3 commits
@@ -153,6 +153,9 @@ object FlintDataType {
// objects
case st: StructType => serializeJValue(st)

// Serialize maps as empty objects and let the map entries automap
case mt: MapType => serializeJValue(new StructType())
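For context, serializeJValue(new StructType()) emits an empty "properties" object: the MapType column is declared as a plain OpenSearch object with no predeclared sub-fields, and dynamic mapping then adds each map key as its own sub-field at index time. A minimal sketch of the resulting mapping fragment, for a hypothetical column named mapField:

    {
      "mapField": {
        "properties": { }
      }
    }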

// array
case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)

@@ -128,6 +128,21 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark map type serialize") {
val sparkStructType = StructType(
StructField("mapField", MapType(StringType, StringType), true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
| "properties": {
| "mapField": {
| "properties": {
| }
| }
| }
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
@@ -445,6 +445,26 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| time TIMESTAMP,
| name STRING,
| age INT,
| address STRING,
| mymap MAP<STRING, STRING>
| )
| USING ICEBERG
|""".stripMargin)

sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 'Seattle', Map('mapkey1', 'mapvalue1'))")
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 'Seattle', Map('mapkey2', 'mapvalue2'))")
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 'Portland', Map('mapkey3', 'mapvalue3'))")
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 'Portland', Map('mapkey4', 'mapvalue4'))")
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver', Map('mapkey5', 'mapvalue5'))")
}
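Each row inserts a single-entry map, so once this table is indexed through Flint, every document should carry its map rendered as a nested JSON object. A sketch of one such document's _source, assuming default rendering:

    { "mymap": { "mapkey1": "mapvalue1" } }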

protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
@@ -6,7 +6,68 @@
package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkIcebergMaterializedViewITSuite
extends FlintSparkMaterializedViewSqlITSuite
-    with FlintSparkIcebergSuite {}
+    with FlintSparkIcebergSuite {

private val testTable = s"$catalogName.default.mv_test_iceberg"
private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
s"""
| SELECT
| mymap
| FROM $testTable
|""".stripMargin

override def beforeEach(): Unit = {
super.beforeEach()
createIcebergTimeSeriesTable(testTable)
}

override def afterEach(): Unit = {
super.afterEach()
deleteTestIndex(testFlintIndex)
sql(s"DROP TABLE $testTable")
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
}

test("create materialized view with map type") {
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for the streaming job to complete the current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
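// Dynamic mapping turns each map key into its own field of the mymap object,
// so Spark reads mymap back as a struct over the union of all five keys,
// with null for keys absent from a given document.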
checkAnswer(
flint.queryIndex(testFlintIndex).select("mymap"),
Seq(
Row(Row("mapvalue2", null, null, null, null)),
Row(Row(null, "mapvalue5", null, null, null)),
Row(Row(null, null, "mapvalue3", null, null)),
Row(Row(null, null, null, "mapvalue4", null)),
Row(Row(null, null, null, null, "mapvalue1"))
))
}
}
}