SNOW-938038: Support AVRO Logical Types #722
Conversation
```diff
@@ -87,7 +87,7 @@ public class RecordService {
       });

   public static final ThreadLocal<SimpleDateFormat> TIME_FORMAT =
-      ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSZ"));
+      ThreadLocal.withInitial(() -> new SimpleDateFormat("HH:mm:ss.SSSXXX"));
```
This needs to be updated for Snowpipe Streaming, but I don't see a good way to distinguish Snowpipe vs. Snowpipe Streaming in RecordService. Happy to hear any suggestions.
The simplest way to do it would be in the TopicPartitionChannel constructor. Since TopicPartitionChannel is only used in Snowpipe Streaming, just set a private field in RecordService to true; this will not change anything in Snowpipe. Something like:

```java
public void isIngestionMethodSnowpipeStreaming(final boolean isSnowpipeStreaming) {
  this.isSnowpipeStreaming = isSnowpipeStreaming;
}
```
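For illustration, a hedged sketch of that idea (class and field names assumed from this discussion, not the actual connector source):

```java
// Hypothetical sketch: only Snowpipe Streaming constructs TopicPartitionChannel,
// so flipping the flag here leaves the plain Snowpipe path untouched.
public class TopicPartitionChannel {
  private final RecordService recordService;

  public TopicPartitionChannel(RecordService recordService) {
    this.recordService = recordService;
    // RecordService is the class from the diff above; the setter is the one suggested.
    this.recordService.isIngestionMethodSnowpipeStreaming(true);
  }
}
```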
I thought about that, but convertToJson is a static function :)
Basically we would need to add a boolean to convertToJson and update all the references, and I'm debating whether that is really needed. The difference between Z and XXX is that XXX puts a colon in the timezone offset: for example, Z produces 02:24:00.000+0000 while XXX produces 02:24:00.000+00:00. In fact, inserting 02:24:00.000+0000 into a TIME column will fail, so this is technically a bug. I'm not sure why there have been no complaints from customers; probably because not many people use logical types.
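For reference, a minimal standalone demo (not part of the PR) of what the two patterns produce; a non-zero offset is chosen so the colon difference is visible:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class OffsetPatternDemo {
  public static void main(String[] args) {
    Date epoch = new Date(0L); // fixed instant for deterministic output
    TimeZone tz = TimeZone.getTimeZone("GMT+02:00");

    SimpleDateFormat rfc822 = new SimpleDateFormat("HH:mm:ss.SSSZ");    // old pattern
    SimpleDateFormat iso8601 = new SimpleDateFormat("HH:mm:ss.SSSXXX"); // new pattern
    rfc822.setTimeZone(tz);
    iso8601.setTimeZone(tz);

    System.out.println(rfc822.format(epoch));  // 02:00:00.000+0200  (no colon in offset)
    System.out.println(iso8601.format(epoch)); // 02:00:00.000+02:00 (colon in offset)
  }
}
```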
Out of scope for this PR, but RecordService does have a lot of overlap between Snowpipe and Streaming; it might be useful to refactor or split it into two files for simplicity.
> Basically we would need to add a boolean to convertToJson and update all the references [...] I'm not sure why there have been no complaints from customers; probably because not many people use logical types.
Thanks for the detailed explanation.
I would be careful making changes in the Snowpipe code.
Most folks haven't complained because we don't have schematization in Snowpipe; they have a base table and then more tables on top of that. Won't this change break their downstream pipelines?
You might have to take a giant step and add another arg to convertToJson :(
@sfc-gh-rcheng's suggestion is the only way forward.
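A minimal sketch of what that extra argument could look like (names and shape are assumptions, not the actual connector code): callers pass whether they are on the Snowpipe Streaming path, and only that path gets the ISO 8601 pattern, so classic Snowpipe output stays unchanged.

```java
import java.text.SimpleDateFormat;

public class ConvertToJsonShape {
  // Hypothetical helper: the real convertToJson walks the record's schema and
  // content; this only shows how the flag would pick the TIME pattern.
  static SimpleDateFormat timeFormatFor(boolean isSnowpipeStreaming) {
    return new SimpleDateFormat(
        isSnowpipeStreaming ? "HH:mm:ss.SSSXXX" : "HH:mm:ss.SSSZ");
  }
}
```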
Added as an argument, PTAL.
Ping, can I get a review on this? Thanks!
lgtm after e2e tests pass. Do we need doc changes as well?
```diff
@@ -201,16 +213,26 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
   }

   /** Convert the kafka data type to Snowflake data type */
-  private static String convertToSnowflakeType(Type kafkaType) {
+  private static String convertToSnowflakeType(Type kafkaType, String schemaName) {
     switch (kafkaType) {
       case INT8:
         return "BYTEINT";
       case INT16:
         return "SMALLINT";
       case INT32:
```
Nit: can we add a comment explaining why we chose these 4? Something like a link to the code or to this PR's description.
I did add it to the PR description; do you want to take a look and see what's missing? We will definitely update our online docs to reflect this change.
The PR description is great; adding a comment could still help with readability, but I don't feel too strongly about it. Up to you.
```diff
-      schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
+      String snowflakeType = convertToSnowflakeType(field.schema().type(), field.schema().name());
+      LOGGER.info(
+          "Got the snowflake data type for field:{}, schema_name:{}, kafkaType:{},"
```
Nit: use schemaName in the log message to match the other fields.
```diff
+        if (Date.LOGICAL_NAME.equals(schemaName)) {
+          return "DATE";
+        } else if (Time.LOGICAL_NAME.equals(schemaName)) {
+          return "TIME(6)";
+        } else {
+          return "INT";
```
Thinking out loud here, feel free to ignore: will this be a behavior change? Before this, we converted all INT32 to INT; now we are introducing new Snowflake data types, which essentially means new columns?
I also understand this is a good distinction for them, and we are still in PuPr.
Not really. Not sure if you remember, but before this the insert would fail because it was inserting a VARCHAR into an INT column.
```diff
+        if (Decimal.LOGICAL_NAME.equals(schemaName)) {
+          return "VARCHAR";
+        } else {
+          return "BINARY";
+        }
```
It would be good to mention in the description that it is not the decimal that maps to VARCHAR but the bytes that represent the decimal. Is that the correct understanding?
lgtm, thanks!
There are 10 AVRO logical types listed in https://avro.apache.org/docs/1.11.0/spec.html#Logical+Types and we're able to support 4 of them. The mapping is as below:

- date -> DATE
- time-millis -> TIME(6)
- timestamp-millis -> TIMESTAMP_NTZ(6)
- decimal -> VARCHAR (we can't use NUMBER because the precision could be bigger than 38)

We can't support the remaining 6 types because they're not supported by ConnectSchema; see the code for more detail. We'll need to find another way to support other logical types or other sources (like Debezium).
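As a rough illustration of that mapping, here is a hedged sketch pieced together from the hunks above (not the connector's actual code; the non-logical fallbacks marked below are assumptions):

```java
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeMappingSketch {
  // Maps a Kafka Connect type plus its logical-schema name to a Snowflake column type.
  static String convertToSnowflakeType(Type kafkaType, String schemaName) {
    switch (kafkaType) {
      case INT32:
        if (Date.LOGICAL_NAME.equals(schemaName)) return "DATE";    // Avro date
        if (Time.LOGICAL_NAME.equals(schemaName)) return "TIME(6)"; // Avro time-millis
        return "INT";
      case INT64:
        if (Timestamp.LOGICAL_NAME.equals(schemaName)) return "TIMESTAMP_NTZ(6)"; // Avro timestamp-millis
        return "INT";
      case BYTES:
        if (Decimal.LOGICAL_NAME.equals(schemaName)) return "VARCHAR"; // precision can exceed NUMBER's max of 38
        return "BINARY";
      default:
        return "VARIANT"; // assumed fallback, not taken from this PR
    }
  }
}
```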