diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 48510964d..9f8c59e32 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -99,6 +99,9 @@ public class SnowflakeSinkConnectorConfig { private static final String SNOWFLAKE_METADATA_FLAGS = "Snowflake Metadata Flags"; public static final String SNOWFLAKE_METADATA_CREATETIME = "snowflake.metadata.createtime"; public static final String SNOWFLAKE_METADATA_TOPIC = "snowflake.metadata.topic"; + public static final String SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION = + "snowflake.metadata.sf.connector.version"; + public static final String SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION_DEFAULT = "false"; public static final String SNOWFLAKE_METADATA_OFFSET_AND_PARTITION = "snowflake.metadata.offset.and.partition"; public static final String SNOWFLAKE_METADATA_ALL = "snowflake.metadata.all"; @@ -486,6 +489,17 @@ static ConfigDef newConfigDef() { 2, ConfigDef.Width.NONE, SNOWFLAKE_METADATA_TOPIC) + .define( + SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, + Type.BOOLEAN, + SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION_DEFAULT, + Importance.LOW, + "Flag to control whether Snowflake Connector version is collected in snowflake" + + " metadata", + SNOWFLAKE_METADATA_FLAGS, + 3, + ConfigDef.Width.NONE, + SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION) .define( SNOWFLAKE_METADATA_OFFSET_AND_PARTITION, Type.BOOLEAN, @@ -494,7 +508,7 @@ static ConfigDef newConfigDef() { "Flag to control whether kafka partition and offset are collected in snowflake" + " metadata", SNOWFLAKE_METADATA_FLAGS, - 3, + 4, ConfigDef.Width.NONE, SNOWFLAKE_METADATA_OFFSET_AND_PARTITION) .define( diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java 
b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 0293dd884..e564aae51 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -66,6 +66,7 @@ public class RecordService { static final String CONTENT = "content"; static final String META = "meta"; static final String SCHEMA_ID = "schema_id"; + static final String SF_CONNECTOR_VERSION = "sf_connector_version"; private static final String KEY_SCHEMA_ID = "key_schema_id"; static final String HEADERS = "headers"; @@ -191,6 +192,9 @@ private SnowflakeTableRow processRecord(SinkRecord record) { if (metadataConfig.topicFlag) { meta.put(TOPIC, record.topic()); } + if (metadataConfig.sfConnectorVersionFlag) { + meta.put(SF_CONNECTOR_VERSION, Utils.VERSION); + } if (metadataConfig.offsetAndPartitionFlag) { meta.put(OFFSET, record.kafkaOffset()); meta.put(PARTITION, record.kafkaPartition()); diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java index 9023c3457..dd49c14f3 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java @@ -8,6 +8,7 @@ public class SnowflakeMetadataConfig { final boolean createtimeFlag; final boolean topicFlag; final boolean offsetAndPartitionFlag; + final boolean sfConnectorVersionFlag; final boolean allFlag; /** initialize with default config */ @@ -25,6 +26,7 @@ public SnowflakeMetadataConfig(Map config) { // these values are the default values of the configuration boolean createtime = true; boolean topic = true; + boolean version = false; boolean offsetAndPartition = true; boolean all = true; if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_CREATETIME) @@ -39,6 +41,12 @@ public 
SnowflakeMetadataConfig(Map config) { .equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT)) { topic = false; } + if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION) + && config + .get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION) + .equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT)) { + version = true; + } if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION) && !config .get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION) @@ -54,6 +62,7 @@ public SnowflakeMetadataConfig(Map config) { createtimeFlag = createtime; topicFlag = topic; + sfConnectorVersionFlag = version; offsetAndPartitionFlag = offsetAndPartition; allFlag = all; } @@ -64,6 +73,9 @@ public String toString() { + ", " + "topicFlag: " + topicFlag + + ", " + + "versionFlag: " + + sfConnectorVersionFlag + ", " + "offsetAndPartitionFlag: " + offsetAndPartitionFlag diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java b/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java index 3dcd4e8f1..01d605bf6 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorIT.java @@ -24,6 +24,7 @@ public class ConnectorIT { SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_ALL, SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC, + SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION, SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_CREATETIME, SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, @@ -78,6 +79,7 @@ static Map getErrorConfig() { config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "-1"); config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_ALL, "falseee"); config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC, 
"falseee"); + config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, "falseee"); config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_OFFSET_AND_PARTITION, "falseee"); config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_CREATETIME, "falseee"); config.put(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP, "jfsja,,"); diff --git a/src/test/java/com/snowflake/kafka/connector/records/MetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/MetaColumnTest.java index 8011c70f1..dd06673ea 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/MetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/MetaColumnTest.java @@ -8,6 +8,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.record.TimestampType; @@ -38,6 +39,12 @@ public class MetaColumnTest { put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_TOPIC, "false"); } }; + private HashMap versionConfig = + new HashMap() { + { + put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, "false"); + } + }; private HashMap offsetAndPartitionConfig = new HashMap() { { @@ -113,6 +120,18 @@ public void testConfig() throws IOException { assert result.get(META).has(RecordService.OFFSET); assert result.get(META).has(RecordService.PARTITION); assert result.get(META).has(record.timestampType().name); + assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); + + // test metadata configuration -- version flag explicitly disabled ("false"), so it must stay absent + metadataConfig = new SnowflakeMetadataConfig(versionConfig); + service.setMetadataConfig(metadataConfig); + result = mapper.readTree(service.getProcessedRecordForSnowpipe(record)); + assert result.has(META); 
+ assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); + assert result.get(META).has(RecordService.OFFSET); + assert result.get(META).has(RecordService.PARTITION); + assert result.get(META).has(record.timestampType().name); + assert result.get(META).has(RecordService.TOPIC); // test metadata configuration -- remove offset and partition metadataConfig = new SnowflakeMetadataConfig(offsetAndPartitionConfig); @@ -121,6 +140,7 @@ public void testConfig() throws IOException { assert result.has(META); assert !result.get(META).has(RecordService.OFFSET); assert !result.get(META).has(RecordService.PARTITION); + assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); assert result.get(META).has(record.timestampType().name); assert result.get(META).has(RecordService.TOPIC); @@ -130,6 +150,7 @@ public void testConfig() throws IOException { result = mapper.readTree(service.getProcessedRecordForSnowpipe(record)); assert result.has(META); assert !result.get(META).has(record.timestampType().name); + assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); assert result.get(META).has(RecordService.TOPIC); assert result.get(META).has(RecordService.OFFSET); assert result.get(META).has(RecordService.PARTITION); @@ -143,6 +164,42 @@ public void testConfig() throws IOException { System.out.println("Config test success"); } + @Test + public void testSfConnectorVersion() throws JsonProcessingException { + + SnowflakeConverter converter = new SnowflakeJsonConverter(); + RecordService service = new RecordService(); + SchemaAndValue input = + converter.toConnectData( + topic, ("{\"name\":\"test" + "\"}").getBytes(StandardCharsets.UTF_8)); + long timestamp = System.currentTimeMillis(); + + SinkRecord record = + new SinkRecord( + topic, + partition, + Schema.STRING_SCHEMA, + "test", + input.schema(), + input.value(), + 0, + timestamp, + TimestampType.CREATE_TIME); + + SnowflakeMetadataConfig metadataConfig = new SnowflakeMetadataConfig(topicConfig); + 
service.setMetadataConfig(metadataConfig); + + metadataConfig = new SnowflakeMetadataConfig(new HashMap<String, String>() { { put(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION, "true"); } }); + service.setMetadataConfig(metadataConfig); + JsonNode result = mapper.readTree(service.getProcessedRecordForSnowpipe(record)); + assert result.has(META); + assert result.get(META).has(RecordService.SF_CONNECTOR_VERSION); + assert result.get(META).has(RecordService.OFFSET); + assert result.get(META).has(RecordService.PARTITION); + assert result.get(META).has(record.timestampType().name); + assert result.get(META).has(RecordService.TOPIC); + } + @Test public void testTimeStamp() throws IOException { SnowflakeConverter converter = new SnowflakeJsonConverter(); diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java index 94ce51299..5416fd449 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java @@ -264,7 +264,10 @@ public void testSchematizationArrayOfObject() throws JsonProcessingException { service.setEnableSchematization(true); String value = "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}"; - byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); + String value2 = + "{\"cricket\":{\"team\":{\"MI\":{\"players\":[{\"name\":\"John" + + " Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}}}}"; + byte[] valueContents = (value2).getBytes(StandardCharsets.UTF_8); SchemaAndValue sv = jsonConverter.toConnectData(topic, valueContents); SinkRecord record =