Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1234214 Native Arrow structured types - map support #1686

Merged
merged 12 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/main/java/net/snowflake/client/core/ArrowSqlInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.core.json.Converters;
import net.snowflake.client.core.structs.SQLDataCreationHelper;
Expand All @@ -24,16 +25,22 @@
@SnowflakeJdbcInternalApi
public class ArrowSqlInput extends BaseSqlInput {

private final Map<String, Object> input;
private final Iterator<Object> structuredTypeFields;
private int currentIndex = 0;

public ArrowSqlInput(
JsonStringHashMap<String, Object> input,
Map<String, Object> input,
SFBaseSession session,
Converters converters,
List<FieldMetadata> fields) {
super(session, converters, fields);
this.structuredTypeFields = input.values().iterator();
this.input = input;
}

/**
 * Returns the map this input was constructed from.
 *
 * <p>The same instance that was handed to the constructor is returned; no copy is made.
 *
 * @return the underlying {@code Map<String, Object>}
 */
public Map<String, Object> getInput() {
return input;
}

@Override
Expand Down Expand Up @@ -172,14 +179,17 @@ public Timestamp readTimestamp(TimeZone tz) throws SQLException {
if (value == null) {
return null;
}
int columnType = ColumnTypeHelper.getColumnType(fieldMetadata.getType(), session);
int columnSubType = fieldMetadata.getType();
int scale = fieldMetadata.getScale();
return mapSFExceptionToSQLException(
() ->
converters
.getStructuredTypeDateTimeConverter()
.getTimestamp(
(JsonStringHashMap<String, Object>) value,
fieldMetadata.getBase(),
columnType,
columnSubType,
tz,
scale));
});
Expand All @@ -204,10 +214,7 @@ public <T> T readObject(Class<T> type) throws SQLException {
SQLData instance = (SQLData) SQLDataCreationHelper.create(type);
instance.readSQL(
new ArrowSqlInput(
(JsonStringHashMap<String, Object>) value,
session,
converters,
fieldMetadata.getFields()),
(Map<String, Object>) value, session, converters, fieldMetadata.getFields()),
null);
return (T) instance;
});
Expand Down
38 changes: 32 additions & 6 deletions src/main/java/net/snowflake/client/core/SFArrowResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.core.arrow.ArrowVectorConverter;
import net.snowflake.client.core.arrow.StructConverter;
Expand Down Expand Up @@ -104,7 +105,7 @@ public class SFArrowResultSet extends SFBaseResultSet implements DataConversionC
*/
private boolean formatDateWithTimezone;

@SnowflakeJdbcInternalApi protected Converters jsonConverters;
@SnowflakeJdbcInternalApi protected Converters converters;

/**
* Constructor takes a result from the API response that we get from executing a SQL statement.
Expand All @@ -124,7 +125,7 @@ public SFArrowResultSet(
boolean sortResult)
throws SQLException {
this(resultSetSerializable, session.getTelemetryClient(), sortResult);
this.jsonConverters =
this.converters =
new Converters(
resultSetSerializable.getTimeZone(),
session,
Expand Down Expand Up @@ -356,6 +357,31 @@ private boolean fetchNextRowSorted() throws SnowflakeSQLException {
}
}

/**
 * Exposes the {@link Converters} instance held by this result set.
 *
 * @return the current value of the {@code converters} field
 */
@Override
@SnowflakeJdbcInternalApi
public Converters getConverters() {
return converters;
}

/**
 * Converts a raw value to a {@link Date} by delegating to the structured-type date/time
 * converter's {@code getDate}.
 *
 * @param object raw value; it is cast with {@code (int)}, so it must be a non-null {@link
 *     Integer}
 * @param tz time zone forwarded unchanged to the converter
 * @return the {@link Date} produced by the converter
 * @throws SFException propagated from the converter
 */
@Override
public Date convertToDate(Object object, TimeZone tz) throws SFException {
return converters.getStructuredTypeDateTimeConverter().getDate((int) object, tz);
}

/**
 * Converts a raw value to a {@link Time} by delegating to the structured-type date/time
 * converter's {@code getTime}.
 *
 * @param object raw value; it is cast with {@code (long)}, so it must be a non-null {@link
 *     Long}
 * @param scale scale forwarded unchanged to the converter
 * @return the {@link Time} produced by the converter
 * @throws SFException propagated from the converter
 */
@Override
public Time convertToTime(Object object, int scale) throws SFException {
return converters.getStructuredTypeDateTimeConverter().getTime((long) object, scale);
}

/**
 * Converts a raw value to a {@link Timestamp} by delegating to the structured-type date/time
 * converter's {@code getTimestamp}.
 *
 * @param object raw value; it is cast to {@code JsonStringHashMap<String, Object>} before being
 *     passed on
 * @param columnType forwarded unchanged to the converter
 * @param columnSubType forwarded unchanged to the converter
 * @param tz time zone forwarded unchanged to the converter
 * @param scale scale forwarded unchanged to the converter
 * @return the {@link Timestamp} produced by the converter
 * @throws SFException propagated from the converter
 */
@Override
public Timestamp convertToTimestamp(
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException {
return converters
.getStructuredTypeDateTimeConverter()
.getTimestamp(
(JsonStringHashMap<String, Object>) object, columnType, columnSubType, tz, scale);
}

/**
* Advance to next row
*
Expand Down Expand Up @@ -510,7 +536,7 @@ public Object getObject(int columnIndex) throws SFException {
if (converter instanceof VarCharConverter) {
return createJsonSqlInput(columnIndex, obj);
} else if (converter instanceof StructConverter) {
return createArrowSqlInput(columnIndex, (JsonStringHashMap<String, Object>) obj);
return createArrowSqlInput(columnIndex, (Map<String, Object>) obj);
}
}
return obj;
Expand All @@ -522,19 +548,19 @@ private Object createJsonSqlInput(int columnIndex, Object obj) throws SFExceptio
return new JsonSqlInput(
jsonNode,
session,
jsonConverters,
converters,
resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields(),
sessionTimezone);
} catch (JsonProcessingException e) {
throw new SFException(e, ErrorCode.INVALID_STRUCT_DATA);
}
}

private Object createArrowSqlInput(int columnIndex, JsonStringHashMap<String, Object> input) {
private Object createArrowSqlInput(int columnIndex, Map<String, Object> input) {
return new ArrowSqlInput(
input,
session,
jsonConverters,
converters,
resultSetMetaData.getColumnMetadata().get(columnIndex - 1).getFields());
}

Expand Down
10 changes: 10 additions & 0 deletions src/main/java/net/snowflake/client/core/SFBaseResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,14 @@ public Converters getConverters() {
public TimeZone getSessionTimeZone() {
return resultSetSerializable.getTimeZone();
}

/**
 * Converts a raw value into a {@link Date}.
 *
 * @param object raw value to convert; the runtime type expected is up to the implementing
 *     result set
 * @param tz time zone made available to the implementation
 * @return the converted date
 * @throws SFException if the implementation cannot perform the conversion
 */
@SnowflakeJdbcInternalApi
public abstract Date convertToDate(Object object, TimeZone tz) throws SFException;

/**
 * Converts a raw value into a {@link Time}.
 *
 * @param object raw value to convert; the runtime type expected is up to the implementing
 *     result set
 * @param scale scale made available to the implementation
 * @return the converted time
 * @throws SFException if the implementation cannot perform the conversion
 */
@SnowflakeJdbcInternalApi
public abstract Time convertToTime(Object object, int scale) throws SFException;

/**
 * Converts a raw value into a {@link Timestamp}.
 *
 * @param object raw value to convert; the runtime type expected is up to the implementing
 *     result set
 * @param columnType column type made available to the implementation
 * @param columnSubType column subtype made available to the implementation
 * @param tz time zone made available to the implementation
 * @param scale scale made available to the implementation
 * @return the converted timestamp
 * @throws SFException if the implementation cannot perform the conversion
 */
@SnowflakeJdbcInternalApi
public abstract Timestamp convertToTimestamp(
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException;
}
19 changes: 19 additions & 0 deletions src/main/java/net/snowflake/client/core/SFJsonResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,23 @@ private static Object convert(JsonStringToTypeConverter converter, JsonNode node
return converter.convert(node.toString());
}
}

/**
 * Converts a string value to a {@link Date} using the converter returned by {@code
 * converters.dateConverter(session)}.
 *
 * @param object value to convert; it is cast to {@link String} before conversion
 * @param tz not used by this implementation
 * @return the converter's result, cast to {@link Date}
 * @throws SFException propagated from the converter
 */
@Override
public Date convertToDate(Object object, TimeZone tz) throws SFException {
return (Date) converters.dateConverter(session).convert((String) object);
}

/**
 * Converts a string value to a {@link Time} using the converter returned by {@code
 * converters.timeConverter(session)}.
 *
 * @param object value to convert; it is cast to {@link String} before conversion
 * @param scale not used by this implementation
 * @return the converter's result, cast to {@link Time}
 * @throws SFException propagated from the converter
 */
@Override
public Time convertToTime(Object object, int scale) throws SFException {
return (Time) converters.timeConverter(session).convert((String) object);
}

/**
 * Converts a string value to a {@link Timestamp} using the converter returned by {@code
 * converters.timestampConverter(...)}.
 *
 * @param object value to convert; it is cast to {@link String} before conversion
 * @param columnType passed as the second argument of {@code timestampConverter}
 * @param columnSubType passed as the first argument of {@code timestampConverter}
 * @param tz passed as the last argument of {@code timestampConverter}
 * @param scale passed as the third argument of {@code timestampConverter}
 * @return the converter's result, cast to {@link Timestamp}
 * @throws SFException propagated from the converter
 */
@Override
public Timestamp convertToTimestamp(
Object object, int columnType, int columnSubType, TimeZone tz, int scale) throws SFException {
return (Timestamp)
converters
// NOTE(review): subtype is passed before type here, the reverse of this method's own
// parameter order; the fifth argument is a literal null. Confirm both against the
// signature of Converters.timestampConverter.
.timestampConverter(columnSubType, columnType, scale, session, null, tz)
.convert((String) object);
}
}
33 changes: 33 additions & 0 deletions src/main/java/net/snowflake/client/core/arrow/MapConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package net.snowflake.client.core.arrow;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFException;
import net.snowflake.client.jdbc.SnowflakeType;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.util.JsonStringHashMap;

/**
 * Arrow vector converter for Snowflake {@code MAP} columns backed by an Arrow {@link MapVector}.
 *
 * <p>Each row of the vector is read as a list of entry structs in which the key is stored under
 * {@code "key"} and the value under {@code "value"}; {@link #toObject(int)} flattens that list
 * into a {@code Map<String, Object>}.
 */
public class MapConverter extends AbstractArrowVectorConverter {

  private final MapVector vector;

  /**
   * Creates a converter for the given map vector.
   *
   * @param valueVector the Arrow vector holding the map column
   * @param columnIndex index of the column, forwarded to the base converter
   * @param context conversion context, forwarded to the base converter
   */
  public MapConverter(MapVector valueVector, int columnIndex, DataConversionContext context) {
    super(SnowflakeType.MAP.name(), valueVector, columnIndex, context);
    this.vector = valueVector;
  }

  /**
   * Reads the row at {@code index} as a map from stringified key to value.
   *
   * <p>Entries are inserted in the order they appear in the vector. Null values are kept as
   * {@code null} map values. A repeated key is rejected with an {@link IllegalStateException}.
   *
   * @param index row index inside the vector
   * @return a {@code Map<String, Object>} for the row, or {@code null} when the vector yields no
   *     object for that row
   * @throws SFException declared by the overridden contract
   */
  @Override
  public Object toObject(int index) throws SFException {
    @SuppressWarnings("unchecked") // the vector yields a list of key/value entry structs
    List<JsonStringHashMap<String, Object>> entriesList =
        (List<JsonStringHashMap<String, Object>>) vector.getObject(index);
    if (entriesList == null) {
      // Nothing stored for this row: surface it as a null object rather than failing.
      return null;
    }
    // A plain loop is used instead of Collectors.toMap because toMap throws a
    // NullPointerException as soon as one entry has a null value.
    Map<String, Object> result = new LinkedHashMap<>();
    for (JsonStringHashMap<String, Object> entry : entriesList) {
      String key = entry.get("key").toString();
      if (result.containsKey(key)) {
        throw new IllegalStateException("Duplicate key " + key);
      }
      result.put(key, entry.get("value"));
    }
    return result;
  }

  /**
   * Returns the string form of the row at {@code index}.
   *
   * @param index row index inside the vector
   * @return {@code toString()} of the object stored for the row, or {@code null} when the vector
   *     yields no object for that row
   * @throws SFException declared by the overridden contract
   */
  @Override
  public String toString(int index) throws SFException {
    Object entries = vector.getObject(index);
    return entries == null ? null : entries.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.TimeZone;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.SnowflakeType;
import net.snowflake.client.jdbc.SnowflakeUtil;
import org.apache.arrow.vector.util.JsonStringHashMap;

@SnowflakeJdbcInternalApi
Expand Down Expand Up @@ -45,22 +46,33 @@ public StructuredTypeDateTimeConverter(
}

public Timestamp getTimestamp(
JsonStringHashMap<String, Object> obj, SnowflakeType type, TimeZone tz, int scale)
JsonStringHashMap<String, Object> obj,
int columnType,
int columnSubType,
TimeZone tz,
int scale)
throws SFException {
if (tz == null) {
tz = TimeZone.getDefault();
}
switch (type) {
case TIMESTAMP_LTZ:
if (Types.TIMESTAMP == columnType) {
if (SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_LTZ == columnSubType) {
return convertTimestampLtz(obj, scale);
case TIMESTAMP_NTZ:
} else {
return convertTimestampNtz(obj, tz, scale);
case TIMESTAMP_TZ:
return convertTimestampTz(obj, scale);
}
} else if (Types.TIMESTAMP_WITH_TIMEZONE == columnType
&& SnowflakeUtil.EXTRA_TYPES_TIMESTAMP_TZ == columnSubType) {
return convertTimestampTz(obj, scale);
}
throw new SFException(
ErrorCode.INVALID_VALUE_CONVERT,
"Unexpected Arrow Field for " + type.name() + " and object type " + obj.getClass());
"Unexpected Arrow Field for columnType "
+ columnType
+ " , column subtype "
+ columnSubType
+ " , and object type "
+ obj.getClass());
}

public Date getDate(int value, TimeZone tz) throws SFException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import net.snowflake.client.core.arrow.IntToFixedConverter;
import net.snowflake.client.core.arrow.IntToScaledFixedConverter;
import net.snowflake.client.core.arrow.IntToTimeConverter;
import net.snowflake.client.core.arrow.MapConverter;
import net.snowflake.client.core.arrow.SmallIntToFixedConverter;
import net.snowflake.client.core.arrow.SmallIntToScaledFixedConverter;
import net.snowflake.client.core.arrow.StructConverter;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
Expand Down Expand Up @@ -206,6 +208,10 @@ private static List<ArrowVectorConverter> initConverters(
converters.add(new VarCharConverter(vector, i, context));
break;

case MAP:
converters.add(new MapConverter((MapVector) vector, i, context));
break;

case OBJECT:
if (vector instanceof StructVector) {
converters.add(new StructConverter((StructVector) vector, i, context));
Expand Down
Loading
Loading