From 0f8218c4324e0a901a21a9076c6c3d80f5c0ecea Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 13 Apr 2023 13:30:35 +0800
Subject: [PATCH] [SPARK-42916][SQL] JDBCTableCatalog Keeps Char/Varchar meta on the read-side

### What changes were proposed in this pull request?

In this PR, we make the JDBCTableCatalog map Char/Varchar columns to their raw types so that the meta information is not lost on the read side.

### Why are the changes needed?

Some DDLs that update columns need the raw types. Otherwise, you may get string -> varchar/char casting errors, depending on the underlying database.

### Does this PR introduce _any_ user-facing change?

Yes. You can now create a table with a varchar column and later increase its width; without this PR, that fails with an error (see the usage sketches after the diff).

### How was this patch tested?

New unit tests.

Closes #40543 from yaooqinn/SPARK-42916.

Authored-by: Kent Yao
Signed-off-by: Kent Yao
---
 .../sql/jdbc/MySQLIntegrationSuite.scala | 2 +-
 .../sql/jdbc/PostgresIntegrationSuite.scala | 6 +++--
 .../datasources/jdbc/JdbcUtils.scala | 14 +++++-----
 .../apache/spark/sql/jdbc/MySQLDialect.scala | 3 +++
 .../spark/sql/jdbc/PostgresDialect.scala | 8 ++++--
 .../v2/jdbc/JDBCTableCatalogSuite.scala | 19 +++++++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 27 +++++++++++--------
 7 files changed, 55 insertions(+), 24 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index bc202b1b8323e..c5ca5a72a83ba 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -147,7 +147,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(types(6).equals("class [B")) assert(types(7).equals("class [B")) assert(types(8).equals("class [B")) - assert(rows(0).getString(0).equals("the")) + assert(rows(0).getString(0).equals("the".padTo(10, ' '))) assert(rows(0).getString(1).equals("quick")) assert(rows(0).getString(2).equals("brown")) assert(rows(0).getString(3).equals("fox")) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index d3229ba50eca3..ff5127ce350f5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -122,13 +122,15 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { ).executeUpdate() conn.prepareStatement("CREATE TABLE char_types (" + - "c0 char(4), c1 character(4), c2 character varying(4), c3 varchar(4), c4 bpchar)" + "c0 char(4), c1 character(4), c2 character varying(4), c3 varchar(4), c4 bpchar(1))" ).executeUpdate() conn.prepareStatement("INSERT INTO char_types VALUES " + "('abcd', 'efgh', 'ijkl', 'mnop', 'q')").executeUpdate() + // SPARK-42916: character/char/bpchar w/o length specifier defaults to int max value, this will + // cause OOM as it will be padded with ' ' to 2147483647.
conn.prepareStatement("CREATE TABLE char_array_types (" + - "c0 char(4)[], c1 character(4)[], c2 character varying(4)[], c3 varchar(4)[], c4 bpchar[])" + "c0 char(4)[], c1 character(4)[], c2 character varying(4)[], c3 varchar(4)[], c4 bpchar(1)[])" ).executeUpdate() conn.prepareStatement("INSERT INTO char_array_types VALUES " + """('{"a", "bcd"}', '{"ef", "gh"}', '{"i", "j", "kl"}', '{"mnop"}', '{"q", "r"}')""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index c1d162d1478a7..fe53ba91d9592 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase} import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} @@ -187,7 +187,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks case java.sql.Types.BLOB => BinaryType case java.sql.Types.BOOLEAN => BooleanType - case java.sql.Types.CHAR => StringType + case java.sql.Types.CHAR => CharType(precision) case java.sql.Types.CLOB => StringType case java.sql.Types.DATE => DateType case java.sql.Types.DECIMAL if precision != 0 || scale != 0 => @@ -216,7 +216,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case java.sql.Types.TIMESTAMP => TimestampType case java.sql.Types.TINYINT => IntegerType case java.sql.Types.VARBINARY => BinaryType - case java.sql.Types.VARCHAR => StringType + case java.sql.Types.VARCHAR => VarcharType(precision) case _ => // For unmatched types: // including java.sql.Types.ARRAY,DATALINK,DISTINCT,JAVA_OBJECT,NULL,OTHER,REF_CURSOR, @@ -310,7 +310,7 @@ object JdbcUtils extends Logging with SQLConfHelper { fields(i) = StructField(columnName, columnType, nullable, metadata.build()) i = i + 1 } - new StructType(fields) + CharVarcharUtils.replaceCharVarcharWithStringInSchema(new StructType(fields)) } /** @@ -368,8 +368,10 @@ object JdbcUtils extends Logging with SQLConfHelper { * Creates `JDBCValueGetter`s according to [[StructType]], which can set * each value from `ResultSet` to each field of [[InternalRow]] correctly. 
*/ - private def makeGetters(schema: StructType): Array[JDBCValueGetter] = - schema.fields.map(sf => makeGetter(sf.dataType, sf.metadata)) + private def makeGetters(schema: StructType): Array[JDBCValueGetter] = { + val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema) + replaced.fields.map(sf => makeGetter(sf.dataType, sf.metadata)) + } private def makeGetter(dt: DataType, metadata: Metadata): JDBCValueGetter = dt match { case BooleanType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 11305dbde4254..5e85ff3ebf633 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -95,6 +95,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { Option(LongType) } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) { Option(BooleanType) + } else if ("TINYTEXT".equalsIgnoreCase(typeName)) { + // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason + Some(StringType) } else None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index fd9e20fc419b4..b53a0e66ba752 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -60,6 +60,8 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { Some(StringType) } else if (sqlType == Types.OTHER) { Some(StringType) + } else if ("text".equalsIgnoreCase(typeName)) { + Some(StringType) // sqlType is Types.VARCHAR } else if (sqlType == Types.ARRAY) { val scale = md.build.getLong("scale").toInt // postgres array type names start with underscore @@ -78,7 +80,9 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { case "int8" | "oid" => Some(LongType) case "float4" => Some(FloatType) case "float8" => Some(DoubleType) - case "text" | "varchar" | "char" | "bpchar" | "cidr" | "inet" | "json" | "jsonb" | "uuid" | + case "varchar" => Some(VarcharType(precision)) + case "char" | "bpchar" => Some(CharType(precision)) + case "text" | "cidr" | "inet" | "json" | "jsonb" | "uuid" | "xml" | "tsvector" | "tsquery" | "macaddr" | "macaddr8" | "txid_snapshot" | "point" | "line" | "lseg" | "box" | "path" | "polygon" | "circle" | "pg_lsn" | "varbit" | "interval" | "pg_snapshot" => @@ -99,7 +103,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { } override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { - case StringType => Some(JdbcType("TEXT", Types.CHAR)) + case StringType => Some(JdbcType("TEXT", Types.VARCHAR)) case BinaryType => Some(JdbcType("BYTEA", Types.BINARY)) case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN)) case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index d2f3d3d540512..60bc4a14b3640 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -25,6 +25,7 @@ import 
org.apache.spark.{SparkConf, SparkIllegalArgumentException} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -136,9 +137,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("load a table") { val t = spark.table("h2.test.people") val expectedSchema = new StructType() - .add("NAME", StringType, true, defaultMetadata) + .add("NAME", VarcharType(32), true, defaultMetadata) .add("ID", IntegerType, true, defaultMetadata) - assert(t.schema === expectedSchema) + assert(t.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) Seq( "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`", "h2.bad_test.not_existing_table" -> "`h2`.`bad_test`.`not_existing_table`" @@ -477,4 +478,18 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { parameters = Map("catalogString" -> "array") ) } + + test("SPARK-42916: Keep Char/Varchar meta information on the read-side") { + val tableName = "h2.test.alt_table" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (ID CHAR(10), deptno VARCHAR(20))") + sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)") + val t = spark.table(tableName) + val expected = new StructType() + .add("ID", CharType(10), true, defaultMetadata) + .add("deptno", VarcharType(30), true, defaultMetadata) + val replaced = CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected) + assert(t.schema === replaced) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 3b6332a43a5a1..93b6652d516cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.{analysis, TableIdentifier} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeTestUtils} -import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, DateTimeTestUtils} +import org.apache.spark.sql.execution.{DataSourceScanExec, ExtendedMode, ProjectExec} import org.apache.spark.sql.execution.command.{ExplainCommand, ShowCreateTableCommand} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JDBCPartition, JDBCRelation, JdbcUtils} @@ -315,8 +315,12 @@ class JDBCSuite extends QueryTest with SharedSparkSession { // the plan only has PhysicalRDD to scan JDBCRelation. 
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec]) - assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation")) + val child = node.child match { + case ProjectExec(_, c) => c + case o => o + } + assert(child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec]) + assert(child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation")) df } @@ -924,7 +928,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession { assert(Postgres.getCatalystType(java.sql.Types.ARRAY, "_numeric", 0, md) == Some(ArrayType(DecimalType.SYSTEM_DEFAULT))) assert(Postgres.getCatalystType(java.sql.Types.ARRAY, "_bpchar", 64, md) == - Some(ArrayType(StringType))) + Some(ArrayType(CharType(64)))) assert(Postgres.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT4") assert(Postgres.getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "FLOAT8") assert(Postgres.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT") @@ -1373,20 +1377,20 @@ class JDBCSuite extends QueryTest with SharedSparkSession { test("jdbc API support custom schema") { val parts = Array[String]("THEID < 2", "THEID >= 2") - val customSchema = "NAME STRING, THEID INT" + val customSchema = "NAME VARCHAR(32), THEID INT" val props = new Properties() props.put("customSchema", customSchema) val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props) assert(df.schema.size === 2) val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema).map( f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) - assert(df.schema === expectedSchema) + assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) assert(df.count() === 3) } test("jdbc API custom schema DDL-like strings.") { withTempView("people_view") { - val customSchema = "NAME STRING, THEID INT" + val customSchema = "NAME VARCHAR(32), THEID INT" sql( s""" |CREATE TEMPORARY VIEW people_view @@ -1398,7 +1402,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession { assert(df.schema.length === 2) val expectedSchema = new StructType(CatalystSqlParser.parseTableSchema(customSchema) .map(f => StructField(f.name, f.dataType, f.nullable, defaultMetadata)).toArray) - assert(df.schema === expectedSchema) + + assert(df.schema === CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema)) assert(df.count() === 3) } } @@ -1543,9 +1548,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession { } test("jdbc data source shouldn't have unnecessary metadata in its schema") { - val schema = StructType(Seq(StructField("NAME", StringType, true, defaultMetadata), + var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, defaultMetadata), StructField("THEID", IntegerType, true, defaultMetadata))) - + schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema) val df = spark.read.format("jdbc") .option("Url", urlWithUserAndPass) .option("DbTaBle", "TEST.PEOPLE")
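
For reference, a minimal sketch of the user-facing scenario described above, run against the same H2-backed JDBCTableCatalog that the new test exercises. This is illustrative only and not part of the patch: the in-memory H2 URL with `DB_CLOSE_DELAY`, the catalog name `h2`, and the up-front `CREATE SCHEMA "test"` step are assumptions borrowed from the test suite's conventions, and the H2 driver must be on the classpath.

```scala
import java.sql.DriverManager

import org.apache.spark.sql.SparkSession

object CharVarcharReadSideExample {
  def main(args: Array[String]): Unit = {
    // Assumed in-memory H2 database; the JDBCTableCatalogSuite uses a file-based H2 URL instead.
    val url = "jdbc:h2:mem:testdb0;DB_CLOSE_DELAY=-1;user=testUser;password=testPass"

    // The catalog needs an existing schema to create tables in.
    val conn = DriverManager.getConnection(url)
    try conn.prepareStatement("CREATE SCHEMA \"test\"").executeUpdate()
    finally conn.close()

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", url)
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .getOrCreate()

    // Create a table with char/varchar columns through the JDBC catalog ...
    spark.sql("CREATE TABLE h2.test.alt_table (ID CHAR(10), deptno VARCHAR(20))")

    // ... and widen the varchar column. Without the read-side char/varchar metadata,
    // this kind of width change could fail with a string -> varchar/char cast error,
    // depending on the underlying database, as described in the PR description.
    spark.sql("ALTER TABLE h2.test.alt_table ALTER COLUMN deptno TYPE VARCHAR(30)")

    // The user-visible schema still shows string columns (char/varchar are replaced
    // on read), while the catalog keeps the raw CHAR(10)/VARCHAR(30) definitions.
    spark.table("h2.test.alt_table").printSchema()

    spark.stop()
  }
}
```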
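A second, smaller sketch of what "keeps the meta on the read side" means internally. The patched `JdbcUtils.getSchema` builds the schema with the dialect-reported `CharType`/`VarcharType` and then calls `CharVarcharUtils.replaceCharVarcharWithStringInSchema`, so users still see `string` columns while the raw width is kept in the field metadata. The snippet below simply calls that internal utility on a hand-built schema to show the effect; the column names are made up for illustration.

```scala
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types._

// Raw schema as the dialects now report it: CHAR/VARCHAR widths are preserved.
val raw = new StructType()
  .add("ID", CharType(10))
  .add("deptno", VarcharType(30))

// What the patched JdbcUtils.getSchema hands back: char/varchar become StringType,
// with the original type recorded in each field's metadata so the width can be
// recovered later (e.g. when altering the column).
val readSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(raw)
readSchema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType.simpleString} metadata=${f.metadata}")
}
```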