Toby's comments and more tests
sfc-gh-japatel committed Oct 23, 2023
1 parent ab5a9ca commit 120233d
Showing 7 changed files with 128 additions and 42 deletions.
@@ -168,17 +168,19 @@ public class SnowflakeSinkConnectorConfig {
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";

- public static final String ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME =
- "enable.connector_name.in.streaming_channel_name";
- public static final boolean ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DEFAULT = false;
+ public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2 =
+ "snowflake.enable.streaming.channel.format.v2";
+ public static final boolean SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT = false;

- public static final String ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DISPLAY =
- "Enable Connector Name in Snowpipe Streaming Channel Name";
+ public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DISPLAY =
+ "Enable Connector Name in Snowpipe Streaming Channel Name (V2 channel name format)";

- public static final String ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DOC =
+ public static final String SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DOC =
"Whether to use connector name in streaming channels. If it is set to false, we will not use"
- + " connector name in channel name. Note: Please use this config cautiously and it is not"
- + " advised to use this if you are coming from old Snowflake Kafka Connector Version. ";
+ + " connector name in channel name (version 2 of the channel name format). Note: Please use"
+ + " this config cautiously; it is not advised if you are coming from an older Snowflake"
+ + " Kafka Connector version, where the default channel name contained only the topic name"
+ + " and partition number, not the connector name.";

// MDC logging header
public static final String ENABLE_MDC_LOGGING_CONFIG = "enable.mdc.logging";
@@ -605,15 +607,15 @@ static ConfigDef newConfigDef() {
ConfigDef.Width.NONE,
ENABLE_MDC_LOGGING_DISPLAY)
.define(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
Type.BOOLEAN,
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DEFAULT,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT,
Importance.LOW,
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DOC,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DOC,
CONNECTOR_CONFIG,
9,
ConfigDef.Width.NONE,
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DISPLAY);
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DISPLAY);
}

public static class TopicToTableValidator implements ConfigDef.Validator {
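For orientation, a minimal sketch of opting into the new format, mirroring the new tests in this commit (the config keys are the ones defined above; the flag is only honored when the ingestion method is Snowpipe Streaming):

import java.util.HashMap;
import java.util.Map;

// Sketch mirroring the new tests: enable the V2 channel name format.
// SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2 resolves to
// "snowflake.enable.streaming.channel.format.v2" and defaults to false.
Map<String, String> config = new HashMap<>();
config.put(
    SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
    IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2, "true");
Utils.validateConfig(config);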
@@ -1,8 +1,8 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DEFAULT;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ROLE;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
@@ -103,7 +103,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
// Cache for schema evolution
private final Map<String, Boolean> tableName2SchemaEvolutionPermission;

- // Used to create a channel name.
+ // Whether to use the V2 channel name format (see SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2).
private final boolean shouldUseConnectorNameInChannelName;

public SnowflakeSinkServiceV2(
Expand Down Expand Up @@ -146,8 +146,8 @@ public SnowflakeSinkServiceV2(
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
- String.valueOf(ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DEFAULT)));
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
+ String.valueOf(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT)));
}

@VisibleForTesting
@@ -196,8 +196,8 @@ public SnowflakeSinkServiceV2(
this.shouldUseConnectorNameInChannelName =
Boolean.parseBoolean(
connectorConfig.getOrDefault(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
- String.valueOf(ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME_DEFAULT)));
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
+ String.valueOf(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2_DEFAULT)));
}

/**
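The naming logic itself lives in SnowflakeSinkServiceV2.partitionChannelKey, which this diff does not show. A hedged sketch of the two formats implied by the config doc above, assuming an underscore delimiter:

// Sketch only; the real implementation is partitionChannelKey in
// SnowflakeSinkServiceV2, and the "_" delimiter here is an assumption.
static String partitionChannelKey(
    String connectorName, String topic, int partition, boolean useV2Format) {
  return useV2Format
      // V2: the connector name is part of the channel name
      ? connectorName + "_" + topic + "_" + partition
      // V1 (legacy): topic name and partition number only
      : topic + "_" + partition;
}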
@@ -1,14 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;

- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BOOLEAN_VALIDATOR;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.CUSTOM_SNOWFLAKE_CONVERTERS;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ErrorTolerance;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.KEY_CONVERTER_CONFIG_FIELD;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -221,6 +213,11 @@ public static ImmutableMap<String, String> validateStreamingSnowpipeConfig(
BOOLEAN_VALIDATOR.ensureValid(
ERRORS_LOG_ENABLE_CONFIG, inputConfig.get(ERRORS_LOG_ENABLE_CONFIG));
}
+ if (inputConfig.containsKey(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2)) {
+ BOOLEAN_VALIDATOR.ensureValid(
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
+ inputConfig.get(SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2));
+ }

// Valid schematization for Snowpipe Streaming
invalidParams.putAll(validateSchematizationConfig(inputConfig));
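For context, a stand-in for the boolean check this hunk wires up; the real BOOLEAN_VALIDATOR is defined in SnowflakeSinkConnectorConfig (not shown here) and its exact message may differ. The behavior mirrored is what the new tests below rely on: "true"/"false" pass, anything else (e.g. "yes") is rejected with a ConfigException naming the key:

import org.apache.kafka.common.config.ConfigException;

// Stand-in for BOOLEAN_VALIDATOR.ensureValid; illustrative only.
static void ensureValidBoolean(String name, Object value) {
  String s = String.valueOf(value);
  if (!"true".equalsIgnoreCase(s) && !"false".equalsIgnoreCase(s)) {
    throw new ConfigException(name, value, "Value must be a boolean (true/false)");
  }
}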
@@ -867,6 +867,34 @@ public void testInvalidEnableOptimizeStreamingClientConfig() {
}
}

+ @Test
+ public void testEnableStreamingChannelFormatV2Config() {
+ Map<String, String> config = getConfig();
+ config.put(
+ SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+ IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2, "true");
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+ Utils.validateConfig(config);
+ }
+
+ @Test
+ public void testInvalidEnableStreamingChannelFormatV2Config() {
+ try {
+ Map<String, String> config = getConfig();
+ config.put(
+ SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
+ IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
+ config.put(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2, "yes");
+ Utils.validateConfig(config);
+ } catch (SnowflakeKafkaConnectorException exception) {
+ assert exception
+ .getMessage()
+ .contains(SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2);
+ }
+ }
+
@Test
public void testInvalidEmptyConfig() {
try {
@@ -1,10 +1,10 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;

@@ -77,7 +77,7 @@ public void testSinkTaskInvalidRecord_InMemoryDLQ() throws Exception {
config.put(ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
config.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));
InMemoryKafkaRecordErrorReporter errorReporter = new InMemoryKafkaRecordErrorReporter();
SnowflakeConnectionService mockConnectionService =
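The tests above read this.shouldUseConnectorNameInChannelName without setting it, which suggests these classes run parameterized over both channel name formats. A sketch of the assumed JUnit 4 wiring (the class name and parameter plumbing are not shown in this diff and are illustrative only):

import java.util.Arrays;
import java.util.Collection;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Assumed wiring, not part of this diff: run each test with both formats.
@RunWith(Parameterized.class)
public class ExampleStreamingIT {
  private final boolean shouldUseConnectorNameInChannelName;

  @Parameterized.Parameters(name = "useV2ChannelNameFormat: {0}")
  public static Collection<Object[]> input() {
    return Arrays.asList(new Object[][] {{true}, {false}});
  }

  public ExampleStreamingIT(boolean useV2ChannelNameFormat) {
    this.shouldUseConnectorNameInChannelName = useV2ChannelNameFormat;
  }
}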
@@ -1,6 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;

- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;
import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
@@ -129,7 +129,7 @@ public void testChannelCloseIngestion() throws Exception {
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);

@@ -181,7 +181,7 @@ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartition
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);
TopicPartition tp1 = new TopicPartition(table, partition);
@@ -312,7 +312,7 @@ public void testStreamingIngestion() throws Exception {
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));
conn.createTable(table);

@@ -379,7 +379,7 @@ public void testStreamingIngest_multipleChannelPartitions_withMetrics() throws E
Map<String, String> config = getConfig();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

// set up telemetry service spy
@@ -1511,6 +1511,65 @@ public void testStreamingIngestion_invalid_file_version() throws Exception {
}
}

+ @Test
+ public void testStreamingIngestion_ChannelNameFormats() throws Exception {
+ Map<String, String> config = TestUtils.getConfForStreaming();
+ SnowflakeSinkConnectorConfig.setDefaultValues(config);
+ Map<String, String> overriddenConfig = new HashMap<>(config);
+
+ config.put(
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
+ String.valueOf(this.shouldUseConnectorNameInChannelName));
+
+ conn.createTable(table);
+ // opens a channel for partition 0, table and topic
+ SnowflakeSinkService service =
+ SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+ .setRecordNumber(1)
+ .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+ .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+ .addTask(table, new TopicPartition(topic, partition)) // Internally calls startTask
+ .build();
+
+ SnowflakeConverter converter = new SnowflakeJsonConverter();
+ SchemaAndValue input =
+ converter.toConnectData(topic, "{\"name\":\"test\"}".getBytes(StandardCharsets.UTF_8));
+ long offset = 0;
+
+ SinkRecord record1 =
+ new SinkRecord(
+ topic,
+ partition,
+ Schema.STRING_SCHEMA,
+ "test_key" + offset,
+ input.schema(),
+ input.value(),
+ offset);
+
+ // No need to verify results
+ service.insert(record1);
+
+ SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service;
+
+ TopicPartitionChannel channel =
+ snowflakeSinkServiceV2
+ .getTopicPartitionChannelFromCacheKey(
+ partitionChannelKey(
+ TEST_CONNECTOR_NAME,
+ topic,
+ partition,
+ this.shouldUseConnectorNameInChannelName))
+ .orElseThrow(RuntimeException::new);
+ assert channel
+ .getChannelName()
+ .toLowerCase()
+ .contains(
+ SnowflakeSinkServiceV2.partitionChannelKey(
+ TEST_CONNECTOR_NAME, topic, partition, this.shouldUseConnectorNameInChannelName)
+ .toLowerCase());
+ service.closeAll();
+ }
+
private void createNonNullableColumn(String tableName, String colName) {
String createTableQuery = "alter table identifier(?) add " + colName + " int not null";

@@ -1,6 +1,6 @@
package com.snowflake.kafka.connector.internal.streaming;

- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2;
import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
@@ -78,7 +78,7 @@ public void testAutoChannelReopenOn_OffsetTokenSFException() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -139,7 +139,7 @@ public void testInsertRowsOnChannelClosed() throws Exception {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -207,7 +207,7 @@ public void testAutoChannelReopen_InsertRowsSFException() throws Exception {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -299,7 +299,7 @@ public void testAutoChannelReopen_MultiplePartitionsInsertRowsSFException() thro
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -422,7 +422,7 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws
config.put(SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_CONFIG, "true");
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -501,7 +501,7 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
SnowflakeSinkConnectorConfig.setDefaultValues(overriddenConfig);
overriddenConfig.put(
- ENABLE_CONNECTOR_NAME_IN_STREAMING_CHANNEL_NAME,
+ SNOWFLAKE_ENABLE_STREAMING_CHANNEL_FORMAT_V2,
String.valueOf(this.shouldUseConnectorNameInChannelName));

InMemorySinkTaskContext inMemorySinkTaskContext =
