File prefix salted with topic for all topic2table map name results.
sfc-gh-dseweryn committed Dec 19, 2024
1 parent 3ec125f commit 82ab2f1
Showing 6 changed files with 85 additions and 64 deletions.
54 changes: 49 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -430,6 +430,28 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
return !isSnowpipeIngestion(config);
}

/**
 * Holder for a generated name. isNameFromMap set to true indicates that the name was resolved
 * from the map passed to the relevant function, e.g. {@link Utils#generateTableName(String, Map)}.
 */
public static class GeneratedName {
public final String name;
public final boolean isNameFromMap;

private GeneratedName(String name, boolean isNameFromMap) {
this.name = name;
this.isNameFromMap = isNameFromMap;
}

private static GeneratedName fromMap(String name) {
return new GeneratedName(name, true);
}

private static GeneratedName generated(String name) {
return new GeneratedName(name, false);
}
}

/**
* modify invalid application name in config and return the generated application name
*
@@ -438,7 +460,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
public static void convertAppName(Map<String, String> config) {
String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
// If appName is empty the following call will throw error
String validAppName = generateValidName(appName, new HashMap<String, String>());
String validAppName = generateValidName(appName, new HashMap<>());

config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
}
@@ -454,6 +476,17 @@ public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}

/**
 * Verify the topic name and generate a valid table name. The returned GeneratedName carries an
 * isNameFromMap flag indicating whether the name was retrieved from the passed topic2table map,
 * which SnowflakeSinkServiceV1 uses to decide how the stage file prefix is built.
 *
 * @param topic input topic name
 * @param topic2table topic to table map
 * @return GeneratedName with a valid table name and a flag indicating whether the name came from
 *     the topic2table map
 */
public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table);
}
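
For illustration, a minimal sketch of how a caller might consume the flag (the topics and the map below are hypothetical, not part of this commit):

    Map<String, String> topic2table = Utils.parseTopicToTableMap("orders:ORDERS_TABLE");

    Utils.GeneratedName fromMap = Utils.generateTableName("orders", topic2table);
    // fromMap.name == "ORDERS_TABLE", fromMap.isNameFromMap == true

    Utils.GeneratedName generated = Utils.generateTableName("clicks", topic2table);
    // "clicks" is already a valid identifier and has no mapping,
    // so generated.name == "clicks" and generated.isNameFromMap == false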

/**
* verify topic name, and generate valid table/application name
*
@@ -462,23 +495,34 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @return valid table/application name
*/
public static String generateValidName(String topic, Map<String, String> topic2table) {
return generateValidNameFromMap(topic, topic2table).name;
}

/**
* verify topic name, and generate valid table/application name
*
* @param topic input topic name
* @param topic2table topic to table map
* @return valid generated table/application name
*/
private static GeneratedName generateValidNameFromMap(String topic, Map<String, String> topic2table) {
final String PLACE_HOLDER = "_";
if (topic == null || topic.isEmpty()) {
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if (topic2table.containsKey(topic)) {
return topic2table.get(topic);
return GeneratedName.fromMap(topic2table.get(topic));
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
return topic2table.get(regexTopic);
return GeneratedName.fromMap(topic2table.get(regexTopic));
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
return topic;
return GeneratedName.generated(topic);
}
int hash = Math.abs(topic.hashCode());

@@ -507,7 +551,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
result.append(PLACE_HOLDER);
result.append(hash);

return result.toString();
return GeneratedName.generated(result.toString());
}
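
When the topic is not a valid Snowflake identifier, the tail of the method (partly elided above) keeps the legal characters, substitutes the placeholder for the rest, and appends the absolute value of the topic's hash code. A hypothetical walk-through, matching the expectations in UtilsTest below:

    // '*' is not legal in an unquoted identifier, so it is replaced by '_'
    // and the absolute hash of the original topic is appended as a suffix
    String name = Utils.generateValidName("bc*def", new HashMap<>());
    // name == "bc_def_" + Math.abs("bc*def".hashCode())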

public static Map<String, String> parseTopicToTableMap(String input) {
src/main/java/com/snowflake/kafka/connector/config/TopicToTableModeExtractor.java
@@ -6,9 +6,6 @@

public class TopicToTableModeExtractor {

private static Pattern topicRegexPattern =
Pattern.compile("\\[([0-9]-[0-9]|[a-z]-[a-z]|[A-Z]-[A-Z]|[!-@])\\][*+]?");

/** Defines whether single target table is fed by one or many source topics. */
public enum Topic2TableMode {
// Single topic = single table
@@ -30,15 +27,6 @@ private TopicToTableModeExtractor() {}
*/
public static Topic2TableMode determineTopic2TableMode(
Map<String, String> topic2TableMap, String topic) {

boolean anyTopicInMapIsRegex =
topic2TableMap.keySet().stream()
.anyMatch(topic2TableMapKey -> topicRegexPattern.matcher(topic2TableMapKey).find());

if (anyTopicInMapIsRegex) {
return Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
}

String tableName = Utils.tableName(topic, topic2TableMap);
return topic2TableMap.values().stream()
.filter(table -> table.equalsIgnoreCase(tableName))
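
The rest of the stream is cut off in this view, but after this change the mode is determined purely by counting how many map entries target the same table; regex keys no longer force the many-topics mode. A hedged example with hypothetical topics, mirroring the test data removed further down:

    // "src1" and "src3" both map to "target1", so resolving "src1" should yield
    // MANY_TOPICS_SINGLE_TABLE, while "src2" (alone on "target2") should yield
    // SINGLE_TOPIC_SINGLE_TABLE
    Map<String, String> map = Utils.parseTopicToTableMap("src1:target1,src2:target2,src3:target1");
    TopicToTableModeExtractor.Topic2TableMode mode =
        TopicToTableModeExtractor.determineTopic2TableMode(map, "src1");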
src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
@@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
// 1. lets calculate stable hash code out of it,
// 1. let's calculate stable hash code out of it,
// 2. bit shift it by 16 bits left,
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which should in production use cases never reach a value above 5.000
// partitions pers topic).
// partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
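
A worked illustration of the four steps above; the stable-hash implementation sits outside this hunk, so a CRC32 over the upper-cased topic bytes is assumed here purely as a stand-in (the real code also uses BigInteger rather than long):

    java.util.zip.CRC32 crc = new java.util.zip.CRC32();                     // 1. stable hash
    crc.update("MYTOPIC".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    long partitionPart = (crc.getValue() << 16)                              // 2. shift 16 bits left
        + 0x8000L                                                            // 3. light up the 15th bit
        + 7L;                                                                // 4. partition id, < 32768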
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -1,7 +1,6 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
@@ -126,11 +125,13 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
* Create new ingestion task from existing table and stage, tries to reuse existing pipe and
* recover previous task, otherwise, create a new pipe.
*
* @param tableName destination table name in Snowflake
* @param ignoredTableName destination table name in Snowflake; ignored and recalculated from the topic so that staged files can be cleaned up correctly
* @param topicPartition TopicPartition passed from Kafka
*/
@Override
public void startPartition(final String tableName, final TopicPartition topicPartition) {
public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) {
Utils.GeneratedName generatedTableName = Utils.generateTableName(topicPartition.topic(), topic2TableMap);
final String tableName = generatedTableName.name;
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
@@ -142,7 +143,7 @@ public void startPartition(final String tableName, final TopicPartition topicPar
pipes.put(
nameIndex,
new ServiceContext(
tableName,
generatedTableName,
stageName,
pipeName,
topicPartition.topic(),
@@ -486,30 +487,27 @@ private class ServiceContext {
private boolean forceCleanerFileReset = false;

private ServiceContext(
String tableName,
Utils.GeneratedName generatedTableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
this.tableName = tableName;
this.tableName = generatedTableName.name;
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
// SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
// unique per table - otherwise, file cleaners for different channels may run into race
// unique per topic - otherwise, file cleaners for different channels may run into race
// condition
TopicToTableModeExtractor.Topic2TableMode mode =
determineTopic2TableMode(topic2TableMap, topicName);
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& !enableStageFilePrefixExtension) {
if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} is used as ingestion target by multiple topics - including this one"
"The table {} may be used as ingestion target by multiple topics - including this one"
+ " '{}'.\n"
+ "To prevent potential data loss consider setting"
+ " '"
@@ -518,10 +516,8 @@ private ServiceContext(
topicName,
tableName);
}
if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
&& enableStageFilePrefixExtension) {
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) {
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
}
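
The observable effect, sketched with hypothetical names: when two topics are mapped onto one table, both lookups now set isNameFromMap, so each ServiceContext salts its stage file prefix with its own topic and the per-pipe cleaners stop competing over the same staged files:

    // e.g. topic2table = "src1:target1,src3:target1"
    String prefix1 = FileNameUtils.filePrefix(conn.getConnectorName(), "target1", "src1", 0);
    String prefix3 = FileNameUtils.filePrefix(conn.getConnectorName(), "target1", "src3", 0);
    // prefix1 != prefix3 -> each cleaner only ever sees its own topic's files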
@@ -2,15 +2,12 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*;
import static com.snowflake.kafka.connector.Utils.HTTP_NON_PROXY_HOSTS;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.Topic2TableMode.SINGLE_TOPIC_SINGLE_TABLE;
import static com.snowflake.kafka.connector.internal.TestUtils.getConfig;
import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.snowflake.kafka.connector.config.IcebergConfigValidator;
import com.snowflake.kafka.connector.config.SnowflakeSinkConnectorConfigBuilder;
import com.snowflake.kafka.connector.config.TopicToTableModeExtractor;
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.internal.streaming.DefaultStreamingConfigValidator;
@@ -223,32 +220,6 @@ public void testNameMapCovered() {
connectorConfigValidator.validateConfig(config);
}

@ParameterizedTest
@MethodSource("topicToTableTestData")
public void testTopic2TableCorrectlyDeterminesMode(
String topicToTable, String topic, TopicToTableModeExtractor.Topic2TableMode expected) {
// given
Map<String, String> topic2Table = Utils.parseTopicToTableMap(topicToTable);

// when
TopicToTableModeExtractor.Topic2TableMode actual =
TopicToTableModeExtractor.determineTopic2TableMode(topic2Table, topic);

// then
assertThat(actual).isEqualTo(expected);
}

public static Stream<Arguments> topicToTableTestData() {
return Stream.of(
Arguments.of("src1:target1,src2:target2,src3:target1", "src1", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("src1:target1,src2:target2,src3:target1", "src2", SINGLE_TOPIC_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("to[0-9]pic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("[0-9]topic:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[a-z]:tableA", "tableA", MANY_TOPICS_SINGLE_TABLE),
Arguments.of("topic[0-9]:tableA", "randomTopicName", MANY_TOPICS_SINGLE_TABLE));
}

@Test
public void testBufferSizeRange() {
Map<String, String> config = getConfig();
22 changes: 22 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -112,6 +112,28 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void testGenerateTableName() {
Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");

Utils.GeneratedName generatedTableName1 = Utils.generateTableName("ab@cd", topic2table);
Assert.assertEquals("ab@cd", generatedTableName1.name);
Assert.assertTrue(generatedTableName1.isNameFromMap);

TestUtils.assertError(SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
TestUtils.assertError(SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));

String topic = "bc*def";
Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic, topic2table);
Assert.assertEquals("bc_def_" + Math.abs(topic.hashCode()), generatedTableName2.name);
Assert.assertFalse(generatedTableName2.isNameFromMap);

topic = "12345";
Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic, topic2table);
Assert.assertEquals("_12345_" + Math.abs(topic.hashCode()), generatedTableName3.name);
Assert.assertFalse(generatedTableName3.isNameFromMap);
}

@Test
public void testTableNameRegex() {
String catTable = "cat_table";