[SPARK-42916][SQL] JDBCTableCatalog Keeps Char/Varchar meta on the read-side

### What changes were proposed in this pull request?

In this PR, we make the JDBCTableCatalog map Char/Varchar columns to their raw types instead of plain StringType, so that the char/varchar meta information is not lost on the read side.
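
A minimal sketch of the new read-side mapping (simplified from `JdbcUtils`; the helper name and plumbing below are illustrative, not the actual private API):

```scala
import java.sql.Types
import org.apache.spark.sql.types._

// Illustrative sketch: CHAR/VARCHAR columns now keep their declared length
// instead of collapsing to StringType when a JDBC schema is read.
def toCatalystType(sqlType: Int, precision: Int): DataType = sqlType match {
  case Types.CHAR    => CharType(precision)    // previously StringType
  case Types.VARCHAR => VarcharType(precision) // previously StringType
  case Types.CLOB    => StringType             // unbounded text stays StringType
  case _             => StringType             // other JDBC types omitted in this sketch
}
```

The resulting schema is then passed through `CharVarcharUtils.replaceCharVarcharWithStringInSchema`, so the externally visible Spark type remains `StringType` while the char/varchar information is preserved in the column metadata.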

### Why are the changes needed?

For some DDLs related to column updating, the raw types are needed.

Otherwise, you may get string->varchar/char casting errors, depending on the underlying database.

### Does this PR introduce _any_ user-facing change?

Yes. You can now create a table with a varchar column and later increase its width; without this PR, that raised an error.
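
For instance, a minimal sketch mirroring the new unit test (it assumes the `h2` JDBC test catalog registered by `JDBCTableCatalogSuite`; the catalog and table names are those used in the test and are illustrative only):

```scala
// Widen a VARCHAR column through the JDBC table catalog. Before this PR the
// read side reported plain StringType, so the ALTER could fail with a
// string -> varchar cast error depending on the underlying database.
spark.sql("CREATE TABLE h2.test.alt_table (ID CHAR(10), deptno VARCHAR(20))")
spark.sql("ALTER TABLE h2.test.alt_table ALTER COLUMN deptno TYPE VARCHAR(30)")
spark.table("h2.test.alt_table").printSchema()
```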

### How was this patch tested?

New unit tests.

Closes apache#40543 from yaooqinn/SPARK-42916.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
yaooqinn committed Apr 13, 2023
1 parent c01dad4 commit 0f8218c
Showing 7 changed files with 55 additions and 24 deletions.
@@ -147,7 +147,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(6).equals("class [B"))
assert(types(7).equals("class [B"))
assert(types(8).equals("class [B"))
assert(rows(0).getString(0).equals("the"))
assert(rows(0).getString(0).equals("the".padTo(10, ' ')))
assert(rows(0).getString(1).equals("quick"))
assert(rows(0).getString(2).equals("brown"))
assert(rows(0).getString(3).equals("fox"))
@@ -122,13 +122,15 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
).executeUpdate()

conn.prepareStatement("CREATE TABLE char_types (" +
"c0 char(4), c1 character(4), c2 character varying(4), c3 varchar(4), c4 bpchar)"
"c0 char(4), c1 character(4), c2 character varying(4), c3 varchar(4), c4 bpchar(1))"
).executeUpdate()
conn.prepareStatement("INSERT INTO char_types VALUES " +
"('abcd', 'efgh', 'ijkl', 'mnop', 'q')").executeUpdate()

// SPARK-42916: character/char/bpchar w/o length specifier defaults to int max value, this will
// cause OOM as it will be padded with ' ' to 2147483647.
conn.prepareStatement("CREATE TABLE char_array_types (" +
"c0 char(4)[], c1 character(4)[], c2 character varying(4)[], c3 varchar(4)[], c4 bpchar[])"
"c0 char(4)[], c1 character(4)[], c2 character varying(4)[], c3 varchar(4)[], c4 bpchar(1)[])"
).executeUpdate()
conn.prepareStatement("INSERT INTO char_array_types VALUES " +
"""('{"a", "bcd"}', '{"ef", "gh"}', '{"i", "j", "kl"}', '{"mnop"}', '{"q", "r"}')"""
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
@@ -187,7 +187,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
case java.sql.Types.BLOB => BinaryType
case java.sql.Types.BOOLEAN => BooleanType
case java.sql.Types.CHAR => StringType
case java.sql.Types.CHAR => CharType(precision)
case java.sql.Types.CLOB => StringType
case java.sql.Types.DATE => DateType
case java.sql.Types.DECIMAL if precision != 0 || scale != 0 =>
@@ -216,7 +216,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TINYINT => IntegerType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case java.sql.Types.VARCHAR => VarcharType(precision)
case _ =>
// For unmatched types:
// including java.sql.Types.ARRAY,DATALINK,DISTINCT,JAVA_OBJECT,NULL,OTHER,REF_CURSOR,
@@ -310,7 +310,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
fields(i) = StructField(columnName, columnType, nullable, metadata.build())
i = i + 1
}
new StructType(fields)
CharVarcharUtils.replaceCharVarcharWithStringInSchema(new StructType(fields))
}

/**
@@ -368,8 +368,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
* Creates `JDBCValueGetter`s according to [[StructType]], which can set
* each value from `ResultSet` to each field of [[InternalRow]] correctly.
*/
private def makeGetters(schema: StructType): Array[JDBCValueGetter] =
schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
private def makeGetters(schema: StructType): Array[JDBCValueGetter] = {
val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
replaced.fields.map(sf => makeGetter(sf.dataType, sf.metadata))
}

private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match {
case BooleanType =>
@@ -95,6 +95,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
Option(LongType)
} else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
Option(BooleanType)
} else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
// TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
Some(StringType)
} else None
}

@@ -60,6 +60,8 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
Some(StringType)
} else if (sqlType == Types.OTHER) {
Some(StringType)
} else if ("text".equalsIgnoreCase(typeName)) {
Some(StringType) // sqlType is Types.VARCHAR
} else if (sqlType == Types.ARRAY) {
val scale = md.build.getLong("scale").toInt
// postgres array type names start with underscore
@@ -78,7 +80,9 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
case "int8" | "oid" => Some(LongType)
case "float4" => Some(FloatType)
case "float8" => Some(DoubleType)
case "text" | "varchar" | "char" | "bpchar" | "cidr" | "inet" | "json" | "jsonb" | "uuid" |
case "varchar" => Some(VarcharType(precision))
case "char" | "bpchar" => Some(CharType(precision))
case "text" | "cidr" | "inet" | "json" | "jsonb" | "uuid" |
"xml" | "tsvector" | "tsquery" | "macaddr" | "macaddr8" | "txid_snapshot" | "point" |
"line" | "lseg" | "box" | "path" | "polygon" | "circle" | "pg_lsn" | "varbit" |
"interval" | "pg_snapshot" =>
@@ -99,7 +103,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
}

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("TEXT", Types.CHAR))
case StringType => Some(JdbcType("TEXT", Types.VARCHAR))
case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
@@ -25,6 +25,7 @@ import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -136,9 +137,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
test("load a table") {
val t = spark.table("h2.test.people")
val expectedSchema = new StructType()
.add("NAME", StringType, true, defaultMetadata)
.add("NAME", VarcharType(32), true, defaultMetadata)
.add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
assert(t.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
Seq(
"h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`",
"h2.bad_test.not_existing_table" -> "`h2`.`bad_test`.`not_existing_table`"
@@ -477,4 +478,18 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
parameters = Map("catalogString" -> "array<int>")
)
}

test("SPARK-42916: Keep Char/Varchar meta information on the read-side") {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID CHAR(10), deptno VARCHAR(20))")
sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)")
val t = spark.table(tableName)
val expected = new StructType()
.add("ID", CharType(10), true, defaultMetadata)
.add("deptno", VarcharType(30), true, defaultMetadata)
val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
assert(t.schema === replaced)
}
}
}
27 changes: 16 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -33,8 +33,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils}
import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeTestUtils}
import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode, ProjectExec}
import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils}
@@ -315,8 +315,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
// the plan only has PhysicalRDD to scan JDBCRelation.
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
val child = node.child match {
case ProjectExec(_, c) => c
case o => o
}
assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
df
}

@@ -924,7 +928,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(Postgres.getCatalystType(java.sql.Types.ARRAY, "_numeric", 0, md) ==
Some(ArrayType(DecimalType.SYSTEM_DEFAULT)))
assert(Postgres.getCatalystType(java.sql.Types.ARRAY, "_bpchar", 64, md) ==
Some(ArrayType(StringType)))
Some(ArrayType(CharType(64))))
assert(Postgres.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT4")
assert(Postgres.getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "FLOAT8")
assert(Postgres.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
@@ -1373,20 +1377,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession {

test("jdbc API support custom schema") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
val customSchema = "NAME STRING, THEID INT"
val customSchema = "NAME VARCHAR(32), THEID INT"
val props = new Properties()
props.put("customSchema", customSchema)
val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
assert(df.schema.size === 2)
val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
assert(df.schema === expectedSchema)
assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
assert(df.count() === 3)
}

test("jdbc API custom schema DDL-like strings.") {
withTempView("people_view") {
val customSchema = "NAME STRING, THEID INT"
val customSchema = "NAME VARCHAR(32), THEID INT"
sql(
s"""
|CREATE TEMPORARY VIEW people_view
@@ -1398,7 +1402,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(df.schema.length === 2)
val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema)
.map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray)
assert(df.schema === expectedSchema)

assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
assert(df.count() === 3)
}
}
@@ -1543,9 +1548,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
}

test("jdbc data source shouldn't have unnecessary metadata in its schema") {
val schema = StructType(Seq(StructField("NAME", StringType, true, defaultMetadata),
var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, defaultMetadata),
StructField("THEID", IntegerType, true, defaultMetadata)))

schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
val df = spark.read.format("jdbc")
.option("Url", urlWithUserAndPass)
.option("DbTaBle", "TEST.PEOPLE")
