diff --git a/docs/index.md b/docs/index.md index 82c147de2..e0211d8fa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -577,6 +577,10 @@ The following table define the data type mapping between Flint data type and Spa * Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only maps to StringType. +* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on + dynamic mapping. On the other hand, Flint data type *object* only maps to StructType. +* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type + *double* only maps to DoubleType. Unsupported Spark data types: * DecimalType diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 5d920a07e..19fe28a2d 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -142,6 +142,7 @@ object FlintDataType { case ByteType => JObject("type" -> JString("byte")) case DoubleType => JObject("type" -> JString("double")) case FloatType => JObject("type" -> JString("float")) + case DecimalType() => JObject("type" -> JString("double")) // Date case TimestampType | _: TimestampNTZType => diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index 44e8158d8..312f3a5a1 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -143,6 +143,20 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { |}""".stripMargin) } + test("spark decimal type serialize") { + val sparkStructType = StructType( + StructField("decimalField", DecimalType(1, 1), true) :: + Nil) + + FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{ + | "properties": { + | "decimalField": { + | "type": "double" + | } + | } + |}""".stripMargin) + } + test("spark varchar and char type serialize") { val flintDataType = """{ | "properties": { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index ae2e53090..bf5e6309e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -523,5 +523,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("create materialized view with decimal and map types") { + val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map" + val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser" + withTable(decimalAndMapTable) { + createMapAndDecimalTimeSeriesTable(decimalAndMapTable) + + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $decimalAndMapMv + | AS + | SELECT + | base_score, mymap + | FROM $decimalAndMapTable + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + + // Wait for streaming job complete current micro batch + val flintIndex = getFlintIndexName(decimalAndMapMv) + val job = spark.streams.active.find(_.name == flintIndex) + job shouldBe defined + failAfter(streamingTimeout) { + job.get.processAllAvailable() + } + + flint.describeIndex(flintIndex) shouldBe defined + checkAnswer( + flint.queryIndex(flintIndex).select("base_score", "mymap"), + Seq( + Row(3.1415926, Row(null, null, null, null, "mapvalue1")), + Row(4.1415926, Row("mapvalue2", null, null, null, null)), + Row(5.1415926, Row(null, null, "mapvalue3", null, null)), + Row(6.1415926, Row(null, null, null, "mapvalue4", null)), + Row(7.1415926, Row(null, "mapvalue5", null, null, null)))) + } + } + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 68d370791..7c19cab12 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -445,6 +445,34 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')") } + protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = { + // CSV tables do not support MAP types so we use JSON instead + val finalTableType = if (tableType == "CSV") "JSON" else tableType + + sql(s""" + | CREATE TABLE $testTable + | ( + | time TIMESTAMP, + | name STRING, + | age INT, + | base_score DECIMAL(8, 7), + | mymap MAP + | ) + | USING $finalTableType $tableOptions + |""".stripMargin) + + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))") + sql( + s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))") + } + protected def createTimeSeriesTransactionTable(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable