diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 4498c7d8a..84f336b3a 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -111,6 +111,10 @@ public class SnowflakeSinkConnectorConfig { public static final String INGESTION_METHOD_DEFAULT_SNOWPIPE = IngestionMethodConfig.SNOWPIPE.toString(); + // This is the streaming bdec file version which can be defined in config + // NOTE: Please do not override this value unless recommended from snowflake + public static final String SNOWPIPE_STREAMING_FILE_VERSION = "snowflake.streaming.file.version"; + // TESTING public static final String REBALANCING = "snowflake.test.rebalancing"; public static final boolean REBALANCING_DEFAULT = false; @@ -463,6 +467,18 @@ static ConfigDef newConfigDef() { 5, ConfigDef.Width.NONE, INGESTION_METHOD_OPT) + .define( + SNOWPIPE_STREAMING_FILE_VERSION, + Type.STRING, + "", // default is handled in Ingest SDK + null, // no validator + Importance.LOW, + "Acceptable values for Snowpipe Streaming BDEC Versions: 1 and 3. Check Ingest" + + " SDK for default behavior. Please do not set this unless Absolutely needed. ", + CONNECTOR_CONFIG, + 6, + ConfigDef.Width.NONE, + SNOWPIPE_STREAMING_FILE_VERSION) .define( ERRORS_TOLERANCE_CONFIG, Type.STRING, diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 3f0c73ea6..01684e418 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -365,6 +365,13 @@ static String validateConfig(Map config) { "Schematization is only available with {}.", IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); } + if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION)) { + configIsValid = false; + LOGGER.error( + "{} is only available with ingestion type: {}.", + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + } } if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 52fbbb2e8..8038f67a9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -1,8 +1,10 @@ package com.snowflake.kafka.connector.internal.streaming; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT; import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC; +import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION; import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; @@ -473,10 +475,23 @@ private void initStreamingClient() { streamingClientProps.putAll(streamingPropertiesMap); if (this.streamingIngestClient == null || this.streamingIngestClient.isClosed()) { try { + // Override only if bdec version is explicitly set in config, default to the version set + // inside + // Ingest SDK + Map parameterOverrides = new HashMap<>(); + Optional snowpipeStreamingBdecVersion = + Optional.ofNullable(this.connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION)); + snowpipeStreamingBdecVersion.ifPresent( + overriddenValue -> { + LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION); + parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue); + }); + LOGGER.info("Initializing Streaming Client. ClientName:{}", this.streamingIngestClientName); this.streamingIngestClient = SnowflakeStreamingIngestClientFactory.builder(this.streamingIngestClientName) .setProperties(streamingClientProps) + .setParameterOverrides(parameterOverrides) .build(); } catch (SFException ex) { LOGGER.error( diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java index 7236a20b7..a94ba81c9 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectorConfigTest.java @@ -593,4 +593,30 @@ public void testValidSchematizationForStreamingSnowpipe() { config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); Utils.validateConfig(config); } + + @Test(expected = SnowflakeKafkaConnectorException.class) + public void testInValidConfigFileTypeForSnowpipe() { + Map config = getConfig(); + config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3"); + Utils.validateConfig(config); + } + + @Test + public void testValidFileTypesForSnowpipeStreaming() { + Map config = getConfig(); + config.put( + SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT, + IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); + config.put(Utils.SF_ROLE, "ACCOUNTADMIN"); + + config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3"); + Utils.validateConfig(config); + + config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1"); + Utils.validateConfig(config); + + // lower case + config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "abcd"); + Utils.validateConfig(config); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index a218b51b4..ac4615a98 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; +import org.junit.Assert; import org.junit.Test; public class SnowflakeSinkServiceV2IT { @@ -1128,6 +1129,30 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce service.closeAll(); } + @Test + public void testStreamingIngestion_invalid_file_version() throws Exception { + Map config = TestUtils.getConfForStreaming(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + Map overriddenConfig = new HashMap<>(config); + overriddenConfig.put( + SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "TWOO_HUNDRED"); + + conn.createTable(table); + + try { + // This will fail in creation of client + SnowflakeSinkServiceFactory.builder( + conn, IngestionMethodConfig.SNOWPIPE_STREAMING, overriddenConfig) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) + .addTask(table, new TopicPartition(topic, partition)) // Internally calls startTask + .build(); + } catch (IllegalArgumentException ex) { + Assert.assertEquals(NumberFormatException.class, ex.getCause().getClass()); + } + } + private void createNonNullableColumn(String tableName, String colName) { String createTableQuery = "alter table identifier(?) add " + colName + " int not null"; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index d0994c5a7..e4fb6bc11 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -9,6 +9,7 @@ import com.snowflake.kafka.connector.internal.TestUtils; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import net.snowflake.ingest.streaming.OpenChannelRequest; @@ -408,4 +409,35 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws assert TestUtils.getOffsetTokenForChannelAndTable(testTableName, testChannelName) == (recordsInPartition1 + anotherSetOfRecords - 1); } + + @Test + public void testSimpleInsertRowsWithArrowBDECFormat() throws Exception { + // add config which overrides the bdec file format + Map overriddenConfig = new HashMap<>(TestUtils.getConfForStreaming()); + overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1"); + + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + + // This will automatically create a channel for topicPartition. + SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder( + conn, IngestionMethodConfig.SNOWPIPE_STREAMING, overriddenConfig) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(testTableName, topicPartition) + .build(); + + final long noOfRecords = 1; + + // send regular data + List records = + TestUtils.createJsonStringSinkRecords(0, noOfRecords, topic, PARTITION); + + service.insert(records); + + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords, 20, 5); + } }