Commit
SNOW-1859651 Salt prefix for files mapped from different topics
sfc-gh-dseweryn committed Dec 20, 2024
1 parent bf4ff0a commit 1dcffc7
Showing 4 changed files with 110 additions and 29 deletions.
59 changes: 54 additions & 5 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -430,6 +430,29 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
return !isSnowpipeIngestion(config);
}

+ /**
+ * Holder for a generated name. An isNameFromMap value of true indicates that the name was
+ * resolved from the map passed to the corresponding function, {@link
+ * Utils#generateTableName(String, Map)}.
+ */
+ public static class GeneratedName {
+ public final String name;
+ public final boolean isNameFromMap;
+
+ private GeneratedName(String name, boolean isNameFromMap) {
+ this.name = name;
+ this.isNameFromMap = isNameFromMap;
+ }
+
+ private static GeneratedName fromMap(String name) {
+ return new GeneratedName(name, true);
+ }
+
+ private static GeneratedName generated(String name) {
+ return new GeneratedName(name, false);
+ }
+ }

/**
* modify an invalid application name in the config, replacing it with a generated valid name
*
@@ -438,7 +461,7 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
public static void convertAppName(Map<String, String> config) {
String appName = config.getOrDefault(SnowflakeSinkConnectorConfig.NAME, "");
// If appName is empty the following call will throw error
- String validAppName = generateValidName(appName, new HashMap<String, String>());
+ String validAppName = generateValidName(appName, new HashMap<>());

config.put(SnowflakeSinkConnectorConfig.NAME, validAppName);
}
@@ -454,6 +477,20 @@ public static String tableName(String topic, Map<String, String> topic2table) {
return generateValidName(topic, topic2table);
}

+ /**
+ * Verify topic name and generate a valid table name. The returned GeneratedName carries an
+ * isNameFromMap flag indicating whether the name was resolved from the passed topic2table map,
+ * which SnowflakeSinkServiceV1 uses to decide whether the stage file prefix must be salted.
+ *
+ * @param topic input topic name
+ * @param topic2table topic to table map
+ * @return GeneratedName with a valid table name and a flag telling whether the name was taken
+ * from the topic2table map
+ */
+ public static GeneratedName generateTableName(String topic, Map<String, String> topic2table) {
+ return generateValidNameFromMap(topic, topic2table);
+ }

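For illustration (not part of this commit), a minimal sketch of how a caller consumes the new API; the map literal mirrors the one used in UtilsTest below:

Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");
Utils.GeneratedName generated = Utils.generateTableName("ab@cd", topic2table);
// generated.name is "abcd" and generated.isNameFromMap is true, signalling that other
// topics may be mapped to the same table, so stage file prefixes must be salted per topic.
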
/**
* verify topic name, and generate valid table/application name
*
@@ -462,23 +499,35 @@ public static String tableName(String topic, Map<String, String> topic2table) {
* @return valid table/application name
*/
public static String generateValidName(String topic, Map<String, String> topic2table) {
+ return generateValidNameFromMap(topic, topic2table).name;
+ }
+
+ /**
+ * verify topic name, and generate valid table/application name
+ *
+ * @param topic input topic name
+ * @param topic2table topic to table map
+ * @return valid generated table/application name
+ */
+ private static GeneratedName generateValidNameFromMap(
+ String topic, Map<String, String> topic2table) {
final String PLACE_HOLDER = "_";
if (topic == null || topic.isEmpty()) {
throw SnowflakeErrors.ERROR_0020.getException("topic name: " + topic);
}
if (topic2table.containsKey(topic)) {
- return topic2table.get(topic);
+ return GeneratedName.fromMap(topic2table.get(topic));
}

// try matching regex tables
for (String regexTopic : topic2table.keySet()) {
if (topic.matches(regexTopic)) {
- return topic2table.get(regexTopic);
+ return GeneratedName.fromMap(topic2table.get(regexTopic));
}
}

if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
- return topic;
+ return GeneratedName.generated(topic);
}
int hash = Math.abs(topic.hashCode());

@@ -507,7 +556,7 @@ public static String generateValidName(String topic, Map<String, String> topic2table) {
result.append(PLACE_HOLDER);
result.append(hash);

- return result.toString();
+ return GeneratedName.generated(result.toString());
}
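
The sanitizing middle of generateValidNameFromMap is collapsed above. As a hedged sketch (the exact character filtering is an assumption; the placeholder and hash suffix are visible in the hunk and pinned down by testGenerateTableName below), the collapsed part behaves roughly like:

StringBuilder result = new StringBuilder();
if (Character.isDigit(topic.charAt(0))) {
result.append(PLACE_HOLDER); // identifiers must not start with a digit
}
for (char c : topic.toCharArray()) {
// keep legal identifier characters, replace everything else with the placeholder
result.append(Character.isLetterOrDigit(c) || c == '_' ? c : '_');
}
result.append(PLACE_HOLDER);
result.append(hash); // hash = Math.abs(topic.hashCode()), computed above
// e.g. "bc*def" becomes "bc_def_" + Math.abs("bc*def".hashCode())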

public static Map<String, String> parseTopicToTableMap(String input) {
src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
@@ -66,11 +66,11 @@ private static BigInteger calculatePartitionPart(String topic, int partition) {
BigInteger partitionPart = BigInteger.valueOf(partition);
if (!Strings.isNullOrEmpty(topic)) {
// if topic is provided as part of the file prefix,
- // 1. lets calculate stable hash code out of it,
+ // 1. let's calculate stable hash code out of it,
// 2. bit shift it by 16 bits left,
// 3. add 0x8000 (light up 15th bit as a marker)
// 4. add partition id (which should in production use cases never reach a value above 5.000
- // partitions pers topic).
+ // partitions per topic).
// In theory - we would support 32767 partitions, which is more than any reasonable value for
// a single topic
byte[] bytes = topic.toUpperCase().getBytes(StandardCharsets.UTF_8);
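For illustration (not part of this commit), the arithmetic the comment above describes; stableTopicHash stands in for the hash that the collapsed code derives from bytes, so treat that variable as an assumption:

BigInteger partitionPart =
BigInteger.valueOf(stableTopicHash) // 1. stable hash computed from the topic bytes
.shiftLeft(16) // 2. shift it 16 bits left
.add(BigInteger.valueOf(0x8000)) // 3. light up bit 15 as the topic marker
.add(BigInteger.valueOf(partition)); // 4. add the partition id (below 32768 in practice)
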
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -1,7 +1,6 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
- import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
@@ -126,11 +125,19 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
* Creates a new ingestion task from an existing table and stage; tries to reuse an existing
* pipe and recover the previous task, otherwise creates a new pipe.
*
- * @param tableName destination table name in Snowflake
+ * @param ignoredTableName destination table name in Snowflake; it is ignored and recalculated
+ * from the topic to accommodate proper cleaning of staged files
* @param topicPartition TopicPartition passed from Kafka
*/
@Override
- public void startPartition(final String tableName, final TopicPartition topicPartition) {
+ public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) {
+ Utils.GeneratedName generatedTableName =
+ Utils.generateTableName(topicPartition.topic(), topic2TableMap);
+ final String tableName = generatedTableName.name;
+ if (!tableName.equals(ignoredTableName)) {
+ LOGGER.warn(
+ "tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName);
+ }
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
@@ -142,7 +149,7 @@ public void startPartition(final String tableName, final TopicPartition topicPartition) {
pipes.put(
nameIndex,
new ServiceContext(
- tableName,
+ generatedTableName,
stageName,
pipeName,
topicPartition.topic(),
@@ -486,40 +493,34 @@ private class ServiceContext {
private boolean forceCleanerFileReset = false;

private ServiceContext(
- String tableName,
+ Utils.GeneratedName generatedTableName,
String stageName,
String pipeName,
String topicName,
SnowflakeConnectionService conn,
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
- this.tableName = tableName;
+ this.tableName = generatedTableName.name;
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
this.cleanerFileNames = new LinkedList<>();
this.buffer = new SnowpipeBuffer();
this.ingestionService = conn.buildIngestService(stageName, pipeName);
- // SNOW-1642799 = if multiple topics load data into single table, we need to ensure prefix is
- // unique per table - otherwise, file cleaners for different channels may run into race
- // condition
- TopicToTableModeExtractor.Topic2TableMode mode =
- determineTopic2TableMode(topic2TableMap, topicName);
- if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
- && !enableStageFilePrefixExtension) {
+ // SNOW-1642799 = if multiple topics load data into single table, we need to ensure the file
+ // prefix is unique per topic - otherwise, file cleaners for different topics will try to
+ // clean the same prefixed files, creating a race condition and a potential to delete
+ // not-yet-ingested files created by another topic
+ if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) {
LOGGER.warn(
- "The table {} is used as ingestion target by multiple topics - including this one"
- + " '{}'.\n"
- + "To prevent potential data loss consider setting"
- + " '"
- + SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED
- + "' to true",
+ "The table {} may be used as ingestion target by multiple topics - including this one"
+ + " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
tableName,
topicName,
- tableName);
+ SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
}
- if (mode == TopicToTableModeExtractor.Topic2TableMode.MANY_TOPICS_SINGLE_TABLE
- && enableStageFilePrefixExtension) {
+ if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) {
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
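To summarize the change in this file (an editor's sketch; the collapsed else branch is assumed to omit the topic from the prefix):

// isNameFromMap == true and the fix flag enabled:
// prefix = FileNameUtils.filePrefix(connectorName, tableName, topicName, partition)
// -> the topic salts the prefix, so cleaners for different topics stay out of each other's files
// otherwise (assumed from the collapsed branch):
// prefix = FileNameUtils.filePrefix(connectorName, tableName, "", partition)
// -> topics sharing a table share a prefix, and their cleaners can race on staged files
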
31 changes: 31 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -112,6 +112,37 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

+ @Test
+ public void testGenerateTableName() {
+ Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, 1234:_1234");
+
+ String topic0 = "ab@cd";
+ Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table);
+ Assert.assertEquals("abcd", generatedTableName1.name);
+ Assert.assertTrue(generatedTableName1.isNameFromMap);
+
+ String topic1 = "1234";
+ Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table);
+ Assert.assertEquals("_1234", generatedTableName2.name);
+ Assert.assertTrue(generatedTableName2.isNameFromMap);
+
+ String topic2 = "bc*def";
+ Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table);
+ Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name);
+ Assert.assertFalse(generatedTableName3.isNameFromMap);
+
+ String topic3 = "12345";
+ Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table);
+ Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name);
+ Assert.assertFalse(generatedTableName4.isNameFromMap);
+
+ TestUtils.assertError(
+ SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
+ //noinspection DataFlowIssue
+ TestUtils.assertError(
+ SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName(null, topic2table));
+ }
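
For completeness, a hypothetical connector configuration exercising the mapped branch; snowflake.topic2table.map is the connector's standard property for this map, while the literal key of the fix flag should be read from SnowflakeSinkConnectorConfig rather than hard-coded:

Map<String, String> config = new HashMap<>();
config.put("snowflake.topic2table.map", "ab@cd:abcd, 1234:_1234");
// enable the per-topic file prefix salt referenced by the warning in SnowflakeSinkServiceV1
config.put(SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED, "true");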

@Test
public void testTableNameRegex() {
String catTable = "cat_table";
Expand Down
