
SNOW-740327 Create config for using Arrow BDEC file format - Redo after revert of earlier commits (#556)
sfc-gh-japatel authored Mar 1, 2023
1 parent d9b0beb commit b0625f4
Showing 6 changed files with 121 additions and 0 deletions.
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -111,6 +111,10 @@ public class SnowflakeSinkConnectorConfig {
public static final String INGESTION_METHOD_DEFAULT_SNOWPIPE =
IngestionMethodConfig.SNOWPIPE.toString();

// The Snowpipe Streaming BDEC file version, which can be set in the connector config
// NOTE: Please do not override this value unless recommended by Snowflake
public static final String SNOWPIPE_STREAMING_FILE_VERSION = "snowflake.streaming.file.version";

// TESTING
public static final String REBALANCING = "snowflake.test.rebalancing";
public static final boolean REBALANCING_DEFAULT = false;
@@ -463,6 +467,18 @@ static ConfigDef newConfigDef() {
5,
ConfigDef.Width.NONE,
INGESTION_METHOD_OPT)
.define(
SNOWPIPE_STREAMING_FILE_VERSION,
Type.STRING,
"", // default is handled in Ingest SDK
null, // no validator
Importance.LOW,
"Acceptable values for Snowpipe Streaming BDEC Versions: 1 and 3. Check Ingest"
+ " SDK for default behavior. Please do not set this unless Absolutely needed. ",
CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
SNOWPIPE_STREAMING_FILE_VERSION)
.define(
ERRORS_TOLERANCE_CONFIG,
Type.STRING,
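For illustration only (not part of the commit): a minimal sketch of how the new property could be set alongside the streaming ingestion method in a connector config map, mirroring the tests added below. The class and method names in the sketch are hypothetical, and the import path for IngestionMethodConfig is assumed.

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import java.util.HashMap;
import java.util.Map;

public class BdecVersionConfigSketch {
  // Builds a (partial) connector config that opts into a specific BDEC file version.
  public static Map<String, String> bdecOverrideConfig() {
    Map<String, String> config = new HashMap<>();
    // The BDEC version override is only accepted together with Snowpipe Streaming ingestion.
    config.put(
        SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
        IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
    // Acceptable values per the config description above: "1" and "3".
    // Leave the property unset to fall back to the Ingest SDK default.
    config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
    return config;
  }
}

The remaining required connection settings (URL, user, key, role, and so on) are omitted here; with those present, Utils.validateConfig accepts this combination, as the new unit tests below show.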
7 changes: 7 additions & 0 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -365,6 +365,13 @@ static String validateConfig(Map<String, String> config) {
"Schematization is only available with {}.",
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
}
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION)) {
configIsValid = false;
LOGGER.error(
"{} is only available with ingestion type: {}.",
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
}
}

if (config.containsKey(SnowflakeSinkConnectorConfig.TOPICS_TABLES_MAP)
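For context, a rough sketch (assumed, not shown in this diff) of the branch that encloses this check in validateConfig: the error message and the tests added below imply it only fires when the ingestion method resolves to classic SNOWPIPE.

// Assumed enclosing structure inside validateConfig; variable names are illustrative.
if (ingestionMethod == IngestionMethodConfig.SNOWPIPE) {
  // ... existing streaming-only options, such as schematization, are rejected here ...
  if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION)) {
    // Marking the config invalid ultimately surfaces as a SnowflakeKafkaConnectorException,
    // which is what testInValidConfigFileTypeForSnowpipe expects.
    configIsValid = false;
  }
}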
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -1,8 +1,10 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_COUNT_RECORDS_DEFAULT;
import static com.snowflake.kafka.connector.internal.streaming.StreamingUtils.STREAMING_BUFFER_FLUSH_TIME_DEFAULT_SEC;
import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
@@ -473,10 +475,23 @@ private void initStreamingClient() {
streamingClientProps.putAll(streamingPropertiesMap);
if (this.streamingIngestClient == null || this.streamingIngestClient.isClosed()) {
try {
// Override only if the BDEC version is explicitly set in config; otherwise default to the
// version set inside the Ingest SDK.
Map<String, Object> parameterOverrides = new HashMap<>();
Optional<String> snowpipeStreamingBdecVersion =
Optional.ofNullable(this.connectorConfig.get(SNOWPIPE_STREAMING_FILE_VERSION));
snowpipeStreamingBdecVersion.ifPresent(
overriddenValue -> {
LOGGER.info("Config is overridden for {} ", SNOWPIPE_STREAMING_FILE_VERSION);
parameterOverrides.put(BLOB_FORMAT_VERSION, overriddenValue);
});

LOGGER.info("Initializing Streaming Client. ClientName:{}", this.streamingIngestClientName);
this.streamingIngestClient =
SnowflakeStreamingIngestClientFactory.builder(this.streamingIngestClientName)
.setProperties(streamingClientProps)
.setParameterOverrides(parameterOverrides)
.build();
} catch (SFException ex) {
LOGGER.error(
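As a hedged illustration of the failure mode (inferred from the integration test added further below, not from documented SDK behavior): a non-numeric version string is not rejected by config validation and instead fails when the streaming client is built.

// Sketch: the override value is passed to the Ingest SDK as-is, so an invalid value
// such as "TWOO_HUNDRED" only surfaces at client construction time.
Map<String, Object> parameterOverrides = new HashMap<>();
parameterOverrides.put(BLOB_FORMAT_VERSION, "TWOO_HUNDRED");
try {
  SnowflakeStreamingIngestClientFactory.builder("example_client_name")
      .setProperties(streamingClientProps) // connection properties, as assembled above
      .setParameterOverrides(parameterOverrides)
      .build();
} catch (IllegalArgumentException ex) {
  // The integration test below expects the cause to be a NumberFormatException.
}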
@@ -593,4 +593,30 @@ public void testValidSchematizationForStreamingSnowpipe() {
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");
Utils.validateConfig(config);
}

@Test(expected = SnowflakeKafkaConnectorException.class)
public void testInValidConfigFileTypeForSnowpipe() {
Map<String, String> config = getConfig();
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
Utils.validateConfig(config);
}

@Test
public void testValidFileTypesForSnowpipeStreaming() {
Map<String, String> config = getConfig();
config.put(
SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT,
IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "ACCOUNTADMIN");

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "3");
Utils.validateConfig(config);

config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");
Utils.validateConfig(config);

// arbitrary lower-case value: the connector itself does not validate the version string
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "abcd");
Utils.validateConfig(config);
}
}
@@ -35,6 +35,7 @@
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class SnowflakeSinkServiceV2IT {
@@ -1128,6 +1129,30 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exception {
service.closeAll();
}

@Test
public void testStreamingIngestion_invalid_file_version() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
Map<String, String> overriddenConfig = new HashMap<>(config);
overriddenConfig.put(
SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "TWOO_HUNDRED");

conn.createTable(table);

try {
// This will fail in creation of client
SnowflakeSinkServiceFactory.builder(
conn, IngestionMethodConfig.SNOWPIPE_STREAMING, overriddenConfig)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(table, new TopicPartition(topic, partition)) // Internally calls startTask
.build();
} catch (IllegalArgumentException ex) {
Assert.assertEquals(NumberFormatException.class, ex.getCause().getClass());
}
}

private void createNonNullableColumn(String tableName, String colName) {
String createTableQuery = "alter table identifier(?) add " + colName + " int not null";

@@ -9,6 +9,7 @@
import com.snowflake.kafka.connector.internal.TestUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -408,4 +409,35 @@ public void testAutoChannelReopen_SinglePartitionsInsertRowsSFException() throws Exception {
assert TestUtils.getOffsetTokenForChannelAndTable(testTableName, testChannelName)
== (recordsInPartition1 + anotherSetOfRecords - 1);
}

@Test
public void testSimpleInsertRowsWithArrowBDECFormat() throws Exception {
// Add a config which overrides the BDEC file format version
Map<String, String> overriddenConfig = new HashMap<>(TestUtils.getConfForStreaming());
overriddenConfig.put(SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_FILE_VERSION, "1");

InMemorySinkTaskContext inMemorySinkTaskContext =
new InMemorySinkTaskContext(Collections.singleton(topicPartition));

// This will automatically create a channel for topicPartition.
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(
conn, IngestionMethodConfig.SNOWPIPE_STREAMING, overriddenConfig)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(inMemorySinkTaskContext)
.addTask(testTableName, topicPartition)
.build();

final long noOfRecords = 1;

// send regular data
List<SinkRecord> records =
TestUtils.createJsonStringSinkRecords(0, noOfRecords, topic, PARTITION);

service.insert(records);

TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == noOfRecords, 20, 5);
}
}
