[WIP] SNOW-1333976 Read Structured Map Value #101

Closed · wants to merge 23 commits
2 changes: 1 addition & 1 deletion pom.xml
@@ -37,7 +37,7 @@
<scala.version>2.12.18</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<spec2.version>4.2.0</spec2.version>
- <snowflake.jdbc.version>3.14.4</snowflake.jdbc.version>
+ <snowflake.jdbc.version>1.0-SNAPSHOT</snowflake.jdbc.version>
<version.scala.binary>${scala.compat.version}</version.scala.binary>
<doctitle>Snowpark ${project.version}</doctitle>
<scoverage.plugin.version>1.4.11</scoverage.plugin.version>
26 changes: 21 additions & 5 deletions src/main/scala/com/snowflake/snowpark/DataFrame.scala
@@ -2369,17 +2369,33 @@ class DataFrame private[snowpark] (
lines
}

+    def castValueToString(value: Any): String =
+      value match {
+        case map: Map[_, _] =>
+          map
+            .map {
+              case (key, value) => s"${castValueToString(key)}:${castValueToString(value)}"
+            }
+            .mkString("{", ",", "}")
+        case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'"
+        case bytes: Array[java.lang.Byte] =>
+          s"'${DatatypeConverter.printHexBinary(bytes.map(_.toByte))}'"
+        case arr: Array[String] =>
+          arr.mkString("[", ",", "]")
+        case arr: Array[_] =>
+          arr.map(castValueToString).mkString("[", ",", "]")
+        case arr: java.sql.Array =>
+          arr.getArray().asInstanceOf[Array[_]].map(castValueToString).mkString("[", ",", "]")
+        case _ => value.toString
+      }

val body: Seq[Seq[String]] = result.flatMap(row => {
// Value may contain multiple lines
val lines: Seq[Seq[String]] = row.toSeq.zipWithIndex.map {
case (value, index) =>
val texts: Seq[String] = if (value != null) {
-          val str = value match {
-            case ba: Array[Byte] => s"'${DatatypeConverter.printHexBinary(ba)}'"
-            case _ => value.toString
-          }
           // if the result contains multiple lines, split result string
-          splitLines(str)
+          splitLines(castValueToString(value))
} else {
Seq("NULL")
}
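For intuition, the rendering recursion can be sketched standalone. The render helper below is a hypothetical stand-in for the private castValueToString above, showing how a structured MAP(NUMBER, ARRAY(VARCHAR)) value would be printed by show():

    object RenderSketch {
      // Mirrors castValueToString: maps render as {k:v,...}, arrays as [e1,e2,...],
      // and everything else falls back to toString.
      def render(value: Any): String = value match {
        case map: Map[_, _] =>
          map.map { case (k, v) => s"${render(k)}:${render(v)}" }.mkString("{", ",", "}")
        case arr: Array[_] => arr.map(render).mkString("[", ",", "]")
        case other => other.toString
      }

      def main(args: Array[String]): Unit = {
        // a structured map value read back as Map(1 -> Array("a", "b"))
        println(render(Map(1 -> Array("a", "b")))) // prints {1:[a,b]}
      }
    }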
160 changes: 154 additions & 6 deletions src/main/scala/com/snowflake/snowpark/internal/ServerConnection.scala
@@ -26,18 +26,26 @@ import com.snowflake.snowpark.internal.ParameterUtils.{
import com.snowflake.snowpark.internal.Utils.PackageNameDelimiter
import com.snowflake.snowpark.internal.analyzer.{Attribute, Query, SnowflakePlan}
import net.snowflake.client.jdbc.{
+  FieldMetadata,
+  SnowflakeBaseResultSet,
SnowflakeConnectString,
SnowflakeConnectionV1,
SnowflakeReauthenticationRequest,
SnowflakeResultSet,
SnowflakeSQLException,
+  SnowflakeResultSetMetaData,
SnowflakeStatement
}
import com.snowflake.snowpark.types._
- import net.snowflake.client.core.QueryStatus
+ import net.snowflake.client.core.{ColumnTypeHelper, QueryStatus, SFArrowResultSet}
+ import net.snowflake.client.jdbc.internal.apache.arrow.vector.util.{
+   JsonStringArrayList,
+   JsonStringHashMap
+ }

+ import java.util
import scala.collection.mutable
import scala.reflect.runtime.universe.TypeTag
+ import scala.collection.JavaConverters._

private[snowpark] case class QueryResult(
rows: Option[Array[Row]],
@@ -55,6 +63,12 @@ private[snowpark] object ServerConnection {

def convertResultMetaToAttribute(meta: ResultSetMetaData): Seq[Attribute] =
(1 to meta.getColumnCount).map(index => {
+      // TODO: replace with a public API
+      val fieldMetadata = meta
+        .asInstanceOf[SnowflakeResultSetMetaData]
+        .getColumnFields(index)
+        .asScala
+        .toList
val columnName = analyzer.quoteNameWithoutUpperCasing(meta.getColumnLabel(index))
val dataType = meta.getColumnType(index)
val fieldSize = meta.getPrecision(index)
@@ -64,7 +78,8 @@
// This field is useful for snowflake types that are not JDBC types like
// variant, object and array
val columnTypeName = meta.getColumnTypeName(index)
-      val columnType = getDataType(dataType, columnTypeName, fieldSize, fieldScale, isSigned)
+      val columnType =
+        getDataType(dataType, columnTypeName, fieldSize, fieldScale, isSigned, fieldMetadata)

Attribute(columnName, columnType, nullable)
})
@@ -74,11 +89,61 @@
columnTypeName: String,
precision: Int,
scale: Int,
-      signed: Boolean): DataType = {
+      signed: Boolean,
+      field: List[FieldMetadata] = List.empty): DataType = {
columnTypeName match {
case "ARRAY" => ArrayType(StringType)
case "ARRAY" =>
if (field.isEmpty) {
ArrayType(StringType)
} else {
StructuredArrayType(
getDataType(
field.head.getType,
field.head.getTypeName,
field.head.getPrecision,
field.head.getScale,
signed = true, // no sign info in the fields
field.head.getFields.asScala.toList),
field.head.isNullable)
}
case "VARIANT" => VariantType
case "OBJECT" => MapType(StringType, StringType)
case "OBJECT" =>
if (field.isEmpty) {
MapType(StringType, StringType)
} else if (field.size == 2 && field.head.getName.isEmpty) {
// Map
StructuredMapType(
getDataType(
field.head.getType,
field.head.getTypeName,
field.head.getPrecision,
field.head.getScale,
signed = true,
field.head.getFields.asScala.toList),
getDataType(
field(1).getType,
field(1).getTypeName,
field(1).getPrecision,
field(1).getScale,
signed = true,
field(1).getFields.asScala.toList),
field(1).isNullable)
} else {
// object
StructType(
field.map(
f =>
StructField(
f.getName,
getDataType(
f.getType,
f.getTypeName,
f.getPrecision,
f.getScale,
signed = true,
f.getFields.asScala.toList),
f.isNullable)))
}
case "GEOGRAPHY" => GeographyType
case "GEOMETRY" => GeometryType
case _ => getTypeFromJDBCType(sqlType, precision, scale, signed)
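The dispatch above can be summarized with a small self-contained sketch. Field, sketchType, and the type-name strings below are simplified stand-ins for JDBC FieldMetadata, not the real API; the point is the three-way split on OBJECT metadata (no fields, exactly two unnamed fields, named fields):

    // Simplified model: a field has a name, a type name, and nested fields.
    case class Field(name: String, typeName: String, fields: List[Field] = Nil)

    def sketchType(typeName: String, fields: List[Field]): String = typeName match {
      case "ARRAY" if fields.isEmpty  => "ArrayType(StringType)"           // semi-structured
      case "ARRAY" =>
        s"StructuredArrayType(${sketchType(fields.head.typeName, fields.head.fields)})"
      case "OBJECT" if fields.isEmpty => "MapType(StringType, StringType)" // semi-structured
      case "OBJECT" if fields.size == 2 && fields.head.name.isEmpty =>     // structured map
        s"StructuredMapType(${sketchType(fields(0).typeName, fields(0).fields)}, " +
          s"${sketchType(fields(1).typeName, fields(1).fields)})"
      case "OBJECT" =>                                                     // structured object
        fields
          .map(f => s"${f.name}: ${sketchType(f.typeName, f.fields)}")
          .mkString("StructType(", ", ", ")")
      case "VARCHAR" => "StringType"
      case "DOUBLE"  => "DoubleType"
      case other     => other
    }

    // MAP(VARCHAR, DOUBLE) is reported as OBJECT with two unnamed fields:
    sketchType("OBJECT", List(Field("", "VARCHAR"), Field("", "DOUBLE")))
    // => StructuredMapType(StringType, DoubleType)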
@@ -256,11 +321,77 @@
statement: Statement): (CloseableIterator[Row], StructType) =
withValidConnection {
val data = statement.getResultSet

+      // used by structured types: reflectively unwrap the underlying
+      // SFArrowResultSet, which has no public accessor yet
+      lazy val arrowResultSet: SFArrowResultSet = {
+        val sfResultSet = data.asInstanceOf[SnowflakeBaseResultSet]
+        val baseResultSetField =
+          classOf[SnowflakeBaseResultSet].getDeclaredField("sfBaseResultSet")
+        baseResultSetField.setAccessible(true)
+        baseResultSetField.get(sfResultSet).asInstanceOf[SFArrowResultSet]
+      }
val schema = ServerConnection.convertResultMetaToAttribute(data.getMetaData)

lazy val geographyOutputFormat = getParameterValue(ParameterUtils.GeographyOutputFormat)
lazy val geometryOutputFormat = getParameterValue(ParameterUtils.GeometryOutputFormat)

+      def convertToSnowparkValue(value: Any, meta: FieldMetadata): Any = {
+        meta.getTypeName match {
+          // semi-structured
+          case "ARRAY" if meta.getFields.isEmpty => value.toString
+          // structured array
+          case "ARRAY" if meta.getFields.size() == 1 =>
+            value
+              .asInstanceOf[util.ArrayList[_]]
+              .toArray
+              .map(v => convertToSnowparkValue(v, meta.getFields.get(0)))
+          // semi-structured
+          case "OBJECT" if meta.getFields.isEmpty => value.toString
+          // structured map
+          case "OBJECT" if meta.getFields.size() == 2 =>
+            value match {
+              // nested structured maps arrive as JsonStringArrayList values
+              case subMap: JsonStringArrayList[_] =>
+                subMap.asScala.map {
+                  case mapValue: JsonStringHashMap[_, _] =>
+                    convertToSnowparkValue(mapValue.get("key"), meta.getFields.get(0)) ->
+                      convertToSnowparkValue(mapValue.get("value"), meta.getFields.get(1))
+                }.toMap
+              case map: util.HashMap[_, _] =>
+                map.asScala.map {
+                  case (key, value) =>
+                    convertToSnowparkValue(key, meta.getFields.get(0)) ->
+                      convertToSnowparkValue(value, meta.getFields.get(1))
+                }.toMap
+            }
+          case "NUMBER" if meta.getType == java.sql.Types.BIGINT =>
+            value match {
+              case str: String => str.toLong // number key in a structured map
+              case bd: java.math.BigDecimal => bd.toBigInteger.longValue()
+            }
+          case "DOUBLE" | "BOOLEAN" | "BINARY" | "NUMBER" => value
+          case "VARCHAR" | "VARIANT" => value.toString // Text to String
+          case "DATE" =>
+            arrowResultSet.convertToDate(value, null)
+          case "TIME" =>
+            arrowResultSet.convertToTime(value, meta.getScale)
+          case _
+              if meta.getType == java.sql.Types.TIMESTAMP ||
+                meta.getType == java.sql.Types.TIMESTAMP_WITH_TIMEZONE =>
+            val columnSubType = meta.getType
+            val columnType = ColumnTypeHelper
+              .getColumnType(columnSubType, arrowResultSet.getSession)
+            arrowResultSet.convertToTimestamp(
+              value,
+              columnType,
+              columnSubType,
+              null,
+              meta.getScale)
+          case _ =>
+            throw new UnsupportedOperationException(s"Unsupported type: ${meta.getTypeName}")
+        }
+      }

val iterator = new CloseableIterator[Row] {
private var _currentRow: Row = _
private var _hasNext: Boolean = _
@@ -279,6 +410,23 @@
} else {
attribute.dataType match {
case VariantType => data.getString(resultIndex)
+              case _: StructuredArrayType | _: StructuredMapType =>
+                val meta = data.getMetaData
+                // rebuild a FieldMetadata for this column from the result metadata
+                val field = new FieldMetadata(
+                  meta.getColumnName(resultIndex),
+                  meta.getColumnTypeName(resultIndex),
+                  meta.getColumnType(resultIndex),
+                  true,
+                  0,
+                  0,
+                  0,
+                  false,
+                  null,
+                  meta
+                    .asInstanceOf[SnowflakeResultSetMetaData]
+                    .getColumnFields(resultIndex))
+                convertToSnowparkValue(data.getObject(resultIndex), field)
case ArrayType(StringType) => data.getString(resultIndex)
case MapType(StringType, StringType) => data.getString(resultIndex)
case StringType => data.getString(resultIndex)
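To make the structured-map branch concrete: a MAP column arrives from the JDBC layer as a java.util.HashMap, and convertToSnowparkValue turns it into an immutable Scala Map. A minimal self-contained sketch of that unwrapping, with the recursive key/value conversion reduced to identity:

    import scala.collection.JavaConverters._

    // Hypothetical stand-in for the util.HashMap branch of convertToSnowparkValue.
    def toSnowparkMap(m: java.util.Map[_, _]): Map[Any, Any] =
      m.asScala.map { case (k, v) => (k: Any) -> (v: Any) }.toMap

    val jdbcValue = new java.util.HashMap[String, java.lang.Long]()
    jdbcValue.put("a", 1L)
    jdbcValue.put("b", 2L)
    toSnowparkMap(jdbcValue) // Map(a -> 1, b -> 2)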
5 changes: 2 additions & 3 deletions DataTypeMapper.scala
@@ -90,7 +90,6 @@ object DataTypeMapper {
dataType match {
case GeographyType => "TRY_TO_GEOGRAPHY(NULL)"
case GeometryType => "TRY_TO_GEOMETRY(NULL)"
-      case ArrayType(_) => "PARSE_JSON('NULL')::ARRAY"
case _ => "NULL :: " + convertToSFType(dataType)
}
} else {
@@ -102,8 +101,8 @@
case DateType => "date('2020-9-16')"
case TimeType => "to_time('04:15:29.999')"
case TimestampType => "to_timestamp_ntz('2020-09-16 06:30:00')"
case _: ArrayType => "to_array(0)"
case _: MapType => "to_object(parse_json('0'))"
case _: ArrayType => "[]::" + convertToSFType(dataType)
case _: MapType => "{}::" + convertToSFType(dataType)
case VariantType => "to_variant(0)"
case GeographyType => "to_geography('POINT(-122.35 37.55)')"
case GeometryType => "to_geometry('POINT(-122.35 37.55)')"
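The observable effect is in the generated SQL: placeholder values for structured columns are now empty literals cast to the precise type, relying on convertToSFType (extended in types/package.scala below). Illustrative renderings, assuming the existing atomic mappings such as StringType -> STRING and DoubleType -> DOUBLE:

    // ArrayType(StringType)                             -> "[]::ARRAY"
    // StructuredArrayType(StringType, nullable = true)  -> "[]::ARRAY(STRING)"
    // StructuredMapType(StringType, DoubleType, true)   -> "{}::MAP(STRING, DOUBLE)"
    // StructuredMapType(StringType, DoubleType, false)  -> "{}::MAP(STRING, DOUBLE not null)"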
17 changes: 17 additions & 0 deletions src/main/scala/com/snowflake/snowpark/types/ArrayType.scala
@@ -10,3 +10,20 @@ case class ArrayType(elementType: DataType) extends DataType {
s"ArrayType[${elementType.toString}]"
}
}

+ /* Temporary solution for structured and semi-structured data types.
+    The two types will be merged in a future BCR (behavior change release). */
+ private[snowpark] class StructuredArrayType(
+     override val elementType: DataType,
+     val nullable: Boolean)
+     extends ArrayType(elementType) {
+   override def toString: String = {
+     s"ArrayType[${elementType.toString} nullable = $nullable]"
+   }
+ }
+
+ private[snowpark] object StructuredArrayType {
+
+   def apply(elementType: DataType, nullable: Boolean): StructuredArrayType =
+     new StructuredArrayType(elementType, nullable)
+ }
2 changes: 2 additions & 0 deletions src/main/scala/com/snowflake/snowpark/types/DataType.scala
@@ -18,6 +18,8 @@ abstract class DataType {
* @since 0.1.0
*/
override def toString: String = typeName

+   private[snowpark] def schemaString: String = toString
}

private[snowpark] abstract class AtomicType extends DataType
15 changes: 15 additions & 0 deletions src/main/scala/com/snowflake/snowpark/types/MapType.scala
@@ -10,3 +10,18 @@ case class MapType(keyType: DataType, valueType: DataType) extends DataType {
s"MapType[${keyType.toString}, ${valueType.toString}]"
}
}

+ private[snowpark] class StructuredMapType(
+     override val keyType: DataType,
+     override val valueType: DataType,
+     val isValueNullable: Boolean)
+     extends MapType(keyType, valueType) {
+   override def toString: String = {
+     s"MapType[${keyType.toString}, ${valueType.toString} nullable = $isValueNullable]"
+   }
+ }
+
+ private[snowpark] object StructuredMapType {
+   def apply(keyType: DataType, valueType: DataType, isValueNullable: Boolean): StructuredMapType =
+     new StructuredMapType(keyType, valueType, isValueNullable)
+ }
4 changes: 3 additions & 1 deletion src/main/scala/com/snowflake/snowpark/types/StructType.scala
@@ -65,6 +65,8 @@ case class StructType(fields: Array[StructField] = Array())
override def toString: String =
s"StructType[${fields.map(_.toString).mkString(", ")}]"

override private[snowpark] def schemaString: String = "StructType"

/**
* Appends a new [[StructField]] to the end of this object.
* @since 0.1.0
@@ -168,7 +170,7 @@

private[types] def treeString(layer: Int): String = {
val prepended: String = (1 to (1 + 2 * layer)).map(x => " ").mkString + "|--"
val body: String = s"$name: ${dataType.typeName} (nullable = $nullable)\n" +
val body: String = s"$name: ${dataType.schemaString} (nullable = $nullable)\n" +
(dataType match {
case st: StructType => st.treeString(layer + 1)
case _ => ""
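The schemaString hook exists so treeString can label a nested struct column with the short marker "StructType" and then recurse into its fields, instead of inlining the full StructType[...] string on one line. A hedged sketch of the resulting printSchema output for a structured OBJECT column (exact labels depend on each type's typeName):

    // root
    //  |--OBJ: StructType (nullable = true)
    //    |--NAME: String (nullable = true)
    //    |--SCORE: Double (nullable = true)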
15 changes: 15 additions & 0 deletions src/main/scala/com/snowflake/snowpark/types/package.scala
@@ -63,11 +63,26 @@ package object types {
case TimeType => "TIME"
case TimestampType => "TIMESTAMP"
case BinaryType => "BINARY"
+      case sa: StructuredArrayType =>
+        val nullable = if (sa.nullable) "" else " not null"
+        s"ARRAY(${convertToSFType(sa.elementType)}$nullable)"
+      case sm: StructuredMapType =>
+        val isValueNullable = if (sm.isValueNullable) "" else " not null"
+        s"MAP(${convertToSFType(sm.keyType)}, ${convertToSFType(sm.valueType)}$isValueNullable)"
+      case StructType(fields) =>
+        val fieldStr = fields
+          .map(
+            field =>
+              s"${field.name} ${convertToSFType(field.dataType)} " +
+                (if (field.nullable) "" else "not null"))
+          .mkString(",")
+        s"OBJECT($fieldStr)"
case ArrayType(_) => "ARRAY"
case MapType(_, _) => "OBJECT"
case VariantType => "VARIANT"
case GeographyType => "GEOGRAPHY"
case GeometryType => "GEOMETRY"
case StructType(_) => "OBJECT"
case _ =>
throw new UnsupportedOperationException(s"Unsupported data type: ${dataType.typeName}")
}