Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for serializing Spark's DecimalType #947

Merged
merged 6 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ The following table define the data type mapping between Flint data type and Spa
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
maps to StringType.
* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on
dynamic mapping. On the other hand, Flint data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
*double* only maps to DoubleType.

Unsupported Spark data types:
* DecimalType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ object FlintDataType {
case ByteType => JObject("type" -> JString("byte"))
case DoubleType => JObject("type" -> JString("double"))
case FloatType => JObject("type" -> JString("float"))
case DecimalType() => JObject("type" -> JString("double"))

// Date
case TimestampType | _: TimestampNTZType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,20 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark decimal type serialize") {
val sparkStructType = StructType(
StructField("decimalField", DecimalType(1, 1), true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
| "properties": {
| "decimalField": {
| "type": "double"
| }
| }
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,31 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createIcebergTimeSeriesTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
| (
| time TIMESTAMP,
| name STRING,
| age INT,
| base_score DECIMAL(8, 7),
| mymap MAP<STRING, STRING>
| )
| USING ICEBERG
engechas marked this conversation as resolved.
Show resolved Hide resolved
|""".stripMargin)

sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))")
sql(
s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))")
}

protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,67 @@
package org.opensearch.flint.spark.iceberg

import org.opensearch.flint.spark.FlintSparkMaterializedViewSqlITSuite
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf

class FlintSparkIcebergMaterializedViewITSuite
extends FlintSparkMaterializedViewSqlITSuite
with FlintSparkIcebergSuite {}
with FlintSparkIcebergSuite {

private val testTable = s"$catalogName.default.mv_test_iceberg"
private val testMvName = s"$catalogName.default.mv_test_iceberg_metrics"
private val testFlintIndex = getFlintIndexName(testMvName)
private val testQuery =
s"""
| SELECT
| base_score, mymap
| FROM $testTable
|""".stripMargin

override def beforeAll(): Unit = {
super.beforeAll()
createIcebergTimeSeriesTable(testTable)
}

override def afterAll(): Unit = {
deleteTestIndex(testFlintIndex)
sql(s"DROP TABLE $testTable")
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)
conf.unsetConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED.key)
super.afterAll()
}

test("create materialized view with decimal and map types") {
engechas marked this conversation as resolved.
Show resolved Hide resolved
withTempDir { checkpointDir =>
sql(s"""
| CREATE MATERIALIZED VIEW $testMvName
| AS $testQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)

// Wait for streaming job complete current micro batch
val job = spark.streams.active.find(_.name == testFlintIndex)
job shouldBe defined
failAfter(streamingTimeout) {
job.get.processAllAvailable()
}

flint.describeIndex(testFlintIndex) shouldBe defined
checkAnswer(
flint.queryIndex(testFlintIndex).select("base_score", "mymap"),
Seq(
Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
Row(4.1415926, Row("mapvalue2", null, null, null, null)),
Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
}
}
}
Loading