Skip to content

Commit

Permalink
NO-SNOW: allow StringConverter when schematization is in the config b…
Browse files Browse the repository at this point in the history
…ut set to false (#560)

* fix

* format

* add tests
  • Loading branch information
sfc-gh-tzhang authored Mar 1, 2023
1 parent b0625f4 commit b86440c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ private static boolean validateSchematizationConfig(Map<String, String> inputCon
SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG));

if (inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD) != null
if (Boolean.parseBoolean(
inputConfig.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG))
&& inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD) != null
&& (inputConfig.get(VALUE_CONVERTER_CONFIG_FIELD).contains(STRING_CONVERTER_KEYWORD)
|| inputConfig
.get(VALUE_CONVERTER_CONFIG_FIELD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,32 @@ public void testInvalidValueConvertersForStreamingSnowpipe() {
Utils.validateConfig(config);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testInValidConfigFileTypeForSnowpipe() {
Map<String, String> config = getConfig();
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
Utils.validateConfig(config);
}

@Test
public void testValidFileTypesForSnowpipeStreaming() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
Utils.validateConfig(config);

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
Utils.validateConfig(config);

// lower case
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "abcd");
Utils.validateConfig(config);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testInvalidSchematizationForSnowpipe() {
Map<String, String> config = getConfig();
Expand All @@ -595,28 +621,30 @@ public void testValidSchematizationForStreamingSnowpipe() {
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testInValidConfigFileTypeForSnowpipe() {
public void testSchematizationWithUnsupportedConverter() {
Map<String, String> config = getConfig();
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}

@Test
public void testValidFileTypesForSnowpipeStreaming() {
public void testDisabledSchematizationWithUnsupportedConverter() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "false");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"org.apache.kafka.connect.storage.StringConverter");
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
Utils.validateConfig(config);

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
Utils.validateConfig(config);

// lower case
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "abcd");
Utils.validateConfig(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"topics": "SNOWFLAKE_TEST_TOPIC",
"tasks.max": "1",
"buffer.flush.time":"10",
"buffer.count.records":"100",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"SNOWFLAKE_HOST",
"snowflake.user.name":"SNOWFLAKE_USER",
"snowflake.private.key":"SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name":"SNOWFLAKE_DATABASE",
"snowflake.schema.name":"SNOWFLAKE_SCHEMA",
"buffer.flush.time": "10",
"buffer.count.records": "100",
"buffer.size.bytes": "5000000",
"snowflake.url.name": "SNOWFLAKE_HOST",
"snowflake.user.name": "SNOWFLAKE_USER",
"snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
"snowflake.database.name": "SNOWFLAKE_DATABASE",
"snowflake.schema.name": "SNOWFLAKE_SCHEMA",
"snowflake.role.name": "SNOWFLAKE_ROLE",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"snowflake.enable.schematization": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"jmx": "true",
"errors.tolerance":"all",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.deadletterqueue.topic.name":"DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor":1
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1
}
}

0 comments on commit b86440c

Please sign in to comment.