From 9152f39e65140cc3b738f306b520f7284a7035e8 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 25 Nov 2024 20:23:20 -0800 Subject: [PATCH 1/6] Add support for serializing DecimalType in FlintDataType Signed-off-by: Chase Engelbrecht --- .../sql/flint/datatype/FlintDataType.scala | 1 + .../flint/datatype/FlintDataTypeSuite.scala | 14 +++++ .../flint/spark/FlintSparkSuite.scala | 20 ++++++ ...tSparkIcebergMaterializedViewITSuite.scala | 62 ++++++++++++++++++- 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 5d920a07e..19fe28a2d 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -142,6 +142,7 @@ object FlintDataType { case ByteType => JObject("type" -> JString("byte")) case DoubleType => JObject("type" -> JString("double")) case FloatType => JObject("type" -> JString("float")) + case DecimalType() => JObject("type" -> JString("double")) // Date case TimestampType | _: TimestampNTZType => diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index 44e8158d8..312f3a5a1 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -143,6 +143,20 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { |}""".stripMargin) } + test("spark decimal type serialize") { + val sparkStructType = StructType( + StructField("decimalField", DecimalType(1, 1), true) :: + Nil) + + FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{ + | "properties": { + | "decimalField": { + | "type": "double" + | } + | } + |}""".stripMargin) + } + test("spark varchar and char type serialize") { val flintDataType = """{ | "properties": { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 68d370791..74854765f 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -445,6 +445,26 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')") } + protected def createIcebergTimeSeriesTable(testTable: String): Unit = { + sql(s""" + | CREATE TABLE $testTable + | ( + | time TIMESTAMP, + | name STRING, + | age INT, + | base_score DECIMAL(8, 7), + | mymap MAP + | ) + | USING ICEBERG + |""".stripMargin) + + sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))") + sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))") + sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))") + sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))") + sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))") + } + protected def createTimeSeriesTransactionTable(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala index ffb8a7d1b..c84ea863d 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala @@ -6,7 +6,67 @@ package org.opensearch.flint.spark.iceberg import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName +import org.scalatest.matchers.must.Matchers.defined +import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper + +import org.apache.spark.sql.Row +import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkIcebergMaterializedViewITSuite extends FlintSparkMaterializedViewSqlITSuite - with FlintSparkIcebergSuite {} + with FlintSparkIcebergSuite{ + + private val testTable = s"$catalogName.default.mv_test_iceberg" + private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics" + private val testFlintIndex = getFlintIndexName(testMvName) + private val testQuery = + s""" + | SELECT + | base_score, mymap + | FROM $testTable + |""".stripMargin + + override def beforeEach(): Unit = { + super.beforeAll() + createIcebergTimeSeriesTable(testTable) + } + + override def afterEach(): Unit = { + deleteTestIndex(testFlintIndex) + sql(s"DROP TABLE $testTable") + conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) + conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) + } + + test("create materialized view with map type") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val job = spark.streams.active.find(_.name == testFlintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + flint.describeIndex(testFlintIndex) shouldBe defined + checkAnswer( + flint.queryIndex(testFlintIndex).select("base_score", "mymap"), + Seq( + Row(3.1415926, Row(null, null, null, null, "mapvalue1")), + Row(4.1415926, Row("mapvalue2", null, null, null, null)), + Row(5.1415926, Row(null, null, "mapvalue3", null, null)), + Row(6.1415926, Row(null, null, null, "mapvalue4", null)), + Row(7.1415926, Row(null, "mapvalue5", null, null, null)), + )) + } + } +} From be52a8f28496edfe8a4d3a3cfdce6ad61381ec5c Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 25 Nov 2024 20:25:05 -0800 Subject: [PATCH 2/6] Fix checkstyle Signed-off-by: Chase Engelbrecht --- .../opensearch/flint/spark/FlintSparkSuite.scala | 15 ++++++++++----- ...FlintSparkIcebergMaterializedViewITSuite.scala | 7 +++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 74854765f..cafab008c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -458,11 +458,16 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | USING ICEBERG |""".stripMargin) - sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))") - sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))") - sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))") - sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))") - sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))") } protected def createTimeSeriesTransactionTable(testTable: String): Unit = { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala index c84ea863d..f7f9c67b3 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala @@ -15,7 +15,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkIcebergMaterializedViewITSuite extends FlintSparkMaterializedViewSqlITSuite - with FlintSparkIcebergSuite{ + with FlintSparkIcebergSuite { private val testTable = s"$catalogName.default.mv_test_iceberg" private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics" @@ -39,7 +39,7 @@ class FlintSparkIcebergMaterializedViewITSuite conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) } - test("create materialized view with map type") { + test("create materialized view with decimal and map types") { withTempDir { checkpointDir => sql(s""" | CREATE MATERIALIZED VIEW $testMvName @@ -65,8 +65,7 @@ class FlintSparkIcebergMaterializedViewITSuite Row(4.1415926, Row("mapvalue2", null, null, null, null)), Row(5.1415926, Row(null, null, "mapvalue3", null, null)), Row(6.1415926, Row(null, null, null, "mapvalue4", null)), - Row(7.1415926, Row(null, "mapvalue5", null, null, null)), - )) + Row(7.1415926, Row(null, "mapvalue5", null, null, null)))) } } } From b9e5b949ed071b57ab0805f869c95891bd4db9df Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Mon, 25 Nov 2024 20:30:14 -0800 Subject: [PATCH 3/6] Add documentation on the new serialization behavior Signed-off-by: Chase Engelbrecht --- docs/index.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/index.md b/docs/index.md index 82c147de2..e0211d8fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -577,6 +577,10 @@ The following table define the data type mapping between Flint data type and Spa * Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only maps to StringType. +* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on + dynamic mapping. On the other hand, Flint data type *object* only maps to StructType. +* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type + *double* only maps to DoubleType. Unsupported Spark data types: * DecimalType From edb9533ea89757339aa137e1767da2596fe99b59 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Tue, 26 Nov 2024 10:32:54 -0800 Subject: [PATCH 4/6] Fix integ test Signed-off-by: Chase Engelbrecht --- .../iceberg/FlintSparkIcebergMaterializedViewITSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala index f7f9c67b3..2e272a7cb 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala @@ -27,12 +27,13 @@ class FlintSparkIcebergMaterializedViewITSuite | FROM $testTable |""".stripMargin - override def beforeEach(): Unit = { + override def beforeAll(): Unit = { super.beforeAll() createIcebergTimeSeriesTable(testTable) } - override def afterEach(): Unit = { + override def afterAll(): Unit = { + super.afterAll() deleteTestIndex(testFlintIndex) sql(s"DROP TABLE $testTable") conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) From c1d4c8ef2c577d3cb6e3a03d8bf8c43619219545 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Tue, 26 Nov 2024 12:25:57 -0800 Subject: [PATCH 5/6] Actually fix integ tests Signed-off-by: Chase Engelbrecht --- .../iceberg/FlintSparkIcebergMaterializedViewITSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala index 2e272a7cb..582f49260 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala @@ -33,11 +33,11 @@ class FlintSparkIcebergMaterializedViewITSuite } override def afterAll(): Unit = { - super.afterAll() deleteTestIndex(testFlintIndex) sql(s"DROP TABLE $testTable") conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) + super.afterAll() } test("create materialized view with decimal and map types") { From 3ce421b72629672834f89c0af9b6d840af1be156 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Sun, 1 Dec 2024 12:11:01 -0800 Subject: [PATCH 6/6] Move the decimal and map IT to the base suite instead of the iceberg suite Signed-off-by: Chase Engelbrecht --- ...FlintSparkMaterializedViewSqlITSuite.scala | 40 ++++++++++++ .../flint/spark/FlintSparkSuite.scala | 7 ++- ...tSparkIcebergMaterializedViewITSuite.scala | 62 +------------------ 3 files changed, 46 insertions(+), 63 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index ae2e53090..bf5e6309e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -523,5 +523,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("create materialized view with decimal and map types") { + val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map" + val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser" + withTable(decimalAndMapTable) { + createMapAndDecimalTimeSeriesTable(decimalAndMapTable) + + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $decimalAndMapMv + | AS + | SELECT + | base_score, mymap + | FROM $decimalAndMapTable + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val flintIndex = getFlintIndexName(decimalAndMapMv) + val job = spark.streams.active.find(_.name == flintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + flint.describeIndex(flintIndex) shouldBe defined + checkAnswer( + flint.queryIndex(flintIndex).select("base_score", "mymap"), + Seq( + Row(3.1415926, Row(null, null, null, null, "mapvalue1")), + Row(4.1415926, Row("mapvalue2", null, null, null, null)), + Row(5.1415926, Row(null, null, "mapvalue3", null, null)), + Row(6.1415926, Row(null, null, null, "mapvalue4", null)), + Row(7.1415926, Row(null, "mapvalue5", null, null, null)))) + } + } + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index cafab008c..7c19cab12 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -445,7 +445,10 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')") } - protected def createIcebergTimeSeriesTable(testTable: String): Unit = { + protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = { + // CSV tables do not support MAP types so we use JSON instead + val finalTableType = if (tableType == "CSV") "JSON" else tableType + sql(s""" | CREATE TABLE $testTable | ( @@ -455,7 +458,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | base_score DECIMAL(8, 7), | mymap MAP | ) - | USING ICEBERG + | USING $finalTableType $tableOptions |""".stripMargin) sql( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala index 582f49260..ffb8a7d1b 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/iceberg/FlintSparkIcebergMaterializedViewITSuite.scala @@ -6,67 +6,7 @@ package org.opensearch.flint.spark.iceberg import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite -import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName -import org.scalatest.matchers.must.Matchers.defined -import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper - -import org.apache.spark.sql.Row -import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkIcebergMaterializedViewITSuite extends FlintSparkMaterializedViewSqlITSuite - with FlintSparkIcebergSuite { - - private val testTable = s"$catalogName.default.mv_test_iceberg" - private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics" - private val testFlintIndex = getFlintIndexName(testMvName) - private val testQuery = - s""" - | SELECT - | base_score, mymap - | FROM $testTable - |""".stripMargin - - override def beforeAll(): Unit = { - super.beforeAll() - createIcebergTimeSeriesTable(testTable) - } - - override def afterAll(): Unit = { - deleteTestIndex(testFlintIndex) - sql(s"DROP TABLE $testTable") - conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) - conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key) - super.afterAll() - } - - test("create materialized view with decimal and map types") { - withTempDir { checkpointDir => - sql(s""" - | CREATE MATERIALIZED VIEW $testMvName - | AS $testQuery - | WITH ( - | auto_refresh = true, - | checkpoint_location = '${checkpointDir.getAbsolutePath}' - | ) - |""".stripMargin) - - // Wait for streaming job complete current micro batch - val job = spark.streams.active.find(_.name == testFlintIndex) - job shouldBe defined - failAfter(streamingTimeout) { - job.get.processAllAvailable() - } - - flint.describeIndex(testFlintIndex) shouldBe defined - checkAnswer( - flint.queryIndex(testFlintIndex).select("base_score", "mymap"), - Seq( - Row(3.1415926, Row(null, null, null, null, "mapvalue1")), - Row(4.1415926, Row("mapvalue2", null, null, null, null)), - Row(5.1415926, Row(null, null, "mapvalue3", null, null)), - Row(6.1415926, Row(null, null, null, "mapvalue4", null)), - Row(7.1415926, Row(null, "mapvalue5", null, null, null)))) - } - } -} + with FlintSparkIcebergSuite {}