From 9e42f22267559b13fe73d3844749235d8eaf5b02 Mon Sep 17 00:00:00 2001 From: Dariusz Seweryn Date: Fri, 20 Dec 2024 14:14:44 +0100 Subject: [PATCH] Fix tests and address comment --- .../com/snowflake/kafka/connector/Utils.java | 14 +++++++--- .../internal/SnowflakeSinkServiceV1.java | 27 ++++++++++--------- .../snowflake/kafka/connector/UtilsTest.java | 16 +++++------ 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 2abeefaf7..2a472bafb 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -436,8 +436,8 @@ public static boolean isSnowpipeStreamingIngestion(Map config) { * Utils#generateTableName(String, Map)} */ public static class GeneratedName { - public final String name; - public final boolean isNameFromMap; + private final String name; + private final boolean isNameFromMap; private GeneratedName(String name, boolean isNameFromMap) { this.name = name; @@ -448,9 +448,17 @@ private static GeneratedName fromMap(String name) { return new GeneratedName(name, true); } - private static GeneratedName generated(String name) { + public static GeneratedName generated(String name) { return new GeneratedName(name, false); } + + public String getName() { + return name; + } + + public boolean isNameFromMap() { + return isNameFromMap; + } } /** diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 26471d504..24869a88d 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -125,18 +125,21 @@ class SnowflakeSinkServiceV1 implements SnowflakeSinkService { * Create new ingestion task from existing table and stage, tries to reuse existing pipe and * recover previous task, otherwise, create a new pipe. * - * @param ignoredTableName destination table name in Snowflake. Is ignored and recalculated to - * accommodate proper cleaning of staged files. + * @param tableName destination table name in Snowflake * @param topicPartition TopicPartition passed from Kafka */ @Override - public void startPartition(final String ignoredTableName, final TopicPartition topicPartition) { + public void startPartition(final String tableName, final TopicPartition topicPartition) { Utils.GeneratedName generatedTableName = Utils.generateTableName(topicPartition.topic(), topic2TableMap); - final String tableName = generatedTableName.name; - if (!tableName.equals(ignoredTableName)) { + if (!tableName.equals(generatedTableName.getName())) { LOGGER.warn( - "tableNames do not match: original={}, recalculated={}", ignoredTableName, tableName); + "tableNames do not match, this is acceptable in tests but not in production! Resorting to" + + " originalName and assuming no potential clashes on file prefixes. original={}," + + " recalculated={}", + tableName, + generatedTableName.getName()); + generatedTableName = Utils.GeneratedName.generated(tableName); } String stageName = Utils.stageName(conn.getConnectorName(), tableName); String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition()); @@ -501,7 +504,7 @@ private ServiceContext( int partition, ScheduledExecutorService v2CleanerExecutor) { this.pipeName = pipeName; - this.tableName = generatedTableName.name; + this.tableName = generatedTableName.getName(); this.stageName = stageName; this.conn = conn; this.fileNames = new LinkedList<>(); @@ -512,7 +515,7 @@ private ServiceContext( // prefix is unique per topic - otherwise, file cleaners for different topics will try to // clean the same prefixed files creating a race condition and a potential to delete // not yet ingested files created by another topic - if (generatedTableName.isNameFromMap && !enableStageFilePrefixExtension) { + if (generatedTableName.isNameFromMap() && !enableStageFilePrefixExtension) { LOGGER.warn( "The table {} may be used as ingestion target by multiple topics - including this one" + " '{}'.\nTo prevent potential data loss consider setting '{}' to true", @@ -520,11 +523,11 @@ private ServiceContext( topicName, SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED); } - if (generatedTableName.isNameFromMap && enableStageFilePrefixExtension) { + { + final String topicForPrefix = + generatedTableName.isNameFromMap() && enableStageFilePrefixExtension ? topicName : ""; this.prefix = - FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicName, partition); - } else { - this.prefix = FileNameUtils.filePrefix(conn.getConnectorName(), tableName, "", partition); + FileNameUtils.filePrefix(conn.getConnectorName(), tableName, topicForPrefix, partition); } this.processedOffset = new AtomicLong(-1); this.flushedOffset = new AtomicLong(-1); diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 7e26323e7..41e434154 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -118,23 +118,23 @@ public void testGenerateTableName() { String topic0 = "ab@cd"; Utils.GeneratedName generatedTableName1 = Utils.generateTableName(topic0, topic2table); - Assert.assertEquals("abcd", generatedTableName1.name); - Assert.assertTrue(generatedTableName1.isNameFromMap); + Assert.assertEquals("abcd", generatedTableName1.getName()); + Assert.assertTrue(generatedTableName1.isNameFromMap()); String topic1 = "1234"; Utils.GeneratedName generatedTableName2 = Utils.generateTableName(topic1, topic2table); - Assert.assertEquals("_1234", generatedTableName2.name); - Assert.assertTrue(generatedTableName2.isNameFromMap); + Assert.assertEquals("_1234", generatedTableName2.getName()); + Assert.assertTrue(generatedTableName2.isNameFromMap()); String topic2 = "bc*def"; Utils.GeneratedName generatedTableName3 = Utils.generateTableName(topic2, topic2table); - Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.name); - Assert.assertFalse(generatedTableName3.isNameFromMap); + Assert.assertEquals("bc_def_" + Math.abs(topic2.hashCode()), generatedTableName3.getName()); + Assert.assertFalse(generatedTableName3.isNameFromMap()); String topic3 = "12345"; Utils.GeneratedName generatedTableName4 = Utils.generateTableName(topic3, topic2table); - Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.name); - Assert.assertFalse(generatedTableName4.isNameFromMap); + Assert.assertEquals("_12345_" + Math.abs(topic3.hashCode()), generatedTableName4.getName()); + Assert.assertFalse(generatedTableName4.isNameFromMap()); TestUtils.assertError( SnowflakeErrors.ERROR_0020, () -> Utils.generateTableName("", topic2table));