Skip to content

Commit

Permalink
Fix tests and address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-dseweryn committed Dec 20, 2024
1 parent 1dcffc7 commit 9e42f22
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
14 changes: 11 additions & 3 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ public static boolean isSnowpipeStreamingIngestion(Map<String, String> config) {
* Utils#generateTableName(String, Map)}
*/
public static class GeneratedName {
public final String name;
public final boolean isNameFromMap;
private final String name;
private final boolean isNameFromMap;

private GeneratedName(String name, boolean isNameFromMap) {
this.name = name;
Expand All @@ -448,9 +448,17 @@ private static GeneratedName fromMap(String name) {
return new GeneratedName(name, true);
}

private static GeneratedName generated(String name) {
public static GeneratedName generated(String name) {
return new GeneratedName(name, false);
}

public String getName() {
return name;
}

public boolean isNameFromMap() {
return isNameFromMap;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,21 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService {
* Create new ingestion task from existing table and stage, tries to reuse existing pipe and
* recover previous task, otherwise, create a new pipe.
*
* @param ignoredTableName destination table name in Snowflake. Is ignored and recalculated to
* accommodate proper cleaning of staged files.
* @param tableName destination table name in Snowflake
* @param topicPartition TopicPartition passed from Kafka
*/
@Override
public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) {
public void startPartition(final String tableName, final TopicPartition topicPartition) {
Utils.GeneratedName generatedTableName =
Utils.generateTableName(topicPartition.topic(), topic2TableMap);
final String tableName = generatedTableName.name;
if (!tableName.equals(ignoredTableName)) {
if (!tableName.equals(generatedTableName.getName())) {
LOGGER.warn(
"tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName);
"tableNames do not match, this is acceptable in tests but not in production! Resorting to"
+ " originalName and assuming no potential clashes on file prefixes. original={},"
+ " recalculated={}",
tableName,
generatedTableName.getName());
generatedTableName = Utils.GeneratedName.generated(tableName);
}
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
Expand Down Expand Up @@ -501,7 +504,7 @@ private ServiceContext(
int partition,
ScheduledExecutorService v2CleanerExecutor) {
this.pipeName = pipeName;
this.tableName = generatedTableName.name;
this.tableName = generatedTableName.getName();
this.stageName = stageName;
this.conn = conn;
this.fileNames = new LinkedList<>();
Expand All @@ -512,19 +515,19 @@ private ServiceContext(
// prefix is unique per topic - otherwise, file cleaners for different topics will try to
// clean the same prefixed files creating a race condition and a potential to delete
// not yet ingested files created by another topic
if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) {
if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) {
LOGGER.warn(
"The table {} may be used as ingestion target by multiple topics - including this one"
+ " '{}'.\nTo prevent potential data loss consider setting '{}' to true",
tableName,
topicName,
SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED);
}
if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) {
{
final String topicForPrefix =
generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : "";
this.prefix =
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition);
} else {
this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition);
FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition);
}
this.processedOffset = new AtomicLong(-1);
this.flushedOffset = new AtomicLong(-1);
Expand Down
16 changes: 8 additions & 8 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,23 @@ public void testGenerateTableName() {

String topic0 = "ab@cd";
Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table);
Assert.assertEquals("abcd", generatedTableName1.name);
Assert.assertTrue(generatedTableName1.isNameFromMap);
Assert.assertEquals("abcd", generatedTableName1.getName());
Assert.assertTrue(generatedTableName1.isNameFromMap());

String topic1 = "1234";
Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table);
Assert.assertEquals("_1234", generatedTableName2.name);
Assert.assertTrue(generatedTableName2.isNameFromMap);
Assert.assertEquals("_1234", generatedTableName2.getName());
Assert.assertTrue(generatedTableName2.isNameFromMap());

String topic2 = "bc*def";
Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table);
Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name);
Assert.assertFalse(generatedTableName3.isNameFromMap);
Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.getName());
Assert.assertFalse(generatedTableName3.isNameFromMap());

String topic3 = "12345";
Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table);
Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name);
Assert.assertFalse(generatedTableName4.isNameFromMap);
Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.getName());
Assert.assertFalse(generatedTableName4.isNameFromMap());

TestUtils.assertError(
SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));
Expand Down

0 comments on commit 9e42f22

Please sign in to comment.