From 22f68a984140e3b48392cbd500cac2d38f1c3509 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Thu, 4 Apr 2024 15:51:18 -0700 Subject: [PATCH 01/21] fix type name --- pom.xml | 2 +- .../snowpark/internal/ServerConnection.scala | 50 +++++++------------ .../snowflake/snowpark/types/StructType.scala | 2 +- .../snowpark_test/DataTypeSuite.scala | 16 ++++++ 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/pom.xml b/pom.xml index d8369361..6941ab7c 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ 2.12.18 2.12 4.2.0 - 3.14.4 + 1.0-SNAPSHOT ${scala.compat.version} Snowpark ${project.version} 1.4.11 diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 41a97a33..fb8662e2 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,41 +3,17 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime -import com.snowflake.snowpark.{ - MergeBuilder, - MergeTypedAsyncJob, - Row, - SnowparkClientException, - TypedAsyncJob -} -import com.snowflake.snowpark.internal.ParameterUtils.{ - ClosureCleanerMode, - DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, - DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, - DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, - DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, - MAX_REQUEST_TIMEOUT_IN_SECONDS, - MIN_REQUEST_TIMEOUT_IN_SECONDS, - SnowparkMaxFileDownloadRetryCount, - SnowparkMaxFileUploadRetryCount, - SnowparkRequestTimeoutInSeconds, - Url -} +import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} +import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{ - SnowflakeConnectString, - SnowflakeConnectionV1, - SnowflakeReauthenticationRequest, - SnowflakeResultSet, - SnowflakeSQLException, - SnowflakeStatement -} +import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaDataV1, SnowflakeSQLException, SnowflakeStatement} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag +import scala.collection.JavaConverters._ private[snowpark] case class QueryResult( rows: Option[Array[Row]], @@ -55,6 +31,8 @@ private[snowpark] object ServerConnection { def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] = (1 to meta.getColumnCount).map(index => { + // todo: replace by public API + val fieldMetadata = meta.asInstanceOf[SnowflakeResultSetMetaDataV1].getColumnMetaData.get(index - 1).getFields.asScala.toList val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index)) val dataType = meta.getColumnType(index) val fieldSize = meta.getPrecision(index) @@ -64,7 +42,8 @@ private[snowpark] object ServerConnection { // This field is useful for snowflake types that are not JDBC types like // variant, object and array val columnTypeName = meta.getColumnTypeName(index) - val columnType = getDataType(dataType, columnTypeName, fieldSize, fieldScale, isSigned) + val columnType = getDataType(dataType, columnTypeName, fieldSize, + fieldScale, isSigned, fieldMetadata) Attribute(columnName, columnType, nullable) }) @@ -74,9 +53,18 @@ private[snowpark] object ServerConnection { columnTypeName: String, precision: Int, scale: Int, - signed: Boolean): DataType = { + signed: Boolean, + field: List[FieldMetadata] = List.empty): DataType = { columnTypeName match { - case "ARRAY" => ArrayType(StringType) + case "ARRAY" => + if (field.isEmpty) ArrayType(StringType) + else ArrayType(getDataType( + field.head.getType, + field.head.getTypeName, + field.head.getPrecision, + field.head.getScale, + signed = true, // no sign info in the fields + field.head.getFields.asScala.toList)) case "VARIANT" => VariantType case "OBJECT" => MapType(StringType, StringType) case "GEOGRAPHY" => GeographyType diff --git a/src/main/scala/com/snowflake/snowpark/types/StructType.scala b/src/main/scala/com/snowflake/snowpark/types/StructType.scala index 96733743..7bc4f00c 100644 --- a/src/main/scala/com/snowflake/snowpark/types/StructType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/StructType.scala @@ -168,7 +168,7 @@ case class StructField( private[types] def treeString(layer: Int): String = { val prepended: String = (1 to (1 + 2 * layer)).map(x => " ").mkString + "|--" - val body: String = s"$name: ${dataType.typeName} (nullable = $nullable)\n" + + val body: String = s"$name: ${dataType.toString} (nullable = $nullable)\n" + (dataType match { case st: StructType => st.treeString(layer + 1) case _ => "" diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 75ff469a..ad7c2f0f 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -183,4 +183,20 @@ class DataTypeSuite extends SNTestBase { |""".stripMargin) } + test("ArrayType v2") { + val query = """SELECT + | [1, 2, 3]::ARRAY(NUMBER) AS arr1, + | [1.1, 2.2, 3.3]::ARRAY(FLOAT) AS arr2, + | [true, false]::ARRAY(BOOLEAN) AS arr3, + | ['a', 'b']::ARRAY(VARCHAR) AS arr4, + | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, + | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, + | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, + | ['1', 2]::ARRAY(VARIANT) AS arr8, + | [[1,2]]::ARRAY(ARRAY) AS arr9, + | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, + | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, + | [1, 2, 3] AS arr0;""".stripMargin + session.sql(query).schema.printTreeString() + } } From 3971e49d0d6612224c1939aade8f83d675856129 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 8 Apr 2024 10:20:17 -0700 Subject: [PATCH 02/21] array v2 --- .../snowpark/internal/ServerConnection.scala | 65 +++++++++++++++---- .../snowpark_test/DataTypeSuite.scala | 18 ++++- 2 files changed, 68 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index fb8662e2..12735ca3 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,11 +3,38 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime -import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} -import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} +import com.snowflake.snowpark.{ + MergeBuilder, + MergeTypedAsyncJob, + Row, + SnowparkClientException, + TypedAsyncJob +} +import com.snowflake.snowpark.internal.ParameterUtils.{ + ClosureCleanerMode, + DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, + DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, + DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, + DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, + MAX_REQUEST_TIMEOUT_IN_SECONDS, + MIN_REQUEST_TIMEOUT_IN_SECONDS, + SnowparkMaxFileDownloadRetryCount, + SnowparkMaxFileUploadRetryCount, + SnowparkRequestTimeoutInSeconds, + Url +} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaDataV1, SnowflakeSQLException, SnowflakeStatement} +import net.snowflake.client.jdbc.{ + FieldMetadata, + SnowflakeConnectString, + SnowflakeConnectionV1, + SnowflakeReauthenticationRequest, + SnowflakeResultSet, + SnowflakeResultSetMetaDataV1, + SnowflakeSQLException, + SnowflakeStatement +} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus @@ -32,7 +59,13 @@ private[snowpark] object ServerConnection { def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] = (1 to meta.getColumnCount).map(index => { // todo: replace by public API - val fieldMetadata = meta.asInstanceOf[SnowflakeResultSetMetaDataV1].getColumnMetaData.get(index - 1).getFields.asScala.toList + val fieldMetadata = meta + .asInstanceOf[SnowflakeResultSetMetaDataV1] + .getColumnMetaData + .get(index - 1) + .getFields + .asScala + .toList val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index)) val dataType = meta.getColumnType(index) val fieldSize = meta.getPrecision(index) @@ -42,8 +75,8 @@ private[snowpark] object ServerConnection { // This field is useful for snowflake types that are not JDBC types like // variant, object and array val columnTypeName = meta.getColumnTypeName(index) - val columnType = getDataType(dataType, columnTypeName, fieldSize, - fieldScale, isSigned, fieldMetadata) + val columnType = + getDataType(dataType, columnTypeName, fieldSize, fieldScale, isSigned, fieldMetadata) Attribute(columnName, columnType, nullable) }) @@ -57,14 +90,18 @@ private[snowpark] object ServerConnection { field: List[FieldMetadata] = List.empty): DataType = { columnTypeName match { case "ARRAY" => - if (field.isEmpty) ArrayType(StringType) - else ArrayType(getDataType( - field.head.getType, - field.head.getTypeName, - field.head.getPrecision, - field.head.getScale, - signed = true, // no sign info in the fields - field.head.getFields.asScala.toList)) + if (field.isEmpty) { + ArrayType(StringType) + } else { + ArrayType( + getDataType( + field.head.getType, + field.head.getTypeName, + field.head.getPrecision, + field.head.getScale, + signed = true, // no sign info in the fields + field.head.getFields.asScala.toList)) + } case "VARIANT" => VariantType case "OBJECT" => MapType(StringType, StringType) case "GEOGRAPHY" => GeographyType diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index ad7c2f0f..1fcba323 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -197,6 +197,22 @@ class DataTypeSuite extends SNTestBase { | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, | [1, 2, 3] AS arr0;""".stripMargin - session.sql(query).schema.printTreeString() + val df = session.sql(query) + assert( + TestUtils.treeString(df.schema, 0) == + s"""root + | |--ARR1: ArrayType[Long] (nullable = true) + | |--ARR2: ArrayType[Double] (nullable = true) + | |--ARR3: ArrayType[Boolean] (nullable = true) + | |--ARR4: ArrayType[String] (nullable = true) + | |--ARR5: ArrayType[Timestamp] (nullable = true) + | |--ARR6: ArrayType[Binary] (nullable = true) + | |--ARR7: ArrayType[Date] (nullable = true) + | |--ARR8: ArrayType[Variant] (nullable = true) + | |--ARR9: ArrayType[ArrayType[String]] (nullable = true) + | |--ARR10: ArrayType[MapType[String, String]] (nullable = true) + | |--ARR11: ArrayType[ArrayType[Long]] (nullable = true) + | |--ARR0: ArrayType[String] (nullable = true) + |""".stripMargin) } } From 7cf91c561b6e19e06902b459217ccf9972e031ca Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 8 Apr 2024 13:30:06 -0700 Subject: [PATCH 03/21] map type --- .../snowpark/internal/ServerConnection.scala | 24 ++++++++++++++++++- .../snowpark_test/DataTypeSuite.scala | 21 ++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 12735ca3..ee5ae59e 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -103,7 +103,29 @@ private[snowpark] object ServerConnection { field.head.getFields.asScala.toList)) } case "VARIANT" => VariantType - case "OBJECT" => MapType(StringType, StringType) + case "OBJECT" => + if (field.isEmpty) { + MapType(StringType, StringType) + } else if (field.size == 2 && field.head.getName.isEmpty) { + // Map + MapType( + getDataType( + field.head.getType, + field.head.getTypeName, + field.head.getPrecision, + field.head.getScale, + signed = true, + field.head.getFields.asScala.toList), + getDataType( + field(1).getType, + field(1).getTypeName, + field(1).getPrecision, + field(1).getScale, + signed = true, + field(1).getFields.asScala.toList)) + } else { + null // object + } case "GEOGRAPHY" => GeographyType case "GEOMETRY" => GeometryType case _ => getTypeFromJDBCType(sqlType, precision, scale, signed) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 1fcba323..066fc69f 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -215,4 +215,25 @@ class DataTypeSuite extends SNTestBase { | |--ARR0: ArrayType[String] (nullable = true) |""".stripMargin) } + + test("MapType v2") { + val query = + """SELECT + | {'a': 1, 'b': 2} :: MAP(VARCHAR, NUMBER) as map1, + | {'1': 'a'} :: MAP(NUMBER, VARCHAR) as map2, + | {'1': [1,2,3]} :: MAP(NUMBER, ARRAY(NUMBER)) as map3, + | {'1': {'a':1}} :: MAP(NUMBER, MAP(VARCHAR, NUMBER)) as map4, + | {'a': 1, 'b': 2} :: OBJECT as map0 + |""".stripMargin + val df = session.sql(query) + assert( + TestUtils.treeString(df.schema, 0) == + s"""root + | |--MAP1: MapType[String, Long] (nullable = true) + | |--MAP2: MapType[Long, String] (nullable = true) + | |--MAP3: MapType[Long, ArrayType[Long]] (nullable = true) + | |--MAP4: MapType[Long, MapType[String, Long]] (nullable = true) + | |--MAP0: MapType[String, String] (nullable = true) + |""".stripMargin) + } } From 7e5d47edf5efeec903ea1caf51d185a843015721 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 8 Apr 2024 14:06:30 -0700 Subject: [PATCH 04/21] object --- .../snowpark/internal/ServerConnection.scala | 15 ++++++++- .../snowflake/snowpark/types/package.scala | 1 + .../snowpark_test/DataTypeSuite.scala | 33 +++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index ee5ae59e..f2d199ef 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -124,7 +124,20 @@ private[snowpark] object ServerConnection { signed = true, field(1).getFields.asScala.toList)) } else { - null // object + // object + StructType( + field.map( + f => + StructField( + f.getName, + getDataType( + f.getType, + f.getTypeName, + f.getPrecision, + f.getScale, + signed = true, + f.getFields.asScala.toList), + f.isNullable))) } case "GEOGRAPHY" => GeographyType case "GEOMETRY" => GeometryType diff --git a/src/main/scala/com/snowflake/snowpark/types/package.scala b/src/main/scala/com/snowflake/snowpark/types/package.scala index 9f87d3d5..5b06a8a6 100644 --- a/src/main/scala/com/snowflake/snowpark/types/package.scala +++ b/src/main/scala/com/snowflake/snowpark/types/package.scala @@ -68,6 +68,7 @@ package object types { case VariantType => "VARIANT" case GeographyType => "GEOGRAPHY" case GeometryType => "GEOMETRY" + case StructType(_) => "OBJECT" case _ => throw new UnsupportedOperationException(s"Unsupported data type: ${dataType.typeName}") } diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 066fc69f..ceb96a35 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -236,4 +236,37 @@ class DataTypeSuite extends SNTestBase { | |--MAP0: MapType[String, String] (nullable = true) |""".stripMargin) } + + test("ObjectType v2") { + val query = + // scalastyle:off + """SELECT + | {'a': 1, 'b': 'a'} :: OBJECT(a VARCHAR, b NUMBER) as object1, + | {'a': 1, 'b': [1,2,3,4]} :: OBJECT(a VARCHAR, b ARRAY(NUMBER)) as object2, + | {'a': 1, 'b': [1,2,3,4], 'c': {'1':'a'}} :: OBJECT(a VARCHAR, b ARRAY(NUMBER), c MAP(NUMBER, VARCHAR)) as object3, + | {'a': {'b': {'c': 1}}} :: OBJECT(a OBJECT(b OBJECT(c NUMBER))) as object4 + |""".stripMargin + // scalastyle:on + val df = session.sql(query) + assert( + TestUtils.treeString(df.schema, 0) == + // scalastyle:off + s"""root + | |--OBJECT1: StructType[StructField(A, String, Nullable = true), StructField(B, Long, Nullable = true)] (nullable = true) + | |--A: String (nullable = true) + | |--B: Long (nullable = true) + | |--OBJECT2: StructType[StructField(A, String, Nullable = true), StructField(B, ArrayType[Long], Nullable = true)] (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long] (nullable = true) + | |--OBJECT3: StructType[StructField(A, String, Nullable = true), StructField(B, ArrayType[Long], Nullable = true), StructField(C, MapType[Long, String], Nullable = true)] (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long] (nullable = true) + | |--C: MapType[Long, String] (nullable = true) + | |--OBJECT4: StructType[StructField(A, StructType[StructField(B, StructType[StructField(C, Long, Nullable = true)], Nullable = true)], Nullable = true)] (nullable = true) + | |--A: StructType[StructField(B, StructType[StructField(C, Long, Nullable = true)], Nullable = true)] (nullable = true) + | |--B: StructType[StructField(C, Long, Nullable = true)] (nullable = true) + | |--C: Long (nullable = true) + |""".stripMargin) + // scalastyle:on + } } From bae327b797f94c588249394af3e7dc6ce2e4a678 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 11:01:27 -0700 Subject: [PATCH 05/21] update JDBC --- .../snowpark/internal/ServerConnection.scala | 49 +++++-------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index f2d199ef..939d2a72 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,38 +3,11 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime -import com.snowflake.snowpark.{ - MergeBuilder, - MergeTypedAsyncJob, - Row, - SnowparkClientException, - TypedAsyncJob -} -import com.snowflake.snowpark.internal.ParameterUtils.{ - ClosureCleanerMode, - DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, - DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, - DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, - DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, - MAX_REQUEST_TIMEOUT_IN_SECONDS, - MIN_REQUEST_TIMEOUT_IN_SECONDS, - SnowparkMaxFileDownloadRetryCount, - SnowparkMaxFileUploadRetryCount, - SnowparkRequestTimeoutInSeconds, - Url -} +import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} +import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{ - FieldMetadata, - SnowflakeConnectString, - SnowflakeConnectionV1, - SnowflakeReauthenticationRequest, - SnowflakeResultSet, - SnowflakeResultSetMetaDataV1, - SnowflakeSQLException, - SnowflakeStatement -} +import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeStatement} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus @@ -59,13 +32,15 @@ private[snowpark] object ServerConnection { def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] = (1 to meta.getColumnCount).map(index => { // todo: replace by public API - val fieldMetadata = meta - .asInstanceOf[SnowflakeResultSetMetaDataV1] - .getColumnMetaData - .get(index - 1) - .getFields - .asScala - .toList +// val fieldMetadata = meta +// .asInstanceOf[SnowflakeResultSetMetaDataV1] +// .getColumnMetaData +// .get(index - 1) +// .getFields +// .asScala +// .toList + val fieldMetadata = meta.asInstanceOf[SnowflakeResultSetMetaData] + .getColumnFields(index).asScala.toList val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index)) val dataType = meta.getColumnType(index) val fieldSize = meta.getPrecision(index) From 78f09b39f466ca01a7060abc2bad88ab0bc7b116 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 14:07:48 -0700 Subject: [PATCH 06/21] support structed array --- .../snowpark/internal/ServerConnection.scala | 13 ++-- .../internal/analyzer/DataTypeMapper.scala | 3 +- .../snowflake/snowpark/types/ArrayType.scala | 16 +++++ .../snowflake/snowpark/types/package.scala | 3 + .../snowpark_test/DataTypeSuite.scala | 64 +++++++++++++++---- 5 files changed, 78 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 939d2a72..c16447a3 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -32,13 +32,6 @@ private[snowpark] object ServerConnection { def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] = (1 to meta.getColumnCount).map(index => { // todo: replace by public API -// val fieldMetadata = meta -// .asInstanceOf[SnowflakeResultSetMetaDataV1] -// .getColumnMetaData -// .get(index - 1) -// .getFields -// .asScala -// .toList val fieldMetadata = meta.asInstanceOf[SnowflakeResultSetMetaData] .getColumnFields(index).asScala.toList val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index)) @@ -68,14 +61,16 @@ private[snowpark] object ServerConnection { if (field.isEmpty) { ArrayType(StringType) } else { - ArrayType( + StructuredArrayType( getDataType( field.head.getType, field.head.getTypeName, field.head.getPrecision, field.head.getScale, signed = true, // no sign info in the fields - field.head.getFields.asScala.toList)) + field.head.getFields.asScala.toList), + field.head.isNullable + ) } case "VARIANT" => VariantType case "OBJECT" => diff --git a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala index 2781e22c..13e662a0 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala @@ -90,7 +90,7 @@ object DataTypeMapper { dataType match { case GeographyType => "TRY_TO_GEOGRAPHY(NULL)" case GeometryType => "TRY_TO_GEOMETRY(NULL)" - case ArrayType(_) => "PARSE_JSON('NULL')::ARRAY" + case ArrayType(_) => "PARSE_JSON('NULL')::" + convertToSFType(dataType) case _ => "NULL :: " + convertToSFType(dataType) } } else { @@ -102,6 +102,7 @@ object DataTypeMapper { case DateType => "date('2020-9-16')" case TimeType => "to_time('04:15:29.999')" case TimestampType => "to_timestamp_ntz('2020-09-16 06:30:00')" + case _: StructuredArrayType => "[]::" + convertToSFType(dataType) case _: ArrayType => "to_array(0)" case _: MapType => "to_object(parse_json('0'))" case VariantType => "to_variant(0)" diff --git a/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala b/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala index 90cf4c57..75f5fa7e 100644 --- a/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala @@ -10,3 +10,19 @@ case class ArrayType(elementType: DataType) extends DataType { s"ArrayType[${elementType.toString}]" } } + +/* temporary solution for Structured and Semi Structured data types. +Two types will be merged in the future BCR. */ +private[snowpark] class StructuredArrayType( + override val elementType: DataType, + val nullable: Boolean) extends ArrayType(elementType) { + override def toString: String = { + s"ArrayType[${elementType.toString} nullable = $nullable]" + } +} + +private[snowpark] object StructuredArrayType { + + def apply(elementType: DataType, nullable: Boolean): StructuredArrayType = + new StructuredArrayType(elementType, nullable) +} diff --git a/src/main/scala/com/snowflake/snowpark/types/package.scala b/src/main/scala/com/snowflake/snowpark/types/package.scala index 5b06a8a6..ff2fd669 100644 --- a/src/main/scala/com/snowflake/snowpark/types/package.scala +++ b/src/main/scala/com/snowflake/snowpark/types/package.scala @@ -63,6 +63,9 @@ package object types { case TimeType => "TIME" case TimestampType => "TIMESTAMP" case BinaryType => "BINARY" + case sa: StructuredArrayType => + val nullable = if (sa.nullable) "" else " not null" + s"ARRAY(${convertToSFType(sa.elementType)}${nullable})" case ArrayType(_) => "ARRAY" case MapType(_, _) => "OBJECT" case VariantType => "VARIANT" diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index ceb96a35..2ea514cc 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -201,19 +201,61 @@ class DataTypeSuite extends SNTestBase { assert( TestUtils.treeString(df.schema, 0) == s"""root - | |--ARR1: ArrayType[Long] (nullable = true) - | |--ARR2: ArrayType[Double] (nullable = true) - | |--ARR3: ArrayType[Boolean] (nullable = true) - | |--ARR4: ArrayType[String] (nullable = true) - | |--ARR5: ArrayType[Timestamp] (nullable = true) - | |--ARR6: ArrayType[Binary] (nullable = true) - | |--ARR7: ArrayType[Date] (nullable = true) - | |--ARR8: ArrayType[Variant] (nullable = true) - | |--ARR9: ArrayType[ArrayType[String]] (nullable = true) - | |--ARR10: ArrayType[MapType[String, String]] (nullable = true) - | |--ARR11: ArrayType[ArrayType[Long]] (nullable = true) + | |--ARR1: ArrayType[Long nullable = true] (nullable = true) + | |--ARR2: ArrayType[Double nullable = true] (nullable = true) + | |--ARR3: ArrayType[Boolean nullable = true] (nullable = true) + | |--ARR4: ArrayType[String nullable = true] (nullable = true) + | |--ARR5: ArrayType[Timestamp nullable = true] (nullable = true) + | |--ARR6: ArrayType[Binary nullable = true] (nullable = true) + | |--ARR7: ArrayType[Date nullable = true] (nullable = true) + | |--ARR8: ArrayType[Variant nullable = true] (nullable = true) + | |--ARR9: ArrayType[ArrayType[String] nullable = true] (nullable = true) + | |--ARR10: ArrayType[MapType[String, String] nullable = true] (nullable = true) + | |--ARR11: ArrayType[ArrayType[Long nullable = true] nullable = true] (nullable = true) | |--ARR0: ArrayType[String] (nullable = true) |""".stripMargin) + // schema string: nullable + assert( + // since we retrieved the schema of df before, df.select("*") will use the + // schema query instead of the real query to analyze the result schema. + TestUtils.treeString(df.select("*").schema, 0) == + s"""root + | |--ARR1: ArrayType[Long nullable = true] (nullable = true) + | |--ARR2: ArrayType[Double nullable = true] (nullable = true) + | |--ARR3: ArrayType[Boolean nullable = true] (nullable = true) + | |--ARR4: ArrayType[String nullable = true] (nullable = true) + | |--ARR5: ArrayType[Timestamp nullable = true] (nullable = true) + | |--ARR6: ArrayType[Binary nullable = true] (nullable = true) + | |--ARR7: ArrayType[Date nullable = true] (nullable = true) + | |--ARR8: ArrayType[Variant nullable = true] (nullable = true) + | |--ARR9: ArrayType[ArrayType[String] nullable = true] (nullable = true) + | |--ARR10: ArrayType[MapType[String, String] nullable = true] (nullable = true) + | |--ARR11: ArrayType[ArrayType[Long nullable = true] nullable = true] (nullable = true) + | |--ARR0: ArrayType[String] (nullable = true) + |""".stripMargin) + + // schema string: not nullable + val query2 = + """SELECT + | [1, 2, 3]::ARRAY(NUMBER not null) AS arr1, + | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER not null) not null) AS arr11""".stripMargin + + val df2 = session.sql(query2) + df.schema.printTreeString() + assert( + TestUtils.treeString(df2.schema, 0) == + s"""root + | |--ARR1: ArrayType[Long nullable = false] (nullable = true) + | |--ARR11: ArrayType[ArrayType[Long nullable = false] nullable = false] (nullable = true) + |""".stripMargin) + + assert( + TestUtils.treeString(df2.select("*").schema, 0) == + s"""root + | |--ARR1: ArrayType[Long nullable = false] (nullable = true) + | |--ARR11: ArrayType[ArrayType[Long nullable = false] nullable = false] (nullable = true) + |""".stripMargin) + } test("MapType v2") { From c8daffc24a44ee6c571317089ccabbf13ace346e Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 15:15:19 -0700 Subject: [PATCH 07/21] map type --- .../snowpark/internal/ServerConnection.scala | 6 +++-- .../internal/analyzer/DataTypeMapper.scala | 1 - .../snowflake/snowpark/types/MapType.scala | 17 ++++++++++++ .../snowflake/snowpark/types/package.scala | 5 +++- .../snowpark_test/DataTypeSuite.scala | 26 +++++++++++++++---- 5 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index c16447a3..ea94799f 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -78,7 +78,7 @@ private[snowpark] object ServerConnection { MapType(StringType, StringType) } else if (field.size == 2 && field.head.getName.isEmpty) { // Map - MapType( + StructuredMapType( getDataType( field.head.getType, field.head.getTypeName, @@ -92,7 +92,9 @@ private[snowpark] object ServerConnection { field(1).getPrecision, field(1).getScale, signed = true, - field(1).getFields.asScala.toList)) + field(1).getFields.asScala.toList), + field(1).isNullable + ) } else { // object StructType( diff --git a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala index 13e662a0..dc7c3c3a 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala @@ -90,7 +90,6 @@ object DataTypeMapper { dataType match { case GeographyType => "TRY_TO_GEOGRAPHY(NULL)" case GeometryType => "TRY_TO_GEOMETRY(NULL)" - case ArrayType(_) => "PARSE_JSON('NULL')::" + convertToSFType(dataType) case _ => "NULL :: " + convertToSFType(dataType) } } else { diff --git a/src/main/scala/com/snowflake/snowpark/types/MapType.scala b/src/main/scala/com/snowflake/snowpark/types/MapType.scala index ea1e4d05..7da41c55 100644 --- a/src/main/scala/com/snowflake/snowpark/types/MapType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/MapType.scala @@ -10,3 +10,20 @@ case class MapType(keyType: DataType, valueType: DataType) extends DataType { s"MapType[${keyType.toString}, ${valueType.toString}]" } } + +private[snowpark] class StructuredMapType( + override val keyType: DataType, + override val valueType: DataType, + val isValueNullable: Boolean + ) extends MapType(keyType, valueType) { + override def toString: String = { + s"MapType[${keyType.toString}, ${valueType.toString} nullable = $isValueNullable]" + } +} + +private[snowpark] object StructuredMapType { + def apply(keyType: DataType, + valueType: DataType, + isValueType: Boolean): StructuredMapType = + new StructuredMapType(keyType, valueType, isValueType) +} diff --git a/src/main/scala/com/snowflake/snowpark/types/package.scala b/src/main/scala/com/snowflake/snowpark/types/package.scala index ff2fd669..0db686d2 100644 --- a/src/main/scala/com/snowflake/snowpark/types/package.scala +++ b/src/main/scala/com/snowflake/snowpark/types/package.scala @@ -65,7 +65,10 @@ package object types { case BinaryType => "BINARY" case sa: StructuredArrayType => val nullable = if (sa.nullable) "" else " not null" - s"ARRAY(${convertToSFType(sa.elementType)}${nullable})" + s"ARRAY(${convertToSFType(sa.elementType)}$nullable)" + case sm: StructuredMapType => + val isValueNullable = if (sm.isValueNullable) "" else " not null" + s"MAP(${convertToSFType(sm.keyType)}, ${convertToSFType(sm.valueType)}$isValueNullable)" case ArrayType(_) => "ARRAY" case MapType(_, _) => "OBJECT" case VariantType => "VARIANT" diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 2ea514cc..94eee642 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -241,7 +241,6 @@ class DataTypeSuite extends SNTestBase { | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER not null) not null) AS arr11""".stripMargin val df2 = session.sql(query2) - df.schema.printTreeString() assert( TestUtils.treeString(df2.schema, 0) == s"""root @@ -270,13 +269,30 @@ class DataTypeSuite extends SNTestBase { val df = session.sql(query) assert( TestUtils.treeString(df.schema, 0) == + // scalastyle:off s"""root - | |--MAP1: MapType[String, Long] (nullable = true) - | |--MAP2: MapType[Long, String] (nullable = true) - | |--MAP3: MapType[Long, ArrayType[Long]] (nullable = true) - | |--MAP4: MapType[Long, MapType[String, Long]] (nullable = true) + | |--MAP1: MapType[String, Long nullable = true] (nullable = true) + | |--MAP2: MapType[Long, String nullable = true] (nullable = true) + | |--MAP3: MapType[Long, ArrayType[Long nullable = true] nullable = true] (nullable = true) + | |--MAP4: MapType[Long, MapType[String, Long nullable = true] nullable = true] (nullable = true) | |--MAP0: MapType[String, String] (nullable = true) |""".stripMargin) + // scalastyle:on + + assert( + // since we retrieved the schema of df before, df.select("*") will use the + // schema query instead of the real query to analyze the result schema. + TestUtils.treeString(df.select("*").schema, 0) == + // scalastyle:off + s"""root + | |--MAP1: MapType[String, Long nullable = true] (nullable = true) + | |--MAP2: MapType[Long, String nullable = true] (nullable = true) + | |--MAP3: MapType[Long, ArrayType[Long nullable = true] nullable = true] (nullable = true) + | |--MAP4: MapType[Long, MapType[String, Long nullable = true] nullable = true] (nullable = true) + | |--MAP0: MapType[String, String] (nullable = true) + |""".stripMargin) + // scalastyle:on + } test("ObjectType v2") { From 48be3db0782ff7b2549a184e6cefc75b923aedf7 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 15:21:57 -0700 Subject: [PATCH 08/21] support map type --- .../internal/analyzer/DataTypeMapper.scala | 5 ++-- .../snowpark_test/DataTypeSuite.scala | 28 +++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala index dc7c3c3a..73bca596 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/analyzer/DataTypeMapper.scala @@ -101,9 +101,8 @@ object DataTypeMapper { case DateType => "date('2020-9-16')" case TimeType => "to_time('04:15:29.999')" case TimestampType => "to_timestamp_ntz('2020-09-16 06:30:00')" - case _: StructuredArrayType => "[]::" + convertToSFType(dataType) - case _: ArrayType => "to_array(0)" - case _: MapType => "to_object(parse_json('0'))" + case _: ArrayType => "[]::" + convertToSFType(dataType) + case _: MapType => "{}::" + convertToSFType(dataType) case VariantType => "to_variant(0)" case GeographyType => "to_geography('POINT(-122.35 37.55)')" case GeometryType => "to_geometry('POINT(-122.35 37.55)')" diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 94eee642..d36d069f 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -293,6 +293,34 @@ class DataTypeSuite extends SNTestBase { |""".stripMargin) // scalastyle:on + // nullable + val query2 = + """SELECT + | {'a': 1, 'b': 2} :: MAP(VARCHAR, NUMBER not null) as map1, + | {'1': [1,2,3]} :: MAP(NUMBER, ARRAY(NUMBER not null)) as map3, + | {'1': {'a':1}} :: MAP(NUMBER, MAP(VARCHAR, NUMBER not null)) as map4 + |""".stripMargin + val df2 = session.sql(query2) + assert( + TestUtils.treeString(df2.schema, 0) == + // scalastyle:off + s"""root + | |--MAP1: MapType[String, Long nullable = false] (nullable = true) + | |--MAP3: MapType[Long, ArrayType[Long nullable = false] nullable = true] (nullable = true) + | |--MAP4: MapType[Long, MapType[String, Long nullable = false] nullable = true] (nullable = true) + |""".stripMargin) + // scalastyle:on + + assert( + TestUtils.treeString(df2.select("*").schema, 0) == + // scalastyle:off + s"""root + | |--MAP1: MapType[String, Long nullable = false] (nullable = true) + | |--MAP3: MapType[Long, ArrayType[Long nullable = false] nullable = true] (nullable = true) + | |--MAP4: MapType[Long, MapType[String, Long nullable = false] nullable = true] (nullable = true) + |""".stripMargin) + // scalastyle:on + } test("ObjectType v2") { From 6b4abaf52da256fd8e145c05d04778a3f22e4242 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 15:41:15 -0700 Subject: [PATCH 09/21] struct type --- .../snowpark/internal/ServerConnection.scala | 45 +++++++++++++++---- .../snowflake/snowpark/types/ArrayType.scala | 5 ++- .../snowflake/snowpark/types/DataType.scala | 2 + .../snowflake/snowpark/types/MapType.scala | 12 +++-- .../snowflake/snowpark/types/StructType.scala | 4 +- .../snowflake/snowpark/types/package.scala | 8 ++++ .../snowpark_test/DataTypeSuite.scala | 45 ++++++++++++++----- 7 files changed, 92 insertions(+), 29 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index ea94799f..fab55a05 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,11 +3,37 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime -import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} -import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} +import com.snowflake.snowpark.{ + MergeBuilder, + MergeTypedAsyncJob, + Row, + SnowparkClientException, + TypedAsyncJob +} +import com.snowflake.snowpark.internal.ParameterUtils.{ + ClosureCleanerMode, + DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, + DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, + DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, + DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, + MAX_REQUEST_TIMEOUT_IN_SECONDS, + MIN_REQUEST_TIMEOUT_IN_SECONDS, + SnowparkMaxFileDownloadRetryCount, + SnowparkMaxFileUploadRetryCount, + SnowparkRequestTimeoutInSeconds, + Url +} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeStatement} +import net.snowflake.client.jdbc.{ + FieldMetadata, + SnowflakeConnectString, + SnowflakeConnectionV1, + SnowflakeReauthenticationRequest, + SnowflakeResultSet, + SnowflakeResultSetMetaData, + SnowflakeStatement +} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus @@ -32,8 +58,11 @@ private[snowpark] object ServerConnection { def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] = (1 to meta.getColumnCount).map(index => { // todo: replace by public API - val fieldMetadata = meta.asInstanceOf[SnowflakeResultSetMetaData] - .getColumnFields(index).asScala.toList + val fieldMetadata = meta + .asInstanceOf[SnowflakeResultSetMetaData] + .getColumnFields(index) + .asScala + .toList val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index)) val dataType = meta.getColumnType(index) val fieldSize = meta.getPrecision(index) @@ -69,8 +98,7 @@ private[snowpark] object ServerConnection { field.head.getScale, signed = true, // no sign info in the fields field.head.getFields.asScala.toList), - field.head.isNullable - ) + field.head.isNullable) } case "VARIANT" => VariantType case "OBJECT" => @@ -93,8 +121,7 @@ private[snowpark] object ServerConnection { field(1).getScale, signed = true, field(1).getFields.asScala.toList), - field(1).isNullable - ) + field(1).isNullable) } else { // object StructType( diff --git a/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala b/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala index 75f5fa7e..d5f027bf 100644 --- a/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/ArrayType.scala @@ -14,8 +14,9 @@ case class ArrayType(elementType: DataType) extends DataType { /* temporary solution for Structured and Semi Structured data types. Two types will be merged in the future BCR. */ private[snowpark] class StructuredArrayType( - override val elementType: DataType, - val nullable: Boolean) extends ArrayType(elementType) { + override val elementType: DataType, + val nullable: Boolean) + extends ArrayType(elementType) { override def toString: String = { s"ArrayType[${elementType.toString} nullable = $nullable]" } diff --git a/src/main/scala/com/snowflake/snowpark/types/DataType.scala b/src/main/scala/com/snowflake/snowpark/types/DataType.scala index 968cc990..35b1f28e 100644 --- a/src/main/scala/com/snowflake/snowpark/types/DataType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/DataType.scala @@ -18,6 +18,8 @@ abstract class DataType { * @since 0.1.0 */ override def toString: String = typeName + + private[snowpark] def schemaString: String = toString } private[snowpark] abstract class AtomicType extends DataType diff --git a/src/main/scala/com/snowflake/snowpark/types/MapType.scala b/src/main/scala/com/snowflake/snowpark/types/MapType.scala index 7da41c55..a5fe3475 100644 --- a/src/main/scala/com/snowflake/snowpark/types/MapType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/MapType.scala @@ -12,18 +12,16 @@ case class MapType(keyType: DataType, valueType: DataType) extends DataType { } private[snowpark] class StructuredMapType( - override val keyType: DataType, - override val valueType: DataType, - val isValueNullable: Boolean - ) extends MapType(keyType, valueType) { + override val keyType: DataType, + override val valueType: DataType, + val isValueNullable: Boolean) + extends MapType(keyType, valueType) { override def toString: String = { s"MapType[${keyType.toString}, ${valueType.toString} nullable = $isValueNullable]" } } private[snowpark] object StructuredMapType { - def apply(keyType: DataType, - valueType: DataType, - isValueType: Boolean): StructuredMapType = + def apply(keyType: DataType, valueType: DataType, isValueType: Boolean): StructuredMapType = new StructuredMapType(keyType, valueType, isValueType) } diff --git a/src/main/scala/com/snowflake/snowpark/types/StructType.scala b/src/main/scala/com/snowflake/snowpark/types/StructType.scala index 7bc4f00c..24337518 100644 --- a/src/main/scala/com/snowflake/snowpark/types/StructType.scala +++ b/src/main/scala/com/snowflake/snowpark/types/StructType.scala @@ -65,6 +65,8 @@ case class StructType(fields: Array[StructField] = Array()) override def toString: String = s"StructType[${fields.map(_.toString).mkString(", ")}]" + override private[snowpark] def schemaString: String = "StructType" + /** * Appends a new [[StructField]] to the end of this object. * @since 0.1.0 @@ -168,7 +170,7 @@ case class StructField( private[types] def treeString(layer: Int): String = { val prepended: String = (1 to (1 + 2 * layer)).map(x => " ").mkString + "|--" - val body: String = s"$name: ${dataType.toString} (nullable = $nullable)\n" + + val body: String = s"$name: ${dataType.schemaString} (nullable = $nullable)\n" + (dataType match { case st: StructType => st.treeString(layer + 1) case _ => "" diff --git a/src/main/scala/com/snowflake/snowpark/types/package.scala b/src/main/scala/com/snowflake/snowpark/types/package.scala index 0db686d2..2f91f189 100644 --- a/src/main/scala/com/snowflake/snowpark/types/package.scala +++ b/src/main/scala/com/snowflake/snowpark/types/package.scala @@ -69,6 +69,14 @@ package object types { case sm: StructuredMapType => val isValueNullable = if (sm.isValueNullable) "" else " not null" s"MAP(${convertToSFType(sm.keyType)}, ${convertToSFType(sm.valueType)}$isValueNullable)" + case StructType(fields) => + val fieldStr = fields + .map( + field => + s"${field.name} ${convertToSFType(field.dataType)} " + + (if (field.nullable) "" else "not null")) + .mkString(",") + s"OBJECT($fieldStr)" case ArrayType(_) => "ARRAY" case MapType(_, _) => "OBJECT" case VariantType => "VARIANT" diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index d36d069f..c82c5881 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -243,18 +243,21 @@ class DataTypeSuite extends SNTestBase { val df2 = session.sql(query2) assert( TestUtils.treeString(df2.schema, 0) == + // scalastyle:off s"""root | |--ARR1: ArrayType[Long nullable = false] (nullable = true) | |--ARR11: ArrayType[ArrayType[Long nullable = false] nullable = false] (nullable = true) |""".stripMargin) + // scalastyle:on assert( TestUtils.treeString(df2.select("*").schema, 0) == + // scalastyle:off s"""root | |--ARR1: ArrayType[Long nullable = false] (nullable = true) | |--ARR11: ArrayType[ArrayType[Long nullable = false] nullable = false] (nullable = true) |""".stripMargin) - + // scalastyle:on } test("MapType v2") { @@ -338,19 +341,41 @@ class DataTypeSuite extends SNTestBase { TestUtils.treeString(df.schema, 0) == // scalastyle:off s"""root - | |--OBJECT1: StructType[StructField(A, String, Nullable = true), StructField(B, Long, Nullable = true)] (nullable = true) + | |--OBJECT1: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: Long (nullable = true) + | |--OBJECT2: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--OBJECT3: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--C: MapType[Long, String nullable = true] (nullable = true) + | |--OBJECT4: StructType (nullable = true) + | |--A: StructType (nullable = true) + | |--B: StructType (nullable = true) + | |--C: Long (nullable = true) + |""".stripMargin) + // scalastyle:on + + // schema string: nullable + assert( + TestUtils.treeString(df.select("*").schema, 0) == + // scalastyle:off + s"""root + | |--OBJECT1: StructType (nullable = true) | |--A: String (nullable = true) | |--B: Long (nullable = true) - | |--OBJECT2: StructType[StructField(A, String, Nullable = true), StructField(B, ArrayType[Long], Nullable = true)] (nullable = true) + | |--OBJECT2: StructType (nullable = true) | |--A: String (nullable = true) - | |--B: ArrayType[Long] (nullable = true) - | |--OBJECT3: StructType[StructField(A, String, Nullable = true), StructField(B, ArrayType[Long], Nullable = true), StructField(C, MapType[Long, String], Nullable = true)] (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--OBJECT3: StructType (nullable = true) | |--A: String (nullable = true) - | |--B: ArrayType[Long] (nullable = true) - | |--C: MapType[Long, String] (nullable = true) - | |--OBJECT4: StructType[StructField(A, StructType[StructField(B, StructType[StructField(C, Long, Nullable = true)], Nullable = true)], Nullable = true)] (nullable = true) - | |--A: StructType[StructField(B, StructType[StructField(C, Long, Nullable = true)], Nullable = true)] (nullable = true) - | |--B: StructType[StructField(C, Long, Nullable = true)] (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--C: MapType[Long, String nullable = true] (nullable = true) + | |--OBJECT4: StructType (nullable = true) + | |--A: StructType (nullable = true) + | |--B: StructType (nullable = true) | |--C: Long (nullable = true) |""".stripMargin) // scalastyle:on From 8ab9fe7f623eebe508a8b2460706c1ee0bd0d41e Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 10 Apr 2024 15:50:20 -0700 Subject: [PATCH 10/21] structure type --- .../snowpark_test/DataTypeSuite.scala | 62 +++++++++++++++++-- 1 file changed, 57 insertions(+), 5 deletions(-) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index c82c5881..ec55cc01 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -128,22 +128,20 @@ class DataTypeSuite extends SNTestBase { StructField("col10", DoubleType), StructField("col11", DecimalType(10, 1))))))) - schema.printTreeString() - assert( TestUtils.treeString(schema, 0) == s"""root | |--COL1: Binary (nullable = true) | |--COL2: Boolean (nullable = true) - | |--COL14: Struct (nullable = false) + | |--COL14: StructType (nullable = false) | |--COL15: Timestamp (nullable = false) | |--COL3: Date (nullable = false) - | |--COL4: Struct (nullable = true) + | |--COL4: StructType (nullable = true) | |--COL5: Byte (nullable = true) | |--COL6: Short (nullable = true) | |--COL7: Integer (nullable = false) | |--COL8: Long (nullable = true) - | |--COL12: Struct (nullable = false) + | |--COL12: StructType (nullable = false) | |--COL13: String (nullable = true) | |--COL9: Float (nullable = true) | |--COL10: Double (nullable = true) @@ -379,5 +377,59 @@ class DataTypeSuite extends SNTestBase { | |--C: Long (nullable = true) |""".stripMargin) // scalastyle:on + + // schema query: not null + val query2 = + // scalastyle:off + """SELECT + | {'a': 1, 'b': 'a'} :: OBJECT(a VARCHAR not null, b NUMBER) as object1, + | {'a': 1, 'b': [1,2,3,4]} :: OBJECT(a VARCHAR, b ARRAY(NUMBER not null) not null) as object2, + | {'a': 1, 'b': [1,2,3,4], 'c': {'1':'a'}} :: OBJECT(a VARCHAR, b ARRAY(NUMBER), c MAP(NUMBER, VARCHAR not null) not null) as object3, + | {'a': {'b': {'c': 1}}} :: OBJECT(a OBJECT(b OBJECT(c NUMBER not null) not null) not null) as object4 + |""".stripMargin + // scalastyle:on + + val df2 = session.sql(query2) + assert( + TestUtils.treeString(df2.schema, 0) == + // scalastyle:off + s"""root + | |--OBJECT1: StructType (nullable = true) + | |--A: String (nullable = false) + | |--B: Long (nullable = true) + | |--OBJECT2: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = false] (nullable = false) + | |--OBJECT3: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--C: MapType[Long, String nullable = false] (nullable = false) + | |--OBJECT4: StructType (nullable = true) + | |--A: StructType (nullable = false) + | |--B: StructType (nullable = false) + | |--C: Long (nullable = false) + |""".stripMargin) + // scalastyle:on + + assert( + TestUtils.treeString(df2.select("*").schema, 0) == + // scalastyle:off + s"""root + | |--OBJECT1: StructType (nullable = true) + | |--A: String (nullable = false) + | |--B: Long (nullable = true) + | |--OBJECT2: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = false] (nullable = false) + | |--OBJECT3: StructType (nullable = true) + | |--A: String (nullable = true) + | |--B: ArrayType[Long nullable = true] (nullable = true) + | |--C: MapType[Long, String nullable = false] (nullable = false) + | |--OBJECT4: StructType (nullable = true) + | |--A: StructType (nullable = false) + | |--B: StructType (nullable = false) + | |--C: Long (nullable = false) + |""".stripMargin) + // scalastyle:on } } From 78bb2d47a8faf3a1ef864777e6a0b6b7e68a9d02 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 15 Apr 2024 13:45:27 -0700 Subject: [PATCH 11/21] tmp --- .../com/snowflake/snowpark/DataFrame.scala | 18 +++++-- .../snowpark/internal/ServerConnection.scala | 52 ++++++++----------- .../snowpark_test/DataTypeSuite.scala | 22 +++++++- 3 files changed, 57 insertions(+), 35 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/DataFrame.scala b/src/main/scala/com/snowflake/snowpark/DataFrame.scala index 12417787..88bd15bc 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrame.scala @@ -2369,17 +2369,25 @@ class DataFrame private[snowpark] ( lines } + def castValueToString(value: Any): String = + value match { + case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'" + case bytes: Array[java.lang.Byte] => + s"'${DatatypeConverter.printHexBinary(bytes.map(_.toByte))}'" + case arr: Array[String] => + arr.mkString("[", ",", "]") + case arr: java.sql.Array => + arr.getArray().asInstanceOf[Array[_]].map(castValueToString).mkString("[", ",", "]") + case _ => value.toString + } + val body: Seq[Seq[String]] = result.flatMap(row => { // Value may contain multiple lines val lines: Seq[Seq[String]] = row.toSeq.zipWithIndex.map { case (value, index) => val texts: Seq[String] = if (value != null) { - val str = value match { - case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'" - case _ => value.toString - } // if the result contains multiple lines, split result string - splitLines(str) + splitLines(castValueToString(value)) } else { Seq("NULL") } diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index fab55a05..1c5c6fdc 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,40 +3,15 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime -import com.snowflake.snowpark.{ - MergeBuilder, - MergeTypedAsyncJob, - Row, - SnowparkClientException, - TypedAsyncJob -} -import com.snowflake.snowpark.internal.ParameterUtils.{ - ClosureCleanerMode, - DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, - DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, - DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, - DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, - MAX_REQUEST_TIMEOUT_IN_SECONDS, - MIN_REQUEST_TIMEOUT_IN_SECONDS, - SnowparkMaxFileDownloadRetryCount, - SnowparkMaxFileUploadRetryCount, - SnowparkRequestTimeoutInSeconds, - Url -} +import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} +import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{ - FieldMetadata, - SnowflakeConnectString, - SnowflakeConnectionV1, - SnowflakeReauthenticationRequest, - SnowflakeResultSet, - SnowflakeResultSetMetaData, - SnowflakeStatement -} +import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeStatement} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus +import java.util import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import scala.collection.JavaConverters._ @@ -311,6 +286,15 @@ private[snowpark] class ServerConnection( buff.result() } + private def convertStructuredToScala(data: Any, dataType: DataType): Any = + dataType match { + case sa: StructuredArrayType => + data.asInstanceOf[util.ArrayList[_]] + .toArray().map(v => convertStructuredToScala(v, sa.elementType)) + case _ => data.toString + + } + private[snowpark] def resultSetToIterator( statement: Statement): (CloseableIterator[Row], StructType) = withValidConnection { @@ -338,6 +322,16 @@ private[snowpark] class ServerConnection( } else { attribute.dataType match { case VariantType => data.getString(resultIndex) + case sa: StructuredArrayType => + sa.elementType match { + case ArrayType(StringType) => // semi structured array + convertStructuredToScala(data.getObject(resultIndex), sa) +// case sa: StructuredArrayType => +// val result = data.getObject(resultIndex) +// .asInstanceOf[util.ArrayList[_]].toArray() +// "" + case _ => data.getArray(resultIndex) + } case ArrayType(StringType) => data.getString(resultIndex) case MapType(StringType, StringType) => data.getString(resultIndex) case StringType => data.getString(resultIndex) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index ec55cc01..3a816588 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -181,6 +181,26 @@ class DataTypeSuite extends SNTestBase { |""".stripMargin) } + test("read Structured Array") { + val query = + """SELECT + | [1, 2, 3]::ARRAY(NUMBER) AS arr1, + | [1.1, 2.2, 3.3]::ARRAY(FLOAT) AS arr2, + | [true, false]::ARRAY(BOOLEAN) AS arr3, + | ['a', 'b']::ARRAY(VARCHAR) AS arr4, + | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, + | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, + | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, + | ['1', 2]::ARRAY(VARIANT) AS arr8, + | [[1,2]]::ARRAY(ARRAY) AS arr9, + | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, + | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, + | [1, 2, 3] AS arr0 + |""".stripMargin + val df = session.sql(query) + df.show() + } + test("ArrayType v2") { val query = """SELECT | [1, 2, 3]::ARRAY(NUMBER) AS arr1, @@ -194,7 +214,7 @@ class DataTypeSuite extends SNTestBase { | [[1,2]]::ARRAY(ARRAY) AS arr9, | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, - | [1, 2, 3] AS arr0;""".stripMargin + | [1, 2, 3] AS arr0""".stripMargin val df = session.sql(query) assert( TestUtils.treeString(df.schema, 0) == From 0f8583a5b3bfa7750344a3971853c1b8bfc288c9 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 15 Apr 2024 15:52:53 -0700 Subject: [PATCH 12/21] tmp --- .../com/snowflake/snowpark/DataFrame.scala | 2 ++ .../snowpark/internal/ServerConnection.scala | 36 ++++++++++++------- .../snowpark_test/DataTypeSuite.scala | 9 ++--- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/DataFrame.scala b/src/main/scala/com/snowflake/snowpark/DataFrame.scala index 88bd15bc..949114f4 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrame.scala @@ -2376,6 +2376,8 @@ class DataFrame private[snowpark] ( s"'${DatatypeConverter.printHexBinary(bytes.map(_.toByte))}'" case arr: Array[String] => arr.mkString("[", ",", "]") + case arr: Array[_] => + arr.map(castValueToString).mkString("[", ",", "]") case arr: java.sql.Array => arr.getArray().asInstanceOf[Array[_]].map(castValueToString).mkString("[", ",", "]") case _ => value.toString diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 1c5c6fdc..4b39af2e 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -1,7 +1,7 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} -import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} +import java.sql.{Date, PreparedStatement, ResultSetMetaData, SQLException, Statement, Timestamp} import java.time.LocalDateTime import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} @@ -10,6 +10,7 @@ import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeStatement} import com.snowflake.snowpark.types._ import net.snowflake.client.core.QueryStatus +import net.snowflake.client.core.arrow.{TwoFieldStructToTimestampLTZConverter, TwoFieldStructToTimestampNTZConverter} import java.util import scala.collection.mutable @@ -286,12 +287,28 @@ private[snowpark] class ServerConnection( buff.result() } - private def convertStructuredToScala(data: Any, dataType: DataType): Any = + private def convertStructuredToScala(value: Any, dataType: DataType): Any = dataType match { case sa: StructuredArrayType => - data.asInstanceOf[util.ArrayList[_]] + value.asInstanceOf[util.ArrayList[_]] .toArray().map(v => convertStructuredToScala(v, sa.elementType)) - case _ => data.toString + case LongType => + value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() + case DoubleType => value.asInstanceOf[Double] + case BooleanType => value.asInstanceOf[Boolean] + case StringType => value.toString + case TimestampType => + val time = value.asInstanceOf[java.util.Map[_, _]] + val epoch = time.get("epoch").asInstanceOf[Long] +// val fraction = time.get("fraction").asInstanceOf[Int] + new Timestamp(epoch * 1000) + case DateType => + new Date(value.asInstanceOf[Int]) + case BinaryType => + value.asInstanceOf[Array[Byte]] + case _ => + throw new UnsupportedOperationException( + s"Unsupported type: $dataType") } @@ -323,15 +340,8 @@ private[snowpark] class ServerConnection( attribute.dataType match { case VariantType => data.getString(resultIndex) case sa: StructuredArrayType => - sa.elementType match { - case ArrayType(StringType) => // semi structured array - convertStructuredToScala(data.getObject(resultIndex), sa) -// case sa: StructuredArrayType => -// val result = data.getObject(resultIndex) -// .asInstanceOf[util.ArrayList[_]].toArray() -// "" - case _ => data.getArray(resultIndex) - } +// data.getArray(resultIndex) + convertStructuredToScala(data.getObject(resultIndex), sa) case ArrayType(StringType) => data.getString(resultIndex) case MapType(StringType, StringType) => data.getString(resultIndex) case StringType => data.getString(resultIndex) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 3a816588..a1837a7f 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -188,14 +188,9 @@ class DataTypeSuite extends SNTestBase { | [1.1, 2.2, 3.3]::ARRAY(FLOAT) AS arr2, | [true, false]::ARRAY(BOOLEAN) AS arr3, | ['a', 'b']::ARRAY(VARCHAR) AS arr4, - | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, + | [parse_json(31111111)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, - | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, - | ['1', 2]::ARRAY(VARIANT) AS arr8, - | [[1,2]]::ARRAY(ARRAY) AS arr9, - | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, - | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, - | [1, 2, 3] AS arr0 + | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7 |""".stripMargin val df = session.sql(query) df.show() From 4fae41cfbafca3a8ac839ced66f2f7503c48c6f9 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:04:36 -0700 Subject: [PATCH 13/21] support date --- .../snowpark/internal/ServerConnection.scala | 88 +++++++++++++------ .../snowpark_test/DataTypeSuite.scala | 5 +- 2 files changed, 62 insertions(+), 31 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 4b39af2e..8dba938a 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -7,12 +7,14 @@ import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkCl import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeStatement} +import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeBaseResultSet, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeResultSetV1, SnowflakeStatement} +import net.snowflake.client.core.json.Converters import com.snowflake.snowpark.types._ -import net.snowflake.client.core.QueryStatus +import net.snowflake.client.core.{ColumnTypeHelper, QueryStatus, SFArrowResultSet, SFBaseResultSet, SFBaseSession} import net.snowflake.client.core.arrow.{TwoFieldStructToTimestampLTZConverter, TwoFieldStructToTimestampNTZConverter} import java.util +import java.util.TimeZone import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import scala.collection.JavaConverters._ @@ -287,40 +289,54 @@ private[snowpark] class ServerConnection( buff.result() } - private def convertStructuredToScala(value: Any, dataType: DataType): Any = - dataType match { - case sa: StructuredArrayType => - value.asInstanceOf[util.ArrayList[_]] - .toArray().map(v => convertStructuredToScala(v, sa.elementType)) - case LongType => - value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() - case DoubleType => value.asInstanceOf[Double] - case BooleanType => value.asInstanceOf[Boolean] - case StringType => value.toString - case TimestampType => - val time = value.asInstanceOf[java.util.Map[_, _]] - val epoch = time.get("epoch").asInstanceOf[Long] -// val fraction = time.get("fraction").asInstanceOf[Int] - new Timestamp(epoch * 1000) - case DateType => - new Date(value.asInstanceOf[Int]) - case BinaryType => - value.asInstanceOf[Array[Byte]] - case _ => - throw new UnsupportedOperationException( - s"Unsupported type: $dataType") - - } - private[snowpark] def resultSetToIterator( statement: Statement): (CloseableIterator[Row], StructType) = withValidConnection { val data = statement.getResultSet + + // used by structured types + lazy val arrowResultSet: SFArrowResultSet = { + val sfResultSet = data.asInstanceOf[SnowflakeBaseResultSet] + val baseResultSetField = classOf[SnowflakeBaseResultSet].getDeclaredField("sfBaseResultSet") + baseResultSetField.setAccessible(true) + baseResultSetField.get(sfResultSet).asInstanceOf[SFArrowResultSet] + } val schema = ServerConnection.convertResultMetaToAttribute(data.getMetaData) lazy val geographyOutputFormat = getParameterValue(ParameterUtils.GeographyOutputFormat) lazy val geometryOutputFormat = getParameterValue(ParameterUtils.GeometryOutputFormat) + def convertToSnowparkValue(value: Any, meta: FieldMetadata): Any = { + meta.getTypeName match { + case "ARRAY" => + if (meta.getFields.isEmpty) { + null // semi structured + } else { + value.asInstanceOf[util.ArrayList[_]] + .toArray + .map(v => convertToSnowparkValue(v, meta.getFields.get(0))) + } + case "NUMBER" if meta.getType == java.sql.Types.BIGINT => + value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() + case "DOUBLE" => value.asInstanceOf[Double] + case "BOOLEAN" => value.asInstanceOf[Boolean] + case "VARCHAR" => value.toString + case "BINARY" => value // byte array + case "DATE" => + arrowResultSet.convertToDate(value, null) + case _ if meta.getType == java.sql.Types.TIMESTAMP || + meta.getType == java.sql.Types.TIMESTAMP_WITH_TIMEZONE => + val columnSubType = meta.getType + val columnType = ColumnTypeHelper + .getColumnType(columnSubType, arrowResultSet.getSession) + arrowResultSet.convertToTimestamp( + value, columnType, columnSubType, null, meta.getScale + ) + case _ => + throw new UnsupportedOperationException(s"Unsupported type: ${meta.getTypeName}") + } + } + val iterator = new CloseableIterator[Row] { private var _currentRow: Row = _ private var _hasNext: Boolean = _ @@ -340,8 +356,22 @@ private[snowpark] class ServerConnection( attribute.dataType match { case VariantType => data.getString(resultIndex) case sa: StructuredArrayType => -// data.getArray(resultIndex) - convertStructuredToScala(data.getObject(resultIndex), sa) + val meta = data.getMetaData + // convert meta to field meta + val field = new FieldMetadata( + meta.getColumnName(resultIndex), + meta.getColumnTypeName(resultIndex), + meta.getColumnType(resultIndex), + true, + 0, + 0, + 0, + false, + null, + meta.asInstanceOf[SnowflakeResultSetMetaData] + .getColumnFields(resultIndex)) + convertToSnowparkValue(data.getObject(resultIndex), field) +// convertStructuredToScala(data.getObject(resultIndex), sa) case ArrayType(StringType) => data.getString(resultIndex) case MapType(StringType, StringType) => data.getString(resultIndex) case StringType => data.getString(resultIndex) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index a1837a7f..0477d070 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -188,10 +188,11 @@ class DataTypeSuite extends SNTestBase { | [1.1, 2.2, 3.3]::ARRAY(FLOAT) AS arr2, | [true, false]::ARRAY(BOOLEAN) AS arr3, | ['a', 'b']::ARRAY(VARCHAR) AS arr4, - | [parse_json(31111111)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, + | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7 |""".stripMargin +// val query = "select TO_DATE('2013-05-17') AS arr7" val df = session.sql(query) df.show() } @@ -395,7 +396,7 @@ class DataTypeSuite extends SNTestBase { // schema query: not null val query2 = - // scalastyle:off + // scalastyle:off """SELECT | {'a': 1, 'b': 'a'} :: OBJECT(a VARCHAR not null, b NUMBER) as object1, | {'a': 1, 'b': [1,2,3,4]} :: OBJECT(a VARCHAR, b ARRAY(NUMBER not null) not null) as object2, From 8da9f0327c1d414237bfd9c3f0e860d243aea77a Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:13:38 -0700 Subject: [PATCH 14/21] time --- .../com/snowflake/snowpark/internal/ServerConnection.scala | 3 +++ .../scala/com/snowflake/snowpark_test/DataTypeSuite.scala | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 8dba938a..b0f667e4 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -321,9 +321,12 @@ private[snowpark] class ServerConnection( case "DOUBLE" => value.asInstanceOf[Double] case "BOOLEAN" => value.asInstanceOf[Boolean] case "VARCHAR" => value.toString + case "VARIANT" => value.toString case "BINARY" => value // byte array case "DATE" => arrowResultSet.convertToDate(value, null) + case "TIME" => + arrowResultSet.convertToTime(value, meta.getScale) case _ if meta.getType == java.sql.Types.TIMESTAMP || meta.getType == java.sql.Types.TIMESTAMP_WITH_TIMEZONE => val columnSubType = meta.getType diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 0477d070..9de11a3c 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -190,9 +190,10 @@ class DataTypeSuite extends SNTestBase { | ['a', 'b']::ARRAY(VARCHAR) AS arr4, | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, - | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7 + | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, + | [time '10:03:56']::ARRAY(TIME) as arr11 |""".stripMargin -// val query = "select TO_DATE('2013-05-17') AS arr7" +// val query = "select time '10:03:56' AS arr7" val df = session.sql(query) df.show() } From c5c51fc6e0bef9a221a77df1c8caa7641417d034 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:24:24 -0700 Subject: [PATCH 15/21] complete show --- .../snowflake/snowpark/internal/ServerConnection.scala | 8 +++++++- .../scala/com/snowflake/snowpark_test/DataTypeSuite.scala | 5 ++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index b0f667e4..0943aa3a 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -310,12 +310,18 @@ private[snowpark] class ServerConnection( meta.getTypeName match { case "ARRAY" => if (meta.getFields.isEmpty) { - null // semi structured + value.toString // semi structured } else { value.asInstanceOf[util.ArrayList[_]] .toArray .map(v => convertToSnowparkValue(v, meta.getFields.get(0))) } + case "OBJECT" => + if (meta.getFields.isEmpty) { // semi-structured + value.toString + } else { + null + } case "NUMBER" if meta.getType == java.sql.Types.BIGINT => value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() case "DOUBLE" => value.asInstanceOf[Double] diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 9de11a3c..ec7927f5 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -191,7 +191,10 @@ class DataTypeSuite extends SNTestBase { | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, - | [time '10:03:56']::ARRAY(TIME) as arr11 + | [[1,2]]::ARRAY(ARRAY) AS arr9, + | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, + | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, + | [time '10:03:56']::ARRAY(TIME) as arr21 |""".stripMargin // val query = "select time '10:03:56' AS arr7" val df = session.sql(query) From 3aa542afa7ea56561074e98c4cc87c11ca9b999f Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:45:00 -0700 Subject: [PATCH 16/21] decimal --- .../com/snowflake/snowpark/internal/ServerConnection.scala | 5 +++-- .../scala/com/snowflake/snowpark_test/DataTypeSuite.scala | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 0943aa3a..49a5712c 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -324,6 +324,8 @@ private[snowpark] class ServerConnection( } case "NUMBER" if meta.getType == java.sql.Types.BIGINT => value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() + case "NUMBER" if meta.getType == java.sql.Types.DECIMAL => + value case "DOUBLE" => value.asInstanceOf[Double] case "BOOLEAN" => value.asInstanceOf[Boolean] case "VARCHAR" => value.toString @@ -364,7 +366,7 @@ private[snowpark] class ServerConnection( } else { attribute.dataType match { case VariantType => data.getString(resultIndex) - case sa: StructuredArrayType => + case _: StructuredArrayType => val meta = data.getMetaData // convert meta to field meta val field = new FieldMetadata( @@ -380,7 +382,6 @@ private[snowpark] class ServerConnection( meta.asInstanceOf[SnowflakeResultSetMetaData] .getColumnFields(resultIndex)) convertToSnowparkValue(data.getObject(resultIndex), field) -// convertStructuredToScala(data.getObject(resultIndex), sa) case ArrayType(StringType) => data.getString(resultIndex) case MapType(StringType, StringType) => data.getString(resultIndex) case StringType => data.getString(resultIndex) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index ec7927f5..26dfb6f2 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -194,9 +194,9 @@ class DataTypeSuite extends SNTestBase { | [[1,2]]::ARRAY(ARRAY) AS arr9, | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, + | [1.234::DECIMAL(13, 5)]::ARRAY(DECIMAL(13,5)) as arr12, | [time '10:03:56']::ARRAY(TIME) as arr21 |""".stripMargin -// val query = "select time '10:03:56' AS arr7" val df = session.sql(query) df.show() } From d526ca93fdcda7932e17c09d08af5dbf0a8f4c82 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:56:21 -0700 Subject: [PATCH 17/21] reorg --- .../snowpark/internal/ServerConnection.scala | 8 +------- .../snowflake/snowpark/APIInternalSuite.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 49a5712c..4947ca79 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -324,13 +324,7 @@ private[snowpark] class ServerConnection( } case "NUMBER" if meta.getType == java.sql.Types.BIGINT => value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() - case "NUMBER" if meta.getType == java.sql.Types.DECIMAL => - value - case "DOUBLE" => value.asInstanceOf[Double] - case "BOOLEAN" => value.asInstanceOf[Boolean] - case "VARCHAR" => value.toString - case "VARIANT" => value.toString - case "BINARY" => value // byte array + case "DOUBLE"| "BOOLEAN"| "BINARY"| "NUMBER"| "VARCHAR"| "VARIANT" => value case "DATE" => arrowResultSet.convertToDate(value, null) case "TIME" => diff --git a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala index 29111bf9..82eaa34a 100644 --- a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala +++ b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala @@ -393,6 +393,26 @@ class APIInternalSuite extends TestData { |""".stripMargin) } + test("show structured array") { + val query = + """SELECT + | [1, 2, 3]::ARRAY(NUMBER) AS arr1, + | [1.1, 2.2, 3.3]::ARRAY(FLOAT) AS arr2, + | [true, false]::ARRAY(BOOLEAN) AS arr3, + | ['a', 'b']::ARRAY(VARCHAR) AS arr4, + | [parse_json(31000000)::timestamp_ntz]::ARRAY(TIMESTAMP_NTZ) AS arr5, + | [TO_BINARY('SNOW', 'utf-8')]::ARRAY(BINARY) AS arr6, + | [TO_DATE('2013-05-17')]::ARRAY(DATE) AS arr7, + | [[1,2]]::ARRAY(ARRAY) AS arr9, + | [OBJECT_CONSTRUCT('name', 1)]::ARRAY(OBJECT) AS arr10, + | [[1, 2], [3, 4]]::ARRAY(ARRAY(NUMBER)) AS arr11, + | [1.234::DECIMAL(13, 5)]::ARRAY(DECIMAL(13,5)) as arr12, + | [time '10:03:56']::ARRAY(TIME) as arr21 + |""".stripMargin + val df = session.sql(query) + df.show() + } + // dataframe test("withColumn function uses * instead of full column name list") { import session.implicits._ From e4ee30eab891833b9dc181071121a512b7cbb1b6 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 14:58:33 -0700 Subject: [PATCH 18/21] add test --- .../com/snowflake/snowpark/APIInternalSuite.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala index 82eaa34a..d2abde0e 100644 --- a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala +++ b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala @@ -410,7 +410,19 @@ class APIInternalSuite extends TestData { | [time '10:03:56']::ARRAY(TIME) as arr21 |""".stripMargin val df = session.sql(query) - df.show() + // scalastyle:off + assert( + df.showString(10) == + """--------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + ||"ARR1" |"ARR2" |"ARR3" |"ARR4" |"ARR5" |"ARR6" |"ARR7" |"ARR9" |"ARR10" |"ARR11" |"ARR12" |"ARR21" | + |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + ||[1,2,3] |[1.1,2.2,3.3] |[true,false] |[a,b] |[1970-12-25 11:06:40.0] |['534E4F57'] |[2013-05-17] |[[ |[{ |[[1,2],[3,4]] |[1.23400] |[10:03:56] | + || | | | | | | | 1, | "name": 1 | | | | + || | | | | | | | 2 |}] | | | | + || | | | | | | |]] | | | | | + |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + |""".stripMargin) + // scalastyle:on } // dataframe From de4a61cc0f4939cb53aed80fee0dd8b6c58842bb Mon Sep 17 00:00:00 2001 From: Bing Li Date: Wed, 17 Apr 2024 15:32:28 -0700 Subject: [PATCH 19/21] add test --- .../snowpark/internal/ServerConnection.scala | 71 +++++++++++++++---- .../snowpark_test/DataTypeSuite.scala | 18 ++++- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index 4947ca79..c2e2ad96 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -3,15 +3,52 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} import java.sql.{Date, PreparedStatement, ResultSetMetaData, SQLException, Statement, Timestamp} import java.time.LocalDateTime -import com.snowflake.snowpark.{MergeBuilder, MergeTypedAsyncJob, Row, SnowparkClientException, TypedAsyncJob} -import com.snowflake.snowpark.internal.ParameterUtils.{ClosureCleanerMode, DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, MAX_REQUEST_TIMEOUT_IN_SECONDS, MIN_REQUEST_TIMEOUT_IN_SECONDS, SnowparkMaxFileDownloadRetryCount, SnowparkMaxFileUploadRetryCount, SnowparkRequestTimeoutInSeconds, Url} +import com.snowflake.snowpark.{ + MergeBuilder, + MergeTypedAsyncJob, + Row, + SnowparkClientException, + TypedAsyncJob +} +import com.snowflake.snowpark.internal.ParameterUtils.{ + ClosureCleanerMode, + DEFAULT_MAX_FILE_DOWNLOAD_RETRY_COUNT, + DEFAULT_MAX_FILE_UPLOAD_RETRY_COUNT, + DEFAULT_REQUEST_TIMEOUT_IN_SECONDS, + DEFAULT_SNOWPARK_USE_SCOPED_TEMP_OBJECTS, + MAX_REQUEST_TIMEOUT_IN_SECONDS, + MIN_REQUEST_TIMEOUT_IN_SECONDS, + SnowparkMaxFileDownloadRetryCount, + SnowparkMaxFileUploadRetryCount, + SnowparkRequestTimeoutInSeconds, + Url +} import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan} -import net.snowflake.client.jdbc.{FieldMetadata, SnowflakeBaseResultSet, SnowflakeConnectString, SnowflakeConnectionV1, SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, SnowflakeResultSetV1, SnowflakeStatement} +import net.snowflake.client.jdbc.{ + FieldMetadata, + SnowflakeBaseResultSet, + SnowflakeConnectString, + SnowflakeConnectionV1, + SnowflakeReauthenticationRequest, + SnowflakeResultSet, + SnowflakeResultSetMetaData, + SnowflakeResultSetV1, + SnowflakeStatement +} import net.snowflake.client.core.json.Converters import com.snowflake.snowpark.types._ -import net.snowflake.client.core.{ColumnTypeHelper, QueryStatus, SFArrowResultSet, SFBaseResultSet, SFBaseSession} -import net.snowflake.client.core.arrow.{TwoFieldStructToTimestampLTZConverter, TwoFieldStructToTimestampNTZConverter} +import net.snowflake.client.core.{ + ColumnTypeHelper, + QueryStatus, + SFArrowResultSet, + SFBaseResultSet, + SFBaseSession +} +import net.snowflake.client.core.arrow.{ + TwoFieldStructToTimestampLTZConverter, + TwoFieldStructToTimestampNTZConverter +} import java.util import java.util.TimeZone @@ -297,7 +334,8 @@ private[snowpark] class ServerConnection( // used by structured types lazy val arrowResultSet: SFArrowResultSet = { val sfResultSet = data.asInstanceOf[SnowflakeBaseResultSet] - val baseResultSetField = classOf[SnowflakeBaseResultSet].getDeclaredField("sfBaseResultSet") + val baseResultSetField = + classOf[SnowflakeBaseResultSet].getDeclaredField("sfBaseResultSet") baseResultSetField.setAccessible(true) baseResultSetField.get(sfResultSet).asInstanceOf[SFArrowResultSet] } @@ -312,7 +350,8 @@ private[snowpark] class ServerConnection( if (meta.getFields.isEmpty) { value.toString // semi structured } else { - value.asInstanceOf[util.ArrayList[_]] + value + .asInstanceOf[util.ArrayList[_]] .toArray .map(v => convertToSnowparkValue(v, meta.getFields.get(0))) } @@ -324,19 +363,24 @@ private[snowpark] class ServerConnection( } case "NUMBER" if meta.getType == java.sql.Types.BIGINT => value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() - case "DOUBLE"| "BOOLEAN"| "BINARY"| "NUMBER"| "VARCHAR"| "VARIANT" => value + case "DOUBLE" | "BOOLEAN" | "BINARY" | "NUMBER" => value + case "VARCHAR" | "VARIANT" => value.toString // Text to String case "DATE" => arrowResultSet.convertToDate(value, null) case "TIME" => arrowResultSet.convertToTime(value, meta.getScale) - case _ if meta.getType == java.sql.Types.TIMESTAMP || - meta.getType == java.sql.Types.TIMESTAMP_WITH_TIMEZONE => + case _ + if meta.getType == java.sql.Types.TIMESTAMP || + meta.getType == java.sql.Types.TIMESTAMP_WITH_TIMEZONE => val columnSubType = meta.getType val columnType = ColumnTypeHelper .getColumnType(columnSubType, arrowResultSet.getSession) arrowResultSet.convertToTimestamp( - value, columnType, columnSubType, null, meta.getScale - ) + value, + columnType, + columnSubType, + null, + meta.getScale) case _ => throw new UnsupportedOperationException(s"Unsupported type: ${meta.getTypeName}") } @@ -373,7 +417,8 @@ private[snowpark] class ServerConnection( 0, false, null, - meta.asInstanceOf[SnowflakeResultSetMetaData] + meta + .asInstanceOf[SnowflakeResultSetMetaData] .getColumnFields(resultIndex)) convertToSnowparkValue(data.getObject(resultIndex), field) case ArrayType(StringType) => data.getString(resultIndex) diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 26dfb6f2..639f9370 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -4,6 +4,8 @@ import com.snowflake.snowpark.{Row, SNTestBase, TestUtils} import com.snowflake.snowpark.types._ import com.snowflake.snowpark.functions._ +import java.sql.{Date, Time, Timestamp} + // Test DataTypes out of com.snowflake.snowpark package. class DataTypeSuite extends SNTestBase { test("IntegralType") { @@ -198,7 +200,21 @@ class DataTypeSuite extends SNTestBase { | [time '10:03:56']::ARRAY(TIME) as arr21 |""".stripMargin val df = session.sql(query) - df.show() + checkAnswer( + df, + Row( + Array(1L, 2L, 3L), + Array(1.1, 2.2, 3.3), + Array(true, false), + Array("a", "b"), + Array(new Timestamp(31000000000L)), + Array(Array(83.toByte, 78.toByte, 79.toByte, 87.toByte)), + Array(Date.valueOf("2013-05-17")), + Array("[\n 1,\n 2\n]"), + Array("{\n \"name\": 1\n}"), + Array(Array(1L, 2L), Array(3L, 4L)), + Array(java.math.BigDecimal.valueOf(1.234)), + Array(Time.valueOf("10:03:56")))) } test("ArrayType v2") { From 7684a9a292acc59907a6b8fd8d586cb1e21bae6c Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 22 Apr 2024 14:33:41 -0700 Subject: [PATCH 20/21] show string --- .../com/snowflake/snowpark/DataFrame.scala | 6 ++ .../snowpark/internal/ServerConnection.scala | 66 ++++++++++--------- .../snowflake/snowpark/APIInternalSuite.scala | 16 +++++ 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/src/main/scala/com/snowflake/snowpark/DataFrame.scala b/src/main/scala/com/snowflake/snowpark/DataFrame.scala index 949114f4..e272de3b 100644 --- a/src/main/scala/com/snowflake/snowpark/DataFrame.scala +++ b/src/main/scala/com/snowflake/snowpark/DataFrame.scala @@ -2371,6 +2371,12 @@ class DataFrame private[snowpark] ( def castValueToString(value: Any): String = value match { + case map: Map[_, _] => + map + .map { + case (key, value) => s"${castValueToString(key)}:${castValueToString(value)}" + } + .mkString("{", ",", "}") case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'" case bytes: Array[java.lang.Byte] => s"'${DatatypeConverter.printHexBinary(bytes.map(_.toByte))}'" diff --git a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala index c2e2ad96..2c188e23 100644 --- a/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala +++ b/src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala @@ -1,7 +1,7 @@ package com.snowflake.snowpark.internal import java.io.{Closeable, InputStream} -import java.sql.{Date, PreparedStatement, ResultSetMetaData, SQLException, Statement, Timestamp} +import java.sql.{PreparedStatement, ResultSetMetaData, SQLException, Statement} import java.time.LocalDateTime import com.snowflake.snowpark.{ MergeBuilder, @@ -33,25 +33,16 @@ import net.snowflake.client.jdbc.{ SnowflakeReauthenticationRequest, SnowflakeResultSet, SnowflakeResultSetMetaData, - SnowflakeResultSetV1, SnowflakeStatement } -import net.snowflake.client.core.json.Converters import com.snowflake.snowpark.types._ -import net.snowflake.client.core.{ - ColumnTypeHelper, - QueryStatus, - SFArrowResultSet, - SFBaseResultSet, - SFBaseSession -} -import net.snowflake.client.core.arrow.{ - TwoFieldStructToTimestampLTZConverter, - TwoFieldStructToTimestampNTZConverter +import net.snowflake.client.core.{ColumnTypeHelper, QueryStatus, SFArrowResultSet} +import net.snowflake.client.jdbc.internal.apache.arrow.vector.util.{ + JsonStringArrayList, + JsonStringHashMap } import java.util -import java.util.TimeZone import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import scala.collection.JavaConverters._ @@ -346,23 +337,38 @@ private[snowpark] class ServerConnection( def convertToSnowparkValue(value: Any, meta: FieldMetadata): Any = { meta.getTypeName match { - case "ARRAY" => - if (meta.getFields.isEmpty) { - value.toString // semi structured - } else { - value - .asInstanceOf[util.ArrayList[_]] - .toArray - .map(v => convertToSnowparkValue(v, meta.getFields.get(0))) - } - case "OBJECT" => - if (meta.getFields.isEmpty) { // semi-structured - value.toString - } else { - null + // semi structured + case "ARRAY" if meta.getFields.isEmpty => value.toString + // structured array + case "ARRAY" if meta.getFields.size() == 1 => + value + .asInstanceOf[util.ArrayList[_]] + .toArray + .map(v => convertToSnowparkValue(v, meta.getFields.get(0))) + // semi-structured + case "OBJECT" if meta.getFields.isEmpty => value.toString + // structured map + case "OBJECT" if meta.getFields.size() == 2 => + value match { + // nested structured maps are JsonStringArrayValues + case subMap: JsonStringArrayList[_] => + subMap.asScala.map { + case mapValue: JsonStringHashMap[_, _] => + convertToSnowparkValue(mapValue.get("key"), meta.getFields.get(0)) -> + convertToSnowparkValue(mapValue.get("value"), meta.getFields.get(1)) + }.toMap + case map: util.HashMap[_, _] => + map.asScala.map { + case (key, value) => + convertToSnowparkValue(key, meta.getFields.get(0)) -> + convertToSnowparkValue(value, meta.getFields.get(1)) + }.toMap } case "NUMBER" if meta.getType == java.sql.Types.BIGINT => - value.asInstanceOf[java.math.BigDecimal].toBigInteger.longValue() + value match { + case str: String => str.toLong // number key in structured map + case bd: java.math.BigDecimal => bd.toBigInteger.longValue() + } case "DOUBLE" | "BOOLEAN" | "BINARY" | "NUMBER" => value case "VARCHAR" | "VARIANT" => value.toString // Text to String case "DATE" => @@ -404,7 +410,7 @@ private[snowpark] class ServerConnection( } else { attribute.dataType match { case VariantType => data.getString(resultIndex) - case _: StructuredArrayType => + case _: StructuredArrayType | _: StructuredMapType => val meta = data.getMetaData // convert meta to field meta val field = new FieldMetadata( diff --git a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala index d2abde0e..4d8ceec7 100644 --- a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala +++ b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala @@ -393,6 +393,22 @@ class APIInternalSuite extends TestData { |""".stripMargin) } + test("show structured map") { + val query = + """SELECT + | {'a':1,'b':2} :: MAP(VARCHAR, NUMBER) as map1, + | {'1':'a','2':'b'} :: MAP(NUMBER, VARCHAR) as map2, + | {'1':[1,2,3],'2':[4,5,6]} :: MAP(NUMBER, ARRAY(NUMBER)) as map3, + | {'1':{'a':1,'b':2},'2':{'c':3}} :: MAP(NUMBER, MAP(VARCHAR, NUMBER)) as map4, + | [{'a':1,'b':2},{'c':3}] :: ARRAY(MAP(VARCHAR, NUMBER)) as map5, + | {'a':1,'b':2} :: OBJECT as map0 + |""".stripMargin + + val df = session.sql(query) + + df.show() + } + test("show structured array") { val query = """SELECT From 89a44068316b9b6353741c2786c38198dcda0f66 Mon Sep 17 00:00:00 2001 From: Bing Li Date: Mon, 22 Apr 2024 16:03:55 -0700 Subject: [PATCH 21/21] result test --- .../snowflake/snowpark/APIInternalSuite.scala | 15 +++++++++++-- .../snowpark_test/DataTypeSuite.scala | 22 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala index 4d8ceec7..a5a3f361 100644 --- a/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala +++ b/src/test/scala/com/snowflake/snowpark/APIInternalSuite.scala @@ -405,8 +405,19 @@ class APIInternalSuite extends TestData { |""".stripMargin val df = session.sql(query) - - df.show() + // scalastyle:off + assert( + df.showString(10) == + """--------------------------------------------------------------------------------------------------------- + ||"MAP1" |"MAP2" |"MAP3" |"MAP4" |"MAP5" |"MAP0" | + |--------------------------------------------------------------------------------------------------------- + ||{b:2,a:1} |{2:b,1:a} |{2:[4,5,6],1:[1,2,3]} |{2:{c:3},1:{a:1,b:2}} |[{a:1,b:2},{c:3}] |{ | + || | | | | | "a": 1, | + || | | | | | "b": 2 | + || | | | | |} | + |--------------------------------------------------------------------------------------------------------- + |""".stripMargin) + // scalastyle:on } test("show structured array") { diff --git a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala index 639f9370..70b1a3dd 100644 --- a/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala +++ b/src/test/scala/com/snowflake/snowpark_test/DataTypeSuite.scala @@ -217,6 +217,28 @@ class DataTypeSuite extends SNTestBase { Array(Time.valueOf("10:03:56")))) } + test("read Structured Map") { + val query = + """SELECT + | {'a':1,'b':2} :: MAP(VARCHAR, NUMBER) as map1, + | {'1':'a','2':'b'} :: MAP(NUMBER, VARCHAR) as map2, + | {'1':[1,2,3],'2':[4,5,6]} :: MAP(NUMBER, ARRAY(NUMBER)) as map3, + | {'1':{'a':1,'b':2},'2':{'c':3}} :: MAP(NUMBER, MAP(VARCHAR, NUMBER)) as map4, + | [{'a':1,'b':2},{'c':3}] :: ARRAY(MAP(VARCHAR, NUMBER)) as map5, + | {'a':1,'b':2} :: OBJECT as map0 + |""".stripMargin + val df = session.sql(query) + checkAnswer( + df, + Row( + Map("b" -> 2, "a" -> 1), + Map(2 -> "b", 1 -> "a"), + Map(2 -> Array(4L, 5L, 6L), 1 -> Array(1L, 2L, 3L)), + Map(2 -> Map("c" -> 3), 1 -> Map("a" -> 1, "b" -> 2)), + Array(Map("a" -> 1, "b" -> 2), Map("c" -> 3)), + "{\n \"a\": 1,\n \"b\": 2\n}")) + } + test("ArrayType v2") { val query = """SELECT | [1, 2, 3]::ARRAY(NUMBER) AS arr1,