Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for serializing Spark's DecimalType (#947) #962

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ The following table define the data type mapping between Flint data type and Spa
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
maps to StringType.
* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on
dynamic mapping. On the other hand, Flint data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type
*double* only maps to DoubleType.

Unsupported Spark data types:
* DecimalType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ object FlintDataType {
case ByteType => JObject("type" -> JString("byte"))
case DoubleType => JObject("type" -> JString("double"))
case FloatType => JObject("type" -> JString("float"))
case DecimalType() => JObject("type" -> JString("double"))

// Date
case TimestampType | _: TimestampNTZType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark decimal type serialize") {
val sparkStructType = StructType(
StructField("decimalField", DecimalType(1, 1), true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
| "properties": {
| "decimalField": {
| "type": "double"
| }
| }
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,5 +448,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("create materialized view with decimal and map types") {
val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map"
val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser"
withTable(decimalAndMapTable) {
createMapAndDecimalTimeSeriesTable(decimalAndMapTable)

withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $decimalAndMapMv
| AS
| SELECT
| base_score, mymap
| FROM $decimalAndMapTable
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val flintIndex = getFlintIndexName(decimalAndMapMv)
val job = spark.streams.active.find(_.name == flintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(flintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(flintIndex).select("base_score", "mymap"),
Seq(
Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
Row(4.1415926, Row("mapvalue2", null, null, null, null)),
Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
}
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,34 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = {
// CSV tables do not support MAP types so we use JSON instead
val finalTableType = if (tableType == "CSV") "JSON" else tableType

sql(s"""
| CREATE TABLE $testTable
| (
| time TIMESTAMP,
| name STRING,
| age INT,
| base_score DECIMAL(8, 7),
| mymap MAP<STRING, STRING>
| )
| USING $finalTableType $tableOptions
|""".stripMargin)

sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))")
}

protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Loading