From b354f54f92201837d979ac100f7f13aad4fb5162 Mon Sep 17 00:00:00 2001 From: Michal Bobowski Date: Tue, 10 Dec 2024 13:44:01 +0100 Subject: [PATCH] NO-SNOW Code polishing --- .../connector/internal/FileNameUtils.java | 20 +++++++++---------- ...esult.java => OffsetContinuityRanges.java} | 16 +++++++-------- .../internal/SnowflakeSinkServiceV1.java | 6 +++--- .../connector/internal/FileNameUtilsTest.java | 6 +++--- 4 files changed, 24 insertions(+), 24 deletions(-) rename src/main/java/com/snowflake/kafka/connector/internal/{OffsetScanResult.java => OffsetContinuityRanges.java} (92%) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java index 08402b207..c3de6efe3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java @@ -198,16 +198,16 @@ private static String readFromFileName(String fileName, int index) { } /** - * construct a log string that contains offset-range of input files + * Find gaps in offset ranges. * - * @param files list of files - * @return string that must be logged + * @param filenames list of files + * @return continuous and missing offsets for given filenames */ - static OffsetScanResult prepareFilesOffsetsLogString(List files) { + static OffsetContinuityRanges searchForMissingOffsets(List filenames) { List> missingOffsets = new ArrayList<>(); - List> continousOffsets = - files.stream() + List> continuousOffsets = + filenames.stream() .map( file -> Pair.of( @@ -215,20 +215,20 @@ static OffsetScanResult prepareFilesOffsetsLogString(List files) { FileNameUtils.fileNameToEndOffset(file))) .collect(Collectors.toList()); - for (int i = 0; i < continousOffsets.size(); i++) { - Pair current = continousOffsets.get(i); + for (int i = 0; i < continuousOffsets.size(); i++) { + Pair current = continuousOffsets.get(i); // The first range is skipped if (i == 0) { continue; } - Pair previous = continousOffsets.get(i - 1); + Pair previous = continuousOffsets.get(i - 1); if (previous.getRight() + 1 != current.getLeft()) { missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1)); } } - return new OffsetScanResult(continousOffsets, missingOffsets); + return new OffsetContinuityRanges(continuousOffsets, missingOffsets); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java similarity index 92% rename from src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java rename to src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java index 7f4339aaf..8cdebf436 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java @@ -4,17 +4,11 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; -public class OffsetScanResult { +public class OffsetContinuityRanges { private final List> continuousOffsets; private final List> missingOffsets; - private static String parseList(List> list) { - return list.stream() - .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]") - .collect(Collectors.joining("", "[", "]")); - } - - public OffsetScanResult( + public OffsetContinuityRanges( List> continuousOffsets, List> missingOffsets) { this.continuousOffsets = continuousOffsets; this.missingOffsets = missingOffsets; @@ -27,4 +21,10 @@ public String getContinuousOffsets() { public String getMissingOffsets() { return parseList(missingOffsets); } + + private static String parseList(List> list) { + return list.stream() + .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]") + .collect(Collectors.joining("", "[", "]")); + } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index c8b9255a6..31bf278ff 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -2,7 +2,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED; import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode; -import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString; +import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES; import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN; @@ -1086,7 +1086,7 @@ private void filterResultFromSnowpipeScan( private void purge(List files) { if (!files.isEmpty()) { - OffsetScanResult offsets = prepareFilesOffsetsLogString(files); + OffsetContinuityRanges offsets = searchForMissingOffsets(files); LOGGER.info( "Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {}," + " missingOffsets: {}", @@ -1100,7 +1100,7 @@ private void purge(List files) { private void moveToTableStage(List failedFiles) { if (!failedFiles.isEmpty()) { - OffsetScanResult offsets = prepareFilesOffsetsLogString(failedFiles); + OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles); LOGGER.info( "Moving failed files for pipe: {} to tableStage failedFileCount: {}, continuousOffsets:" + " {}, missingOffsets: {}", diff --git a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java index e8fed3b77..3c5c6e2fc 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java @@ -1,7 +1,7 @@ package com.snowflake.kafka.connector.internal; import static com.snowflake.kafka.connector.internal.FileNameUtils.*; -import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString; +import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -167,10 +167,10 @@ public void testFileNameWontSupportMoreThan32767Partitions() { @ParameterizedTest @MethodSource("testData") - public void testPrepareFilesOffsetsLogString( + public void testSearchForMissingOffsets( List fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) { // when - OffsetScanResult result = prepareFilesOffsetsLogString(fileNames); + OffsetContinuityRanges result = searchForMissingOffsets(fileNames); // then assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);