Support Spark data types VarcharType and CharType (#21)
* IT: create skipping index for supported data types

Signed-off-by: Sean Kao <[email protected]>

* UT: build index for supported data types

Signed-off-by: Sean Kao <[email protected]>

* parse catalog data types to Spark StructType

Signed-off-by: Sean Kao <[email protected]>

* Support Varchar and Char

Signed-off-by: Sean Kao <[email protected]>

* Fix indentation

Signed-off-by: Sean Kao <[email protected]>

* Update doc for VarcharType and CharType support

Signed-off-by: Sean Kao <[email protected]>

* Fix IT: rename table

Signed-off-by: Sean Kao <[email protected]>

* Update doc for unsupported types

Signed-off-by: Sean Kao <[email protected]>

* Update quotes for style consistency

Signed-off-by: Sean Kao <[email protected]>

* Map varchar|char with osType=text to text type

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
seankao-az authored Sep 8, 2023
1 parent a43c862 commit 0331d86
Showing 5 changed files with 203 additions and 26 deletions.
43 changes: 27 additions & 16 deletions docs/index.md
@@ -226,25 +226,36 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info

The following table defines the data type mapping between Flint data types and Spark data types.

| **FlintDataType** | **SparkDataType** |
|-------------------|----------------------------------|
| boolean | BooleanType |
| long | LongType |
| integer | IntegerType |
| short | ShortType |
| byte | ByteType |
| double | DoubleType |
| float | FloatType |
| date(Timestamp) | DateType |
| date(Date) | TimestampType |
| keyword | StringType |
| text | StringType(meta(osType)=text) |
| object | StructType |

* currently, Flint data type only support date. it is mapped to Spark Data Type based on the format:
| **FlintDataType** | **SparkDataType** |
|-------------------|-----------------------------------|
| boolean | BooleanType |
| long | LongType |
| integer | IntegerType |
| short | ShortType |
| byte | ByteType |
| double | DoubleType |
| float | FloatType |
| date(Timestamp) | DateType |
| date(Date) | TimestampType |
| keyword | StringType, VarcharType, CharType |
| text | StringType(meta(osType)=text) |
| object | StructType |

* Currently, the only date-related Flint data type is date. It is mapped to a Spark data type based on its format:
  * Maps to DateType if format = strict_date (we also support format = date; this may change in the future)
  * Maps to TimestampType if format = strict_date_optional_time_nanos (we also support format =
    strict_date_optional_time | epoch_millis; this may change in the future)
* The Spark data types VarcharType(length) and CharType(length) are both currently mapped to the Flint
  data type *keyword*, dropping their length property. In the other direction, the Flint data type
  *keyword* maps only to StringType.
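
For example, here is a minimal sketch of this mapping at the API level, using the `FlintDataType.serialize` and `FlintDataType.deserialize` helpers exercised by the unit tests in this commit (the import for `FlintDataType` itself is omitted, since its package path is not shown in this diff):

```scala
import org.apache.spark.sql.types._

// VarcharType(20), CharType(20) and StringType all serialize to the Flint "keyword" type;
// the length property of varchar/char is not preserved in the generated mapping.
val schema = StructType(
  StructField("varchar_col", VarcharType(20)) ::
  StructField("char_col", CharType(20)) ::
  StructField("string_col", StringType) ::
  Nil)

val flintMapping = FlintDataType.serialize(schema)

// Deserializing the mapping back yields StringType for every keyword field,
// never VarcharType or CharType.
val roundTripped = FlintDataType.deserialize(flintMapping)
```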

Unsupported Spark data types:
* DecimalType
* BinaryType
* YearMonthIntervalType
* DayTimeIntervalType
* ArrayType
* MapType

#### API

@@ -123,7 +123,7 @@ object FlintDataType {
case BooleanType => JObject("type" -> JString("boolean"))

// string
case StringType =>
case StringType | _: VarcharType | _: CharType =>
if (metadata.contains("osType") && metadata.getString("osType") == "text") {
JObject("type" -> JString("text"))
} else {
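// (hunk truncated here) Based on the unit tests added in this commit, the else
// branch presumably emits the Flint "keyword" type for plain string-like fields:
//   JObject("type" -> JString("keyword"))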
@@ -123,6 +123,54 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
| "varcharField": {
| "type": "keyword"
| },
| "charField": {
| "type": "keyword"
| }
| }
|}""".stripMargin
val sparkStructType = StructType(
StructField("varcharField", VarcharType(20), true) ::
StructField("charField", CharType(20), true) ::
Nil)
FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
// flint data type should not deserialize to varchar or char
FlintDataType.deserialize(flintDataType) should contain theSameElementsAs StructType(
StructField("varcharField", StringType, true) ::
StructField("charField", StringType, true) ::
Nil)
}

test("spark varchar and char type with osType text serialize") {
val flintDataType =
"""{
| "properties": {
| "varcharTextField": {
| "type": "text"
| },
| "charTextField": {
| "type": "text"
| }
| }
|}""".stripMargin
val textMetadata = new MetadataBuilder().putString("osType", "text").build()
val sparkStructType = StructType(
StructField("varcharTextField", VarcharType(20), true, textMetadata) ::
StructField("charTextField", CharType(20), true, textMetadata) ::
Nil)
FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
// flint data type should not deserialize to varchar or char
FlintDataType.deserialize(flintDataType) should contain theSameElementsAs StructType(
StructField("varcharTextField", StringType, true, textMetadata) ::
StructField("charTextField", StringType, true, textMetadata) ::
Nil)
}

test("flint date type deserialize and serialize") {
val flintDataType = """{
| "properties": {
@@ -87,6 +87,56 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {

// TODO: test for osType "text"

test("can build index for varchar column") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.outputSchema()).thenReturn(Map("varchar_col" -> "varchar(20)"))
when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr)))

val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol))
index.metadata().getContent should matchJson(
s"""{
| "_meta": {
| "kind": "skipping",
| "indexedColumns": [{}],
| "source": "default.test"
| },
| "properties": {
| "varchar_col": {
| "type": "keyword"
| },
| "file_path": {
| "type": "keyword"
| }
| }
| }
|""".stripMargin)
}

test("can build index for char column") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.outputSchema()).thenReturn(Map("char_col" -> "char(20)"))
when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr)))

val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol))
index.metadata().getContent should matchJson(
s"""{
| "_meta": {
| "kind": "skipping",
| "indexedColumns": [{}],
| "source": "default.test"
| },
| "properties": {
| "char_col": {
| "type": "keyword"
| },
| "file_path": {
| "type": "keyword"
| }
| }
| }
|""".stripMargin)
}

test("can build index for long column") {
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.outputSchema()).thenReturn(Map("long_col" -> "bigint"))
@@ -389,14 +389,16 @@ class FlintSparkSkippingIndexITSuite

test("create skipping index for all supported data types successfully") {
// Prepare test table
val testDataTypeTable = "default.data_type_table"
val testDataTypeIndex = getSkippingIndexName(testDataTypeTable)
val testTable = "default.data_type_table"
val testIndex = getSkippingIndexName(testTable)
sql(
s"""
| CREATE TABLE $testDataTypeTable
| CREATE TABLE $testTable
| (
| boolean_col BOOLEAN,
| string_col STRING,
| varchar_col VARCHAR(20),
| char_col CHAR(20),
| long_col LONG,
| int_col INT,
| short_col SHORT,
@@ -411,10 +413,12 @@
|""".stripMargin)
sql(
s"""
| INSERT INTO $testDataTypeTable
| INSERT INTO $testTable
| VALUES (
| TRUE,
| "sample string",
| "sample varchar",
| "sample char",
| 1L,
| 2,
| 3S,
@@ -424,15 +428,17 @@
| TIMESTAMP "2023-08-09 17:24:40.322171",
| DATE "2023-08-09",
| STRUCT("subfieldValue1",123)
|)
| )
|""".stripMargin)

// Create index on all columns
flint
.skippingIndex()
.onTable(testDataTypeTable)
.onTable(testTable)
.addValueSet("boolean_col")
.addValueSet("string_col")
.addValueSet("varchar_col")
.addValueSet("char_col")
.addValueSet("long_col")
.addValueSet("int_col")
.addValueSet("short_col")
Expand All @@ -444,7 +450,7 @@ class FlintSparkSkippingIndexITSuite
.addValueSet("struct_col")
.create()

val index = flint.describeIndex(testDataTypeIndex)
val index = flint.describeIndex(testIndex)
index shouldBe defined
index.get.metadata().getContent should matchJson(
s"""{
@@ -463,6 +469,16 @@
| },
| {
| "kind": "VALUE_SET",
| "columnName": "varchar_col",
| "columnType": "varchar(20)"
| },
| {
| "kind": "VALUE_SET",
| "columnName": "char_col",
| "columnType": "char(20)"
| },
| {
| "kind": "VALUE_SET",
| "columnName": "long_col",
| "columnType": "bigint"
| },
@@ -506,7 +522,7 @@
| "columnName": "struct_col",
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
| "source": "$testDataTypeTable"
| "source": "$testTable"
| },
| "properties": {
| "boolean_col": {
@@ -515,6 +531,12 @@
| "string_col": {
| "type": "keyword"
| },
| "varchar_col": {
| "type": "keyword"
| },
| "char_col": {
| "type": "keyword"
| },
| "long_col": {
| "type": "long"
| },
@@ -558,7 +580,53 @@
| }
|""".stripMargin)

flint.deleteIndex(testDataTypeIndex)
flint.deleteIndex(testIndex)
}

test("can build skipping index for varchar and char and rewrite applicable query") {
val testTable = "default.varchar_char_table"
val testIndex = getSkippingIndexName(testTable)
sql(
s"""
| CREATE TABLE $testTable
| (
| varchar_col VARCHAR(20),
| char_col CHAR(20)
| )
| USING PARQUET
|""".stripMargin)
sql(
s"""
| INSERT INTO $testTable
| VALUES (
| "sample varchar",
| "sample char"
| )
|""".stripMargin)

flint
.skippingIndex()
.onTable(testTable)
.addValueSet("varchar_col")
.addValueSet("char_col")
.create()
flint.refreshIndex(testIndex, FULL)

val query = sql(
s"""
| SELECT varchar_col, char_col
| FROM $testTable
| WHERE varchar_col = "sample varchar" AND char_col = "sample char"
|""".stripMargin)

// CharType column is padded to a fixed length with whitespace
val paddedChar = "sample char".padTo(20, ' ')
checkAnswer(query, Row("sample varchar", paddedChar))
query.queryExecution.executedPlan should
useFlintSparkSkippingFileIndex(hasIndexFilter(
col("varchar_col") === "sample varchar" && col("char_col") === paddedChar))

flint.deleteIndex(testIndex)
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
