SNOW-938038: Support AVRO Logical Types #722

Merged 7 commits on Nov 3, 2023
@@ -1,6 +1,8 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;

import com.codahale.metrics.Histogram;
@@ -660,7 +662,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, false);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
@@ -28,9 +28,13 @@
import java.util.Set;
import javax.annotation.Nonnull;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +119,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
}
Map<String, String> columnToType = new HashMap<>();
Map<String, String> schemaMap = getSchemaMapFromRecord(record);
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value(), true);
Set<String> columnNamesSet = new HashSet<>(columnNames);

Iterator<Map.Entry<String, JsonNode>> fields = recordNode.fields();
@@ -154,7 +158,15 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Schema schema = record.valueSchema();
if (schema != null && schema.fields() != null) {
for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
String snowflakeType = convertToSnowflakeType(field.schema().type(), field.schema().name());
LOGGER.info(
"Got the snowflake data type for field:{}, schemaName:{}, kafkaType:{},"
+ " snowflakeType:{}",
field.name(),
field.schema().name(),
field.schema().type(),
snowflakeType);
schemaMap.put(field.name(), snowflakeType);
}
}
return schemaMap;
@@ -167,7 +179,8 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
}
return convertToSnowflakeType(schemaType);
// Passing null to schemaName when there is no schema information
return convertToSnowflakeType(schemaType, null);
}

/** Convert a json node type to kafka data type */
@@ -201,16 +214,26 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
}

/** Convert the kafka data type to Snowflake data type */
private static String convertToSnowflakeType(Type kafkaType) {
private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
switch (kafkaType) {
case INT8:
return "BYTEINT";
case INT16:
return "SMALLINT";
case INT32:
Collaborator: Nit: can we add a comment explaining why we chose these 4? Something like a link to the code or to this PR's description.

Contributor Author: I did add it to the PR description, do you want to take a look and see what's missing? We will definitely update our online doc to reflect this change.

Collaborator: The PR description is great, but adding a comment could also help with readability. I don't feel too strongly about it, up to you.

return "INT";
if (Date.LOGICAL_NAME.equals(schemaName)) {
return "DATE";
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
return "TIME(6)";
} else {
return "INT";
Comment on lines +224 to +229

Collaborator (@sfc-gh-japatel, Oct 19, 2023): Thinking out loud here, feel free to ignore: will this be a behavior change? Before this we converted all INT32 to INT; now we are introducing new Snowflake data types, which essentially means new columns? I also understand this is a good distinction for them, and we are still in PuPr.

Contributor Author: Not really. Not sure if you remember, but before this change the insert would fail because it was inserting a VARCHAR into an INT column.

}
case INT64:
return "BIGINT";
if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
return "TIMESTAMP(6)";
} else {
return "BIGINT";
}
case FLOAT32:
return "FLOAT";
case FLOAT64:
Expand All @@ -220,7 +243,11 @@ private static String convertToSnowflakeType(Type kafkaType) {
case STRING:
return "VARCHAR";
case BYTES:
return "BINARY";
if (Decimal.LOGICAL_NAME.equals(schemaName)) {
return "VARCHAR";
} else {
return "BINARY";
}
Comment on lines +246 to +250

Collaborator: It would be good to mention in the description that it is not decimal to VARCHAR, but rather the bytes that represent a decimal. Is that a correct understanding?

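On the point raised above, a small standalone check (an illustration only, not connector code; the class name is made up for this sketch) shows that Connect's Decimal is a logical type layered on BYTES, which is why the BYTES case above is where decimal-bearing bytes get routed to VARCHAR:

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

public class DecimalLogicalTypeSketch {
  public static void main(String[] args) {
    Schema decimalSchema = Decimal.schema(2); // Connect Decimal logical type, scale 2
    System.out.println(decimalSchema.type()); // BYTES
    System.out.println(decimalSchema.name()); // org.apache.kafka.connect.data.Decimal
    // The raw bytes a converter would hand to the connector for the value 12.34:
    byte[] wire = Decimal.fromLogical(decimalSchema, new BigDecimal("12.34"));
    System.out.println(wire.length + " byte(s) of two's-complement unscaled value");
  }
}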
case ARRAY:
return "ARRAY";
default:
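To make the new mapping easier to scan, here is a condensed, standalone sketch of the logical-type handling added to convertToSnowflakeType above (an illustration compiled against connect-api, not code from the connector; the class name and the default branch are placeholders):

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeMappingSketch {
  // Mirrors only the switch cases touched in this diff; the remaining cases are omitted.
  static String toSnowflakeType(Schema.Type kafkaType, String schemaName) {
    switch (kafkaType) {
      case INT32:
        if (Date.LOGICAL_NAME.equals(schemaName)) return "DATE";
        if (Time.LOGICAL_NAME.equals(schemaName)) return "TIME(6)";
        return "INT";
      case INT64:
        return Timestamp.LOGICAL_NAME.equals(schemaName) ? "TIMESTAMP(6)" : "BIGINT";
      case BYTES:
        // Connect's Decimal logical type is carried as BYTES, hence the check here.
        return Decimal.LOGICAL_NAME.equals(schemaName) ? "VARCHAR" : "BINARY";
      default:
        return "VARIANT"; // placeholder; not the connector's actual fallback
    }
  }

  public static void main(String[] args) {
    System.out.println(toSnowflakeType(Schema.Type.INT32, Date.LOGICAL_NAME));      // DATE
    System.out.println(toSnowflakeType(Schema.Type.INT64, Timestamp.LOGICAL_NAME)); // TIMESTAMP(6)
    System.out.println(toSnowflakeType(Schema.Type.BYTES, null));                   // BINARY
  }
}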
@@ -446,7 +446,7 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
Schema schema = isKey ? record.keySchema() : record.valueSchema();
Object content = isKey ? record.key() : record.value();
try {
newSFContent = new SnowflakeRecordContent(schema, content);
newSFContent = new SnowflakeRecordContent(schema, content, true);
} catch (Exception e) {
LOGGER.error("Native content parser error:\n{}", e.getMessage());
try {
@@ -778,7 +778,7 @@ private void handleInsertRowsFailures(
}
}

// TODO: SNOW-529755 POLL committed offsets in backgraound thread
// TODO: SNOW-529755 POLL committed offsets in background thread

/**
* Get committed offset from Snowflake. It does an HTTP call internally to find out what was the
@@ -88,6 +88,8 @@ public class RecordService {

public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT_STREAMING =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
Contributor Author: This needs to be updated for Snowpipe Streaming, but I don't see a good way to distinguish Snowpipe vs. Snowpipe Streaming in RecordService. Happy to hear any suggestions.

Collaborator (@sfc-gh-japatel, Oct 12, 2023): The simplest way to do it would be in the TopicPartitionChannel ctor. Since TopicPartitionChannel is only used in Snowpipe Streaming, just set a private field in RecordService to true; this will not change anything in Snowpipe. Something like

public void isIngestionMethodSnowpipeStreaming(final boolean isSnowpipeStreaming) {
  this.isSnowpipeStreaming = isSnowpipeStreaming;
}

Contributor Author: I thought about that, but convertToJson is a static function :)

Contributor Author (@sfc-gh-tzhang, Oct 13, 2023): Basically we need to add a boolean to convertToJson and update all the references, and I'm debating whether that is really needed. The difference between Z and XXX is that with XXX there is a colon in the timezone offset, for example Z = 02:24:00.000+0000 and XXX = 02:24:00.000+00:00. In fact, it will fail if you try to insert 02:24:00.000+0000 into a TIME column, so this is technically a bug. I'm not sure why there have been no complaints from customers, probably because not many people are using logical types.

Collaborator: Out of scope of this PR, but RecordService does have a lot of overlap between Snowpipe and Streaming; it might be useful to refactor or split it into two files for simplicity.

Collaborator: > Basically we need to add a boolean to convertToJson and update all the references ...

Thanks for the detailed explanation. I would be careful about making changes in Snowpipe code. Most folks haven't complained because we don't have schematization in Snowpipe; they have a base table and then more tables on top of that. This change will break their downstream pipeline, right?

Collaborator: You might have to take a giant step and add another arg in convertToJson :( @sfc-gh-rcheng's suggestion is the only way forward.

Contributor Author: Added as an argument, PTAL.

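As a standalone illustration of the Z vs. XXX difference discussed in this thread (not connector code; the class name is made up, and a fixed GMT+02:00 zone is assumed so the offset is visible):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimeOffsetFormatSketch {
  public static void main(String[] args) {
    Date epoch = new Date(0L);
    SimpleDateFormat legacy = new SimpleDateFormat("HH:mm:ss.SSSZ");   // pattern used by TIME_FORMAT
    SimpleDateFormat iso = new SimpleDateFormat("HH:mm:ss.SSSXXX");    // pattern used by TIME_FORMAT_STREAMING
    TimeZone tz = TimeZone.getTimeZone("GMT+02:00");
    legacy.setTimeZone(tz);
    iso.setTimeZone(tz);
    System.out.println(legacy.format(epoch)); // 02:00:00.000+0200  (no colon in the offset)
    System.out.println(iso.format(epoch));    // 02:00:00.000+02:00 (colon; at UTC, XXX prints "Z")
  }
}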
static final int MAX_SNOWFLAKE_NUMBER_PRECISION = 38;

// This class is designed to work with empty metadata config map
@@ -369,7 +371,7 @@ void putKey(SinkRecord record, ObjectNode meta) {
static JsonNode parseHeaders(Headers headers) {
ObjectNode result = MAPPER.createObjectNode();
for (Header header : headers) {
result.set(header.key(), convertToJson(header.schema(), header.value()));
result.set(header.key(), convertToJson(header.schema(), header.value(), false));
}
return result;
}
@@ -380,15 +382,17 @@ static JsonNode parseHeaders(Headers headers) {
*
* @param schema schema of the object
* @param logicalValue object to be converted
* @param isStreaming indicates whether this is part of snowpipe streaming
* @return a JsonNode of the object
*/
public static JsonNode convertToJson(Schema schema, Object logicalValue) {
public static JsonNode convertToJson(Schema schema, Object logicalValue, boolean isStreaming) {
if (logicalValue == null) {
if (schema
== null) // Any schema is valid and we don't have a default, so treat this as an optional
// schema
return null;
if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue());
if (schema.defaultValue() != null)
return convertToJson(schema, schema.defaultValue(), isStreaming);
if (schema.isOptional()) return JsonNodeFactory.instance.nullNode();
throw SnowflakeErrors.ERROR_5015.getException(
"Conversion error: null value for field that is required and has no default value");
@@ -424,8 +428,9 @@ public static JsonNode convertToJson(Schema schema, Object logicalValue) {
ISO_DATE_TIME_FORMAT.get().format((java.util.Date) value));
}
if (schema != null && Time.LOGICAL_NAME.equals(schema.name())) {
return JsonNodeFactory.instance.textNode(
TIME_FORMAT.get().format((java.util.Date) value));
ThreadLocal<SimpleDateFormat> format =
isStreaming ? TIME_FORMAT_STREAMING : TIME_FORMAT;
return JsonNodeFactory.instance.textNode(format.get().format((java.util.Date) value));
}
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
@@ -482,7 +487,7 @@ else if (value instanceof ByteBuffer) {
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
JsonNode fieldValue = convertToJson(valueSchema, elem, isStreaming);
list.add(fieldValue);
}
return list;
@@ -512,8 +517,8 @@ else if (value instanceof ByteBuffer) {
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
JsonNode mapKey = convertToJson(keySchema, entry.getKey(), isStreaming);
JsonNode mapValue = convertToJson(valueSchema, entry.getValue(), isStreaming);

if (objectMode) obj.set(mapKey.asText(), mapValue);
else list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
@@ -527,7 +532,7 @@ else if (value instanceof ByteBuffer) {
throw SnowflakeErrors.ERROR_5015.getException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
obj.set(field.name(), convertToJson(field.schema(), struct.get(field), isStreaming));
}
return obj;
}
@@ -40,11 +40,12 @@ public SnowflakeRecordContent() {
*
* @param schema schema of the object
* @param data object produced by native avro/json converters
* @param isStreaming indicates whether this is part of snowpipe streaming
*/
public SnowflakeRecordContent(Schema schema, Object data) {
public SnowflakeRecordContent(Schema schema, Object data, boolean isStreaming) {
this.content = new JsonNode[1];
this.schemaID = NON_AVRO_SCHEMA;
this.content[0] = RecordService.convertToJson(schema, data);
this.content[0] = RecordService.convertToJson(schema, data, isStreaming);
this.isBroken = false;
this.brokenData = null;
}
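Pulling the call sites in this PR together, a simplified sketch of how the new boolean travels from the record content into the time formatting (hypothetical stand-in names, not the connector's classes; the constructor and convertToJson signatures it mimics are the ones shown in the diffs above):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class IsStreamingFlowSketch {
  // Stand-in for RecordService.convertToJson(schema, value, isStreaming) handling a Time value
  static String formatTime(Date value, boolean isStreaming) {
    SimpleDateFormat format =
        new SimpleDateFormat(isStreaming ? "HH:mm:ss.SSSXXX" : "HH:mm:ss.SSSZ");
    format.setTimeZone(TimeZone.getTimeZone("GMT+02:00")); // fixed zone for a stable example
    return format.format(value);
  }

  // Stand-in for SnowflakeRecordContent(schema, data, isStreaming) forwarding the flag
  static String buildContent(Date timeValue, boolean isStreaming) {
    return formatTime(timeValue, isStreaming);
  }

  public static void main(String[] args) {
    Date t = new Date(0L);
    System.out.println(buildContent(t, false)); // Snowpipe path:  02:00:00.000+0200
    System.out.println(buildContent(t, true));  // Streaming path: 02:00:00.000+02:00
  }
}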
@@ -291,7 +291,8 @@ public void testConnectJsonConverter_MapInt64() throws JsonProcessingException {
jsonMap.put("test", Integer.MAX_VALUE);
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", Integer.MAX_VALUE);
@@ -332,7 +333,7 @@ private boolean testSimpleDataFormat_jsonConverter_thread_safe() {
SchemaAndValue schemaInputValue = jsonConverter.toConnectData("test", value.getBytes());

JsonNode result =
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value());
RecordService.convertToJson(schemaInputValue.schema(), schemaInputValue.value(), false);
System.out.println("Record Service result:" + result + " Thread :" + Thread.currentThread());

String exptectedDateTimeFormatStr =
@@ -350,7 +351,8 @@ public void testConnectJsonConverter_MapBigDecimalExceedsMaxPrecision()
jsonMap.put("test", new BigDecimal("999999999999999999999999999999999999999"));
SchemaAndValue schemaAndValue =
jsonConverter.toConnectData("test", mapper.writeValueAsBytes(jsonMap));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
expected.put("test", new BigDecimal("999999999999999999999999999999999999999"));
@@ -367,7 +369,8 @@ public void testConnectSimpleHeaderConverter_MapDateAndOtherTypes()
SchemaAndValue schemaAndValue =
headerConverter.toConnectHeader(
"test", "h1", rawHeader.getBytes(StandardCharsets.US_ASCII));
JsonNode result = RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value());
JsonNode result =
RecordService.convertToJson(schemaAndValue.schema(), schemaAndValue.value(), false);

ObjectNode expected = mapper.createObjectNode();
long expectedTimestampValue =
@@ -3,7 +3,14 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.record.TimestampType;
@@ -104,7 +104,7 @@ public void test() throws IOException {
.put("map", Collections.singletonMap("field", 1))
.put("mapNonStringKeys", Collections.singletonMap(1, 1));

content = new SnowflakeRecordContent(schema, original);
content = new SnowflakeRecordContent(schema, original, false);
assert content
.getData()[0]
.toString()
Expand All @@ -117,7 +117,7 @@ public void test() throws IOException {
"{\"int8\":12,\"int16\":12,\"int32\":12,\"int64\":12,\"float32\":12.2,\"float64\":12.2,\"boolean\":true,\"string\":\"foo\",\"bytes\":\"Zm9v\",\"array\":[\"a\",\"b\",\"c\"],\"map\":{\"field\":1},\"mapNonStringKeys\":[[1,1]]}");
Map<String, Object> jsonMap =
mapper.convertValue(jsonObject, new TypeReference<Map<String, Object>>() {});
content = new SnowflakeRecordContent(null, jsonMap);
content = new SnowflakeRecordContent(null, jsonMap, false);
assert content
.getData()[0]
.toString()
@@ -206,24 +206,22 @@ public void testWrongKeyType() {

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonEmptyValue() {
assert RecordService.convertToJson(null, null) == null;

Schema schema = SchemaBuilder.int32().optional().defaultValue(123).build();
assert RecordService.convertToJson(schema, null).toString().equals("123");
assert RecordService.convertToJson(schema, null, false).toString().equals("123");

schema = SchemaBuilder.int32().build();
RecordService.convertToJson(schema, null);
RecordService.convertToJson(schema, null, false);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonNonOptional() {
Schema schema = SchemaBuilder.int32().build();
RecordService.convertToJson(schema, null);
RecordService.convertToJson(schema, null, false);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testConvertToJsonNoSchemaType() {
RecordService.convertToJson(null, new SnowflakeJsonSchema());
RecordService.convertToJson(null, new SnowflakeJsonSchema(), false);
}

@Test
Expand All @@ -233,7 +231,7 @@ public void testConvertToJsonReadOnlyByteBuffer() {
String expected = "\"" + Base64.getEncoder().encodeToString(original.getBytes()) + "\"";
ByteBuffer buffer = ByteBuffer.wrap(original.getBytes()).asReadOnlyBuffer();
Schema schema = SchemaBuilder.bytes().build();
assert RecordService.convertToJson(schema, buffer).toString().equals(expected);
assert RecordService.convertToJson(schema, buffer, false).toString().equals(expected);
}

@Test
@@ -0,0 +1,29 @@
{
"name": "SNOWFLAKE_CONNECTOR_NAME",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
"snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
"tasks.max": "1",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.enable.schematization": true
}
}