From 0331d867a7939d0dae3cd9469bc1898d4a31a3f9 Mon Sep 17 00:00:00 2001
From: Sean Kao
Date: Fri, 8 Sep 2023 08:06:58 -0700
Subject: [PATCH] Support Spark data types VarcharType and CharType (#21)

* IT: create skipping index for supported data types

Signed-off-by: Sean Kao

* UT: build index for supported data types

Signed-off-by: Sean Kao

* parse catalog data types to Spark StructType

Signed-off-by: Sean Kao

* Support Varchar and Char

Signed-off-by: Sean Kao

* Fix indentation

Signed-off-by: Sean Kao

* Update doc for VarcharType and CharType support

Signed-off-by: Sean Kao

* Fix IT: rename table

Signed-off-by: Sean Kao

* Update doc for unsupported types

Signed-off-by: Sean Kao

* Update quotes for style consistency

Signed-off-by: Sean Kao

* Map varchar|char with osType=text to text type

Signed-off-by: Sean Kao

---------

Signed-off-by: Sean Kao
---
 docs/index.md                                 | 43 ++++++----
 .../sql/flint/datatype/FlintDataType.scala    |  2 +-
 .../flint/datatype/FlintDataTypeSuite.scala   | 48 +++++++++++
 .../FlintSparkSkippingIndexSuite.scala        | 50 +++++++++++
 .../FlintSparkSkippingIndexITSuite.scala      | 86 +++++++++++++++++--
 5 files changed, 203 insertions(+), 26 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 6594d741b..40ea21892 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -226,25 +226,63 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 
 The following table defines the data type mapping between Flint data types and Spark data types.
 
-| **FlintDataType** | **SparkDataType**                |
-|-------------------|----------------------------------|
-| boolean           | BooleanType                      |
-| long              | LongType                         |
-| integer           | IntegerType                      |
-| short             | ShortType                        |
-| byte              | ByteType                         |
-| double            | DoubleType                       |
-| float             | FloatType                        |
-| date(Timestamp)   | DateType                         |
-| date(Date)        | TimestampType                    |
-| keyword           | StringType                       |
-| text              | StringType(meta(osType)=text)    |
-| object            | StructType                       |
-
-* currently, Flint data type only support date. it is mapped to Spark Data Type based on the format:
+| **FlintDataType** | **SparkDataType**                 |
+|-------------------|-----------------------------------|
+| boolean           | BooleanType                       |
+| long              | LongType                          |
+| integer           | IntegerType                       |
+| short             | ShortType                         |
+| byte              | ByteType                          |
+| double            | DoubleType                        |
+| float             | FloatType                         |
+| date(Timestamp)   | DateType                          |
+| date(Date)        | TimestampType                     |
+| keyword           | StringType, VarcharType, CharType |
+| text              | StringType(meta(osType)=text)     |
+| object            | StructType                        |
+
+* Currently, Flint data type only supports date. It is mapped to a Spark data type based on the format:
   * Map to DateType if format = strict_date (we also support format = date; this may change in the future)
   * Map to TimestampType if format = strict_date_optional_time_nanos (we also support format = strict_date_optional_time | epoch_millis; this may change in the future)
 
+* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
+  type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
+  maps to StringType.
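+
+For example, a hypothetical table (here `sample_table`) with a VARCHAR and a CHAR column:
+
+```sql
+-- hypothetical example table for illustration
+CREATE TABLE sample_table
+(
+  varchar_col VARCHAR(20),
+  char_col CHAR(20)
+)
+USING PARQUET
+```
+
+serializes both columns to Flint data type *keyword*, with the declared length dropped:
+
+```json
+{
+  "properties": {
+    "varchar_col": {
+      "type": "keyword"
+    },
+    "char_col": {
+      "type": "keyword"
+    }
+  }
+}
+```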
+ +Unsupported Spark data types: +* DecimalType +* BinaryType +* YearMonthIntervalType +* DayTimeIntervalType +* ArrayType +* MapType #### API diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index 3a736929f..414d8b61d 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -123,7 +123,7 @@ object FlintDataType { case BooleanType => JObject("type" -> JString("boolean")) // string - case StringType => + case StringType | _: VarcharType | _: CharType => if (metadata.contains("osType") && metadata.getString("osType") == "text") { JObject("type" -> JString("text")) } else { diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index ee9c5cd70..b2b6adf81 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -123,6 +123,54 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { |}""".stripMargin) } + test("spark varchar and char type serialize") { + val flintDataType = """{ + | "properties": { + | "varcharField": { + | "type": "keyword" + | }, + | "charField": { + | "type": "keyword" + | } + | } + |}""".stripMargin + val sparkStructType = StructType( + StructField("varcharField", VarcharType(20), true) :: + StructField("charField", CharType(20), true) :: + Nil) + FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) + // flint data type should not deserialize to varchar or char + FlintDataType.deserialize(flintDataType) should contain theSameElementsAs StructType( + StructField("varcharField", StringType, true) :: + StructField("charField", StringType, true) :: + Nil) + } + + test("spark varchar and char type with osType text serialize") { + val flintDataType = + """{ + | "properties": { + | "varcharTextField": { + | "type": "text" + | }, + | "charTextField": { + | "type": "text" + | } + | } + |}""".stripMargin + val textMetadata = new MetadataBuilder().putString("osType", "text").build() + val sparkStructType = StructType( + StructField("varcharTextField", VarcharType(20), true, textMetadata) :: + StructField("charTextField", CharType(20), true, textMetadata) :: + Nil) + FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType) + // flint data type should not deserialize to varchar or char + FlintDataType.deserialize(flintDataType) should contain theSameElementsAs StructType( + StructField("varcharTextField", StringType, true, textMetadata) :: + StructField("charTextField", StringType, true, textMetadata) :: + Nil) + } + test("flint date type deserialize and serialize") { val flintDataType = """{ | "properties": { diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala index e95ac6f05..da362d19a 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala +++ 
b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndexSuite.scala @@ -87,6 +87,56 @@ class FlintSparkSkippingIndexSuite extends FlintSuite { // TODO: test for osType "text" + test("can build index for varchar column") { + val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.outputSchema()).thenReturn(Map("varchar_col" -> "varchar(20)")) + when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("varchar_col").expr))) + + val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) + index.metadata().getContent should matchJson( + s"""{ + | "_meta": { + | "kind": "skipping", + | "indexedColumns": [{}], + | "source": "default.test" + | }, + | "properties": { + | "varchar_col": { + | "type": "keyword" + | }, + | "file_path": { + | "type": "keyword" + | } + | } + | } + |""".stripMargin) + } + + test("can build index for char column") { + val indexCol = mock[FlintSparkSkippingStrategy] + when(indexCol.outputSchema()).thenReturn(Map("char_col" -> "char(20)")) + when(indexCol.getAggregators).thenReturn(Seq(CollectSet(col("char_col").expr))) + + val index = new FlintSparkSkippingIndex("default.test", Seq(indexCol)) + index.metadata().getContent should matchJson( + s"""{ + | "_meta": { + | "kind": "skipping", + | "indexedColumns": [{}], + | "source": "default.test" + | }, + | "properties": { + | "char_col": { + | "type": "keyword" + | }, + | "file_path": { + | "type": "keyword" + | } + | } + | } + |""".stripMargin) + } + test("can build index for long column") { val indexCol = mock[FlintSparkSkippingStrategy] when(indexCol.outputSchema()).thenReturn(Map("long_col" -> "bigint")) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 7f139e539..f867a99bb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -389,14 +389,16 @@ class FlintSparkSkippingIndexITSuite test("create skipping index for all supported data types successfully") { // Prepare test table - val testDataTypeTable = "default.data_type_table" - val testDataTypeIndex = getSkippingIndexName(testDataTypeTable) + val testTable = "default.data_type_table" + val testIndex = getSkippingIndexName(testTable) sql( s""" - | CREATE TABLE $testDataTypeTable + | CREATE TABLE $testTable | ( | boolean_col BOOLEAN, | string_col STRING, + | varchar_col VARCHAR(20), + | char_col CHAR(20), | long_col LONG, | int_col INT, | short_col SHORT, @@ -411,10 +413,12 @@ class FlintSparkSkippingIndexITSuite |""".stripMargin) sql( s""" - | INSERT INTO $testDataTypeTable + | INSERT INTO $testTable | VALUES ( | TRUE, | "sample string", + | "sample varchar", + | "sample char", | 1L, | 2, | 3S, @@ -424,15 +428,17 @@ class FlintSparkSkippingIndexITSuite | TIMESTAMP "2023-08-09 17:24:40.322171", | DATE "2023-08-09", | STRUCT("subfieldValue1",123) - |) + | ) |""".stripMargin) // Create index on all columns flint .skippingIndex() - .onTable(testDataTypeTable) + .onTable(testTable) .addValueSet("boolean_col") .addValueSet("string_col") + .addValueSet("varchar_col") + .addValueSet("char_col") .addValueSet("long_col") .addValueSet("int_col") .addValueSet("short_col") @@ -444,7 +450,7 @@ class FlintSparkSkippingIndexITSuite .addValueSet("struct_col") .create() - val index = flint.describeIndex(testDataTypeIndex) + val 
index = flint.describeIndex(testIndex) index shouldBe defined index.get.metadata().getContent should matchJson( s"""{ @@ -463,6 +469,16 @@ class FlintSparkSkippingIndexITSuite | }, | { | "kind": "VALUE_SET", + | "columnName": "varchar_col", + | "columnType": "varchar(20)" + | }, + | { + | "kind": "VALUE_SET", + | "columnName": "char_col", + | "columnType": "char(20)" + | }, + | { + | "kind": "VALUE_SET", | "columnName": "long_col", | "columnType": "bigint" | }, @@ -506,7 +522,7 @@ class FlintSparkSkippingIndexITSuite | "columnName": "struct_col", | "columnType": "struct" | }], - | "source": "$testDataTypeTable" + | "source": "$testTable" | }, | "properties": { | "boolean_col": { @@ -515,6 +531,12 @@ class FlintSparkSkippingIndexITSuite | "string_col": { | "type": "keyword" | }, + | "varchar_col": { + | "type": "keyword" + | }, + | "char_col": { + | "type": "keyword" + | }, | "long_col": { | "type": "long" | }, @@ -558,7 +580,53 @@ class FlintSparkSkippingIndexITSuite | } |""".stripMargin) - flint.deleteIndex(testDataTypeIndex) + flint.deleteIndex(testIndex) + } + + test("can build skipping index for varchar and char and rewrite applicable query") { + val testTable = "default.varchar_char_table" + val testIndex = getSkippingIndexName(testTable) + sql( + s""" + | CREATE TABLE $testTable + | ( + | varchar_col VARCHAR(20), + | char_col CHAR(20) + | ) + | USING PARQUET + |""".stripMargin) + sql( + s""" + | INSERT INTO $testTable + | VALUES ( + | "sample varchar", + | "sample char" + | ) + |""".stripMargin) + + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("varchar_col") + .addValueSet("char_col") + .create() + flint.refreshIndex(testIndex, FULL) + + val query = sql( + s""" + | SELECT varchar_col, char_col + | FROM $testTable + | WHERE varchar_col = "sample varchar" AND char_col = "sample char" + |""".stripMargin) + + // CharType column is padded to a fixed length with whitespace + val paddedChar = "sample char".padTo(20, ' ') + checkAnswer(query, Row("sample varchar", paddedChar)) + query.queryExecution.executedPlan should + useFlintSparkSkippingFileIndex(hasIndexFilter( + col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) + + flint.deleteIndex(testIndex) } // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
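
Note on the CHAR padding asserted in the last test above: Spark applies ANSI
CHAR semantics on read, so a CHAR(n) column comes back right-padded with spaces
to exactly n characters, and the pushed-down index filter must compare against
the padded literal. Below is a minimal standalone sketch of that padding (the
object name CharPaddingSketch is illustrative only; the length 20 mirrors the
CHAR(20) declaration in the test):

    // Sketch: a CHAR(n) value is padded with trailing spaces to length n,
    // matching the expected value the test builds with padTo.
    object CharPaddingSketch extends App {
      val raw = "sample char"          // 11 characters
      val padded = raw.padTo(20, ' ')  // pad to the declared CHAR(20) length
      assert(padded == raw + " " * 9)  // 9 trailing spaces appended
      assert(padded.length == 20)
      println(s"[$padded]")            // prints the padded value in brackets
    }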