Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-938038: Support AVRO Logical Types #722

Merged
merged 7 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import java.util.Set;
import javax.annotation.Nonnull;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,7 +158,8 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Schema schema = record.valueSchema();
if (schema != null && schema.fields() != null) {
for (Field field : schema.fields()) {
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
schemaMap.put(
field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name()));
}
}
return schemaMap;
Expand All @@ -167,7 +172,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
}
return convertToSnowflakeType(schemaType);
return convertToSnowflakeType(schemaType, null);
sfc-gh-tzhang marked this conversation as resolved.
Show resolved Hide resolved
}

/** Convert a json node type to kafka data type */
Expand Down Expand Up @@ -201,16 +206,26 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
}

/** Convert the kafka data type to Snowflake data type */
private static String convertToSnowflakeType(Type kafkaType) {
private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
switch (kafkaType) {
case INT8:
return "BYTEINT";
case INT16:
return "SMALLINT";
case INT32:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit can we add a comment explaining why we chose these 4? something like a link to the code or to this pr's description

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did add it to the PR description, do you want to take a look to see what's missing? We will definitely update our online doc to reflect this change.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pr description is great, but adding it could help with readability, but i dont feel too strongly about it, up to you

return "INT";
if (Date.LOGICAL_NAME.equals(schemaName)) {
return "DATE";
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
return "TIME(6)";
} else {
return "INT";
Comment on lines +224 to +229
Copy link
Collaborator

@sfc-gh-japatel sfc-gh-japatel Oct 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking out loud here: feel free to ignore.
will this be a behavior change? before this, we converted all INT32 to INT, now we are introducing new snowflake data types- which essentially mean new columns?

I also understand this is a good distinction for them and also we are still in PuPr..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, not sure if you remember, but before the insert will fail because it's inserting VARCHAR into a INT column

}
case INT64:
return "BIGINT";
if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
return "TIMESTAMP(6)";
} else {
return "BIGINT";
}
case FLOAT32:
return "FLOAT";
case FLOAT64:
Expand All @@ -220,7 +235,11 @@ private static String convertToSnowflakeType(Type kafkaType) {
case STRING:
return "VARCHAR";
case BYTES:
return "BINARY";
if (Decimal.LOGICAL_NAME.equals(schemaName)) {
return "VARCHAR";
} else {
return "BINARY";
}
Comment on lines +246 to +250
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to mention in description that it is not decimal to varchar but the bytes that represents decimal. Is that correct understanding?

case ARRAY:
return "ARRAY";
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ private void handleInsertRowsFailures(
}
}

// TODO: SNOW-529755 POLL committed offsets in backgraound thread
// TODO: SNOW-529755 POLL committed offsets in background thread

/**
* Get committed offset from Snowflake. It does an HTTP call internally to find out what was the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class RecordService {
});

public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated for Snowpipe Streaming but I don't see a good way to distinguish Snowpipe VS Snowpipe Streaming in RecordService, happy to hear any suggestions

Copy link
Collaborator

@sfc-gh-japatel sfc-gh-japatel Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simplest way to do it would be in TopicPartitionChannel ctor. since TopicPartitionChannel is only used in Snowpipe Streaming, just set a private field in RecordService to true. this will not change anything in snowpipe. Something like

public void isIngestionMethodSnowpipeStreaming(final boolean isSnowpipeStreaming) {
    this.isSnowpipeStreaming = isSnowpipeStreaming;
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but convertToJson is a static function :)

Copy link
Contributor Author

@sfc-gh-tzhang sfc-gh-tzhang Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we need to add a boolean to convertToJson and update all the references, I'm debating on whether this is really needed. The difference between Z and XXX is that there will be a colon with XXX in the timezone, for example, Z = 02:24:00.000+0000 and XXX = 02:24:00.000+00:00. In fact, it will fail if you try to insert 02:24:00.000+0000 to a TIME column so this is technically a bug. I'm not sure why there is no complains from customer, probably because not many people is using logical types.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of scope of this PR, but RecordService does have a lot of overlap between snowpipe and streaming, might be useful to refactor or split it into two files for simplicity

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we need to add a boolean to convertToJson and update all the references, I'm debating on whether this is really needed. The difference between Z and XXX is that there will be a colon with XXX in the timezone, for example, Z = 02:24:00.000+0000 and XXX = 02:24:00.000+00:00. In fact, it will fail if you try to insert 02:24:00.000+0000 to a TIME column so this is technically a bug. I'm not sure why there is no complains from customer, probably because not many people is using logical types.

Thanks for detailed explanation.
I would be careful to make changes in snowpipe code.
Most folks havent complained because we dont have schematization in snowpipe. they have base table and then more tables on top of that. This change will break their downstream pipeline right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might have to take a giant step and add another arg in convertToJson :(
@sfc-gh-rcheng 's suggestion is the only way forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added as an argument, PTAL

static final int MAX_SNOWFLAKE_NUMBER_PRECISION = 38;

// This class is designed to work with empty metadata config map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.record.TimestampType;
Expand All @@ -20,7 +27,7 @@ public class HeaderTest {

public static final SimpleDateFormat ISO_DATE_FORMAT =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
public static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss.SSSZ");
public static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss.SSSXXX");

static {
ISO_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
Expand Down
120 changes: 120 additions & 0 deletions test/test_suit/test_schema_evolution_avro_sr_logical_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from decimal import Decimal

from confluent_kafka import avro
from test_suit.test_utils import NonRetryableError


# test if the table is updated with the correct column
# add test if all the records from different topics safely land in the table
class TestSchemaEvolutionAvroSRLogicalTypes:
def __init__(self, driver, nameSalt):
self.driver = driver
self.fileName = "travis_correct_schema_evolution_avro_sr"
self.topics = []
self.table = self.fileName + nameSalt
self.recordNum = 100

for i in range(2):
self.topics.append(self.table + str(i))

self.driver.snowflake_conn.cursor().execute(
"Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table))

self.driver.snowflake_conn.cursor().execute(
"alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table))

self.records = []

self.records.append({
'PERFORMANCE_STRING': 'Excellent',
'TIME_MILLIS': 10,
'DATE': 11,
'TIMESTAMP_MILLIS': 12,
'DECIMAL': Decimal(4.0)
})

self.records.append({
'PERFORMANCE_STRING': 'Excellent',
'RATING_DOUBLE': 0.99,
})

self.ValueSchemaStr = []

self.ValueSchemaStr.append("""
{
"type":"record",
"name":"value_schema_0",
"fields":[
{"name":"PERFORMANCE_STRING","type":"string"},
{"name":"TIME_MILLIS","type":{"type":"int","logicalType":"time-millis"}},
{"name":"DATE","type":{"type":"int","logicalType":"date"}},
{"name":"DECIMAL","type":{"type":"bytes","logicalType":"decimal", "precision":4, "scale":2}},
{"name":"TIMESTAMP_MILLIS","type":{"type":"long","logicalType":"timestamp-millis"}}
]
}
""")

self.ValueSchemaStr.append("""
{
"type":"record",
"name":"value_schema_1",
"fields":[
{"name":"RATING_DOUBLE","type":"float"},
{"name":"PERFORMANCE_STRING","type":"string"}
]
}
""")

self.gold_type = {
'PERFORMANCE_STRING': 'VARCHAR',
'RATING_DOUBLE': 'FLOAT',
'TIME_MILLIS': 'TIME(6)',
'DATE': 'DATE',
'TIMESTAMP_MILLIS': 'TIMESTAMP_NTZ(6)',
'DECIMAL': 'VARCHAR',
'RECORD_METADATA': 'VARIANT'
}

self.gold_columns = [columnName for columnName in self.gold_type]

self.valueSchema = []

for valueSchemaStr in self.ValueSchemaStr:
self.valueSchema.append(avro.loads(valueSchemaStr))

def getConfigFileName(self):
return self.fileName + ".json"

def send(self):
for i, topic in enumerate(self.topics):
value = []
for _ in range(self.recordNum):
value.append(self.records[i])
self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0)

def verify(self, round):
rows = self.driver.snowflake_conn.cursor().execute(
"desc table {}".format(self.table)).fetchall()
res_col = {}

for index, row in enumerate(rows):
self.gold_columns.remove(row[0])
if not row[1].startswith(self.gold_type[row[0]]):
raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1],
self.gold_type[
row[0]]))
res_col[row[0]] = index

print("Columns not in table: ", self.gold_columns)

for columnName in self.gold_columns:
raise NonRetryableError("Column {} was not created".format(columnName))

res = self.driver.snowflake_conn.cursor().execute(
"SELECT count(*) FROM {}".format(self.table)).fetchone()[0]
if res != len(self.topics) * self.recordNum:
print("Number of record expected: {}, got: {}".format(len(self.topics) * self.recordNum, res))
raise NonRetryableError("Number of record in table is different from number of record sent")

def clean(self):
self.driver.cleanTableStagePipe(self.table)
14 changes: 11 additions & 3 deletions test/test_suites.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema
from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR
from test_suit.test_schema_evolution_avro_sr_logical_types import TestSchemaEvolutionAvroSRLogicalTypes
from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
from test_suit.test_schema_evolution_json_ignore_tombstone import TestSchemaEvolutionJsonIgnoreTombstone
Expand All @@ -39,8 +40,9 @@
from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ
from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR
from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson
from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import TestSnowpipeStreamingStringJsonIgnoreTombstone
from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ
from test_suit.test_snowpipe_streaming_string_json_ignore_tombstone import \
TestSnowpipeStreamingStringJsonIgnoreTombstone
from test_suit.test_string_avro import TestStringAvro
from test_suit.test_string_avrosr import TestStringAvrosr
from test_suit.test_string_json import TestStringJson
Expand Down Expand Up @@ -91,7 +93,8 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
test_instance=TestStringJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestStringJsonIgnoreTombstone", EndToEndTestSuite(
test_instance=TestStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
test_instance=TestStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True,
run_in_apache=True
)),
("TestJsonJson", EndToEndTestSuite(
test_instance=TestJsonJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
Expand Down Expand Up @@ -132,7 +135,8 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
run_in_apache=True
)),
("TestSnowpipeStreamingStringJsonIgnoreTombstone", EndToEndTestSuite(
test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True, run_in_confluent=True,
test_instance=TestSnowpipeStreamingStringJsonIgnoreTombstone(driver, nameSalt), clean=True,
run_in_confluent=True,
run_in_apache=True
)),
("TestSnowpipeStreamingStringJsonDLQ", EndToEndTestSuite(
Expand Down Expand Up @@ -178,6 +182,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True,
run_in_apache=False
)),
("TestSchemaEvolutionAvroSRLogicalTypes", EndToEndTestSuite(
test_instance=TestSchemaEvolutionAvroSRLogicalTypes(driver, nameSalt), clean=True, run_in_confluent=True,
run_in_apache=False
)),
("TestSchemaEvolutionWithAutoTableCreationJson", EndToEndTestSuite(
test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True,
run_in_confluent=True, run_in_apache=True
Expand Down