[DOP-18631] - add partial support for ArrayType #8

Merged: 7 commits, Aug 5, 2024
71 changes: 59 additions & 12 deletions docs/data_type_mappings.md
@@ -4,19 +4,66 @@

This documentation outlines the customized mappings that the Spark Dialect Extension applies.

#### Customized Type Mappings with Spark Dialect Extension

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|----------------------|-------------------------|--------------------------|
| `Bool` | `BooleanType` | `Bool` | `Bool` |
| `Int8` | `ByteType` | `Int8` | `Int8` |
| `Int16` | `ShortType` | `Int16` | `Int16` |
| `Int32` | `IntegerType` | `Int32` | `Int32` |
| `Int64` | `LongType` | `Int64` | `Int64` |
| `UInt8` | `ShortType` | `UInt8` | `UInt8` |
| `UInt16` | `IntegerType` | `UInt16` | `UInt16` |
| `UInt32` | `LongType` | `Int64` | `Int64` |
| `UInt64` | `DecimalType(20, 0)` | `Decimal(20, 0)` | `Decimal(20, 0)` |
| `Float32` | `FloatType` | `Float32` | `Float32` |
| `Float64` | `DoubleType` | `Float64` | `Float64` |
| `Decimal(M, N)` | `DecimalType(M, N)` | `Decimal(M, N)` | `Decimal(M, N)` |
| `Decimal32(N)`         | `DecimalType(9, N)`  | `Decimal(9, N)`         | `Decimal(9, N)`          |
| `Decimal64(N)`         | `DecimalType(18, N)` | `Decimal(18, N)`        | `Decimal(18, N)`         |
| `Decimal128(N)`        | `DecimalType(38, N)` | `Decimal(38, N)`        | `Decimal(38, N)`         |
| `Decimal256(N)` | unsupported | unsupported | unsupported |
| `DateTime` | `TimestampType` | `DateTime` | `DateTime` |
| `Datetime64(6)` | `TimestampType` | `Datetime64(6)` | `Datetime64(6)` |
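The fixed-width `Decimal32/64/128` types map onto Spark precisions of 9, 18, and 38 digits, the same constants the dialect's pattern match uses. A minimal standalone sketch of that lookup (the object and method names here are illustrative, not part of the extension's API):

```scala
// Illustrative lookup of the Spark precision assigned to each fixed-width
// ClickHouse Decimal type. Decimal256 would need precision 76, which exceeds
// Spark's maximum of 38, so it is reported as unsupported.
object DecimalWidths {
  def sparkPrecision(width: Int): Option[Int] = width match {
    case 32  => Some(9)
    case 64  => Some(18)
    case 128 => Some(38)
    case _   => None // e.g. 256: unsupported
  }
}
```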


`Array(T)` -> `ArrayType(T)`:

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|--------------------------------|-------------------------|--------------------------|
| `Array(String)` | `ArrayType(StringType)` | `Array(String)` | `Array(String)` |
| unsupported | `ArrayType(ByteType)` | `Array(Int8)` | `Array(Int8)` |
| unsupported | `ArrayType(ShortType)` | unsupported | unsupported |
| unsupported | `ArrayType(LongType)` | `Array(Int64)` | `Array(Int64)` |
| `Array(Decimal(M, N))` | `ArrayType(DecimalType(M, N))` | `Array(Decimal(M, N))` | `Array(Decimal(M, N))` |
| unsupported | `ArrayType(TimestampType)` | unsupported | unsupported |
| unsupported            | `ArrayType(DateType)`          | `Array(Date)`           | `Array(Date)`            |
| unsupported | `ArrayType(FloatType)` | `Array(Float32)` | `Array(Float32)` |
| unsupported | `ArrayType(DoubleType)` | unsupported | unsupported |
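On the read path, the extension first peels the `Array(...)` and `Nullable(...)` wrappers off the type name reported by ClickHouse before mapping the element type. A standalone sketch using the same regular expressions as the dialect (the helper name is illustrative):

```scala
import scala.util.matching.Regex

// Patterns mirroring those used by the dialect extension.
val arrayTypePattern: Regex = """^Array\((.*)\)$""".r
val nullableTypePattern: Regex = """^Nullable\((.*)\)$""".r

// Illustrative helper: returns (elementIsNullable, elementTypeName) for an
// Array column type name, or None when the type is not an Array at all.
def arrayElement(typeName: String): Option[(Boolean, String)] = typeName match {
  case arrayTypePattern(inner) =>
    inner match {
      case nullableTypePattern(base) => Some((true, base))  // Array(Nullable(T))
      case other                     => Some((false, other)) // Array(T)
    }
  case _ => None
}
```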


#### Default Type Mappings without Spark Dialect Extension

| ClickHouse Type (Read) | Spark Type | ClickHouse Type (Write) | ClickHouse Type (Create) |
|------------------------|----------------------|-------------------------|--------------------------|
| `Bool` | `BooleanType` | `Bool` | `UInt64` |
| `Int8` | `IntegerType` | `Int32` | `Int32` |
| `Int16` | `IntegerType` | `Int32` | `Int32` |
| `Int32` | `IntegerType` | `Int32` | `Int32` |
| `Int64` | `LongType` | `Int64` | `Int64` |
| `UInt8` | `IntegerType` | `UInt8` | `UInt8` |
| `UInt16` | `IntegerType` | `UInt16` | `UInt16` |
| `UInt32` | `DecimalType(20, 0)` | `Decimal(20, 0)` | `Decimal(20, 0)` |
| `UInt64` | `DecimalType(20, 0)` | `Decimal(20, 0)` | `Decimal(20, 0)` |
| `Float32` | `FloatType` | `Float32` | `Float32` |
| `Float64` | `DoubleType` | `Float64` | `Float64` |
| `Decimal(M, N)` | `DecimalType(M, N)` | `Decimal(M, N)` | `Decimal(M, N)` |
| `Decimal32(N)` | `DecimalType(M, N)` | `Decimal32(M, N)` | `Decimal32(M, N)` |
| `Decimal64(N)` | `DecimalType(M, N)` | `Decimal64(M, N)` | `Decimal64(M, N)` |
| `Decimal128(N)` | `DecimalType(M, N)` | `Decimal128(M, N)` | `Decimal128(M, N)` |
| `Decimal256(N)` | unsupported | unsupported | unsupported |
| `DateTime` | `TimestampType` | `DateTime` | `DateTime` |
| `Datetime64(6)` | `TimestampType` | `Datetime64(6)` | `DateTime32` |

`Array(T)` -> `ArrayType(T)`:

**unsupported**
@@ -1,8 +1,8 @@
// SPDX-FileCopyrightText: 2021-2024 MTS (Mobile Telesystems)
// SPDX-License-Identifier: Apache-2.0
package ru.mts.doetl.sparkdialectextensions

import scala.util.matching.Regex
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types._
import org.slf4j.LoggerFactory
import java.sql.Types
@@ -11,41 +11,137 @@ private object ClickhouseDialectExtension extends JdbcDialect {

private val logger = LoggerFactory.getLogger(getClass)

private val arrayTypePattern: Regex = """^Array\((.*)\)$""".r
private val nullableTypePattern: Regex = """^Nullable\((.*)\)$""".r
private val dateTypePattern: Regex = """(?i)^Date$""".r
private val dateTimeTypePattern: Regex = """(?i)^DateTime(64)?(\((.*)\))?$""".r
private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r
private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r

override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:clickhouse")
}

/**
 * Resolves the Catalyst type for a column from its JDBC metadata.
 *
 * @param sqlType
 *   SQL type as integer
 * @param typeName
 *   Name of the SQL type
 * @param size
 *   Size of the type
 * @param md
 *   MetadataBuilder for further metadata handling
 * @return
 *   The corresponding Catalyst data type, or None to fall back to the default driver mapping
 */
*/
override def getCatalystType(
sqlType: Int,
typeName: String,
size: Int,
md: MetadataBuilder): Option[DataType] = {
val scale = md.build.getLong("scale").toInt
sqlType match {
case Types.ARRAY =>
unwrapNullable(typeName) match {
case (_, arrayTypePattern(nestType)) =>
// Due to https://github.com/ClickHouse/clickhouse-java/issues/1754, Spark cannot read Arrays of
// any element type except Decimal(...) and String
toCatalystType(Types.ARRAY, nestType, size, scale, md).map {
case (nullable, dataType) => ArrayType(dataType, nullable)
}
case _ => None
}
case _ => toCatalystType(sqlType, typeName, size, scale, md).map(_._2)
}
}

private def toCatalystType(
sqlType: Int,
typeName: String,
precision: Int,
scale: Int,
md: MetadataBuilder): Option[(Boolean, DataType)] = {
val (nullable, _typeName) = unwrapNullable(typeName)
val dataType = _typeName match {
case "String" =>
logger.debug(s"Custom mapping applied: StringType for '${_typeName}'")
Some(StringType)
case "Int8" =>
logger.debug(s"Custom mapping applied: ByteType for 'Int8'")
Some(ByteType)
case "UInt8" | "Int16" =>
logger.debug(s"Custom mapping applied: ShortType for '${_typeName}'")
Some(ShortType)
case "UInt16" | "Int32" =>
logger.debug(s"Custom mapping applied: IntegerType for '${_typeName}'")
Some(IntegerType)
case "UInt32" | "Int64" =>
logger.debug(s"Custom mapping applied: LongType for '${_typeName}'")
Some(LongType)
case "Int128" | "Int256" | "UInt256" =>
logger.debug(s"Type '${_typeName}' is not supported")
None
case "Float32" =>
logger.debug(s"Custom mapping applied: FloatType for 'Float32'")
Some(FloatType)
case "Float64" =>
logger.debug(s"Custom mapping applied: DoubleType for 'Float64'")
Some(DoubleType)
case dateTypePattern() =>
logger.debug(s"Custom mapping applied: DateType for '${_typeName}'")
Some(DateType)
case dateTimeTypePattern() =>
logger.debug(s"Custom mapping applied: TimestampType for '${_typeName}'")
Some(TimestampType)
case decimalTypePattern(precision, scale) =>
logger.debug(
s"Custom mapping applied: DecimalType($precision, $scale) for '${_typeName}'")
Some(DecimalType(precision.toInt, scale.toInt))
case decimalTypePattern2(w, scale) =>
w match {
case "32" =>
logger.debug(s"Custom mapping applied: DecimalType(9, $scale) for 'Decimal$w'")
Some(DecimalType(9, scale.toInt))
case "64" =>
logger.debug(s"Custom mapping applied: DecimalType(18, $scale) for 'Decimal$w'")
Some(DecimalType(18, scale.toInt))
case "128" =>
logger.debug(s"Custom mapping applied: DecimalType(38, $scale) for 'Decimal$w'")
Some(DecimalType(38, scale.toInt))
case "256" =>
logger.debug(s"Custom mapping applied: DecimalType(76, $scale) for 'Decimal$w'")
Some(
DecimalType(76, scale.toInt)
) // Spark supports precision only up to 38, so DecimalType(76, _) throws at runtime; Decimal256 is effectively unsupported
}
case _ =>
logger.debug(
s"No custom mapping for typeName: ${_typeName}, default driver mapping is used")
None
}
dataType.map((nullable, _))
}

/**
* Unwraps nullable types to determine if the type is nullable and to retrieve the base type.
* This logic is copied from the Housepower project.
*
* @see
* https://github.com/housepower/ClickHouse-Native-JDBC
* @param maybeNullableTypeName
* The type name that may include Nullable.
* @return
* A tuple where the first element indicates if the type is nullable, and the second element
* is the base type.
*/
private def unwrapNullable(maybeNullableTypeName: String): (Boolean, String) =
maybeNullableTypeName match {
case nullableTypePattern(typeName) => (true, typeName)
case _ => (false, maybeNullableTypeName)
}

/**
* Retrieve the jdbc / sql type for a given datatype. Logging the usage of the dialect extension
* info.
@@ -64,6 +160,11 @@ private object ClickhouseDialectExtension extends JdbcDialect {
case TimestampType =>
logger.debug("Custom mapping applied: Datetime64(6) for 'TimestampType'")
Some(JdbcType("Datetime64(6)", Types.TIMESTAMP))
case ArrayType(et, _) =>
logger.debug(s"Custom mapping applied: Array(${et.simpleString}) for ArrayType")
getJDBCType(et)
.orElse(JdbcUtils.getCommonJDBCType(et))
.map(jdbcType => JdbcType(s"Array(${jdbcType.databaseTypeDefinition})", Types.ARRAY))
case _ =>
logger.debug(
s"No custom JDBC type mapping for DataType: ${dt.simpleString}, default driver mapping is used")