diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
index 938fe0dba..08402b207 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
@@ -3,11 +3,12 @@
 import com.google.common.base.Strings;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Comparator;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.common.utils.Crc32C;
 
 public class FileNameUtils {
@@ -202,35 +203,32 @@ private static String readFromFileName(String fileName, int index) {
    * @param files list of files
    * @return string that must be logged
    */
-  static String prepareFilesOffsetsLogString(
-      List<String> files
-  ) {
-    StringBuilder logString = new StringBuilder();
-    logString.append(", offset range: [");
-    long[][] offsetArray = new long[files.size()][2];
-    String file;
-    for (int i =0; i < files.size(); i++) {
-      file = files.get(i);
-      offsetArray[i][0] = FileNameUtils.fileNameToStartOffset(file);
-      offsetArray[i][1] = FileNameUtils.fileNameToEndOffset(file);
-    }
-    Arrays.sort(offsetArray, Comparator.comparing(a -> a[0]));
-    StringBuilder missingRangeString = new StringBuilder().append("[");
-    long nextExpectedStartOffset = offsetArray[0][0];
-    for (long[] offsetRange : offsetArray) {
-      logString.append("[").append(offsetRange[0]).append(",").append(offsetRange[1]).append("]");
-      if (offsetRange[0] != nextExpectedStartOffset) {
-        missingRangeString.append("[").append(nextExpectedStartOffset)
-            .append(",").append(offsetRange[0] -1).append("]");
+  static OffsetScanResult prepareFilesOffsetsLogString(List<String> files) {
+    List<Pair<Long, Long>> missingOffsets = new ArrayList<>();
+
+    List<Pair<Long, Long>> continuousOffsets =
+        files.stream()
+            .map(
+                file ->
+                    Pair.of(
+                        FileNameUtils.fileNameToStartOffset(file),
+                        FileNameUtils.fileNameToEndOffset(file)))
+            .collect(Collectors.toList());
+
+    for (int i = 0; i < continuousOffsets.size(); i++) {
+      Pair<Long, Long> current = continuousOffsets.get(i);
+
+      // The first range has no predecessor, so it is skipped
+      if (i == 0) {
+        continue;
       }
-      nextExpectedStartOffset = offsetRange[1] + 1;
-    }
-    logString.append("]");
-    missingRangeString.append("]");
-    if (missingRangeString.length() > 2) {
-      logString.append(", missing offset ranges :").append(missingRangeString);
+      Pair<Long, Long> previous = continuousOffsets.get(i - 1);
+
+      if (previous.getRight() + 1 != current.getLeft()) {
+        missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1));
+      }
     }
-    return logString.toString();
+    return new OffsetScanResult(continuousOffsets, missingOffsets);
   }
 }
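A minimal standalone sketch (illustrative only, not part of the patch) of the gap-detection walk that the new prepareFilesOffsetsLogString performs, shown on plain ranges. The class name OffsetGapSketch and the sample values are made up, and the input is assumed to be ordered by start offset, since the new implementation no longer sorts it:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

class OffsetGapSketch {
  public static void main(String[] args) {
    // Ranges already ordered by start offset, e.g. as parsed from staged file names.
    List<Pair<Long, Long>> ranges =
        Arrays.asList(Pair.of(0L, 10L), Pair.of(11L, 20L), Pair.of(100L, 2137L));
    List<Pair<Long, Long>> gaps = new ArrayList<>();
    // Compare each range against its predecessor; the first range has no predecessor.
    for (int i = 1; i < ranges.size(); i++) {
      long expectedStart = ranges.get(i - 1).getRight() + 1;
      if (ranges.get(i).getLeft() != expectedStart) {
        gaps.add(Pair.of(expectedStart, ranges.get(i).getLeft() - 1));
      }
    }
    System.out.println(gaps); // prints [(21,99)]
  }
}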
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java b/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java
index 694765e13..91a9494ab 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/KCLogger.java
@@ -103,11 +103,4 @@
   private String getFormattedLogMessage(String format, Object... vars) {
     return Utils.formatLogMessage(format, vars);
   }
-
-  /**
-   * Check if DEBUG or TRACE logging is enabled
-   */
-  public boolean isDebugOrTraceEnabled() {
-    return this.logger.isDebugEnabled() || this.logger.isTraceEnabled();
-  }
 }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java b/src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java
new file mode 100644
index 000000000..7f4339aaf
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/OffsetScanResult.java
@@ -0,0 +1,30 @@
+package com.snowflake.kafka.connector.internal;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+
+public class OffsetScanResult {
+  private final List<Pair<Long, Long>> continuousOffsets;
+  private final List<Pair<Long, Long>> missingOffsets;
+
+  private static String parseList(List<Pair<Long, Long>> list) {
+    return list.stream()
+        .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]")
+        .collect(Collectors.joining("", "[", "]"));
+  }
+
+  public OffsetScanResult(
+      List<Pair<Long, Long>> continuousOffsets, List<Pair<Long, Long>> missingOffsets) {
+    this.continuousOffsets = continuousOffsets;
+    this.missingOffsets = missingOffsets;
+  }
+
+  public String getContinuousOffsets() {
+    return parseList(continuousOffsets);
+  }
+
+  public String getMissingOffsets() {
+    return parseList(missingOffsets);
+  }
+}
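For illustration (not part of the patch), this is how the new OffsetScanResult renders the two range lists that end up in the connector's log lines; the class name OffsetScanResultDemo and the sample ranges are made up:

import com.snowflake.kafka.connector.internal.OffsetScanResult;
import java.util.Arrays;
import org.apache.commons.lang3.tuple.Pair;

class OffsetScanResultDemo {
  public static void main(String[] args) {
    OffsetScanResult offsets =
        new OffsetScanResult(
            Arrays.asList(Pair.of(0L, 10L), Pair.of(11L, 20L), Pair.of(100L, 2137L)),
            Arrays.asList(Pair.of(21L, 99L)));
    System.out.println(offsets.getContinuousOffsets()); // [[0,10][11,20][100,2137]]
    System.out.println(offsets.getMissingOffsets()); // [[21,99]]
  }
}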
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 25e33abbf..c8b9255a6 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -1086,18 +1086,28 @@ private void filterResultFromSnowpipeScan(
 
   private void purge(List<String> files) {
     if (!files.isEmpty()) {
-      LOGGER.info("Purging loaded files for pipe: {}, loadedFileCount: {}, offsets: {}", pipeName, files.size(), prepareFilesOffsetsLogString(
-          files));
+      OffsetScanResult offsets = prepareFilesOffsetsLogString(files);
+      LOGGER.info(
+          "Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {},"
+              + " missingOffsets: {}",
+          pipeName,
+          files.size(),
+          offsets.getContinuousOffsets(),
+          offsets.getMissingOffsets());
       conn.purgeStage(stageName, files);
     }
   }
 
   private void moveToTableStage(List<String> failedFiles) {
     if (!failedFiles.isEmpty()) {
+      OffsetScanResult offsets = prepareFilesOffsetsLogString(failedFiles);
       LOGGER.info(
-          "Moving failed files for pipe: {} to tableStage failedFileCount: {}, offsets: {}",
-          pipeName, failedFiles.size(), prepareFilesOffsetsLogString(failedFiles)
-      );
+          "Moving failed files for pipe: {} to tableStage failedFileCount: {}, continuousOffsets:"
+              + " {}, missingOffsets: {}",
+          pipeName,
+          failedFiles.size(),
+          offsets.getContinuousOffsets(),
+          offsets.getMissingOffsets());
       conn.moveToTableStage(tableName, stageName, failedFiles);
     }
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java
index d4adb0012..e8fed3b77 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java
@@ -4,19 +4,15 @@
 import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import com.google.common.collect.Lists;
-import org.junit.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Stream;
+import org.junit.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class FileNameUtilsTest {
@@ -171,12 +167,14 @@ public void testFileNameWontSupportMoreThan32767Partitions() {
 
   @ParameterizedTest
   @MethodSource("testData")
-  public void testPrepareFilesOffsetsLogString(List<String> fileNames, String result) {
+  public void testPrepareFilesOffsetsLogString(
+      List<String> fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) {
     // when
-    String resultString = prepareFilesOffsetsLogString(fileNames);
+    OffsetScanResult result = prepareFilesOffsetsLogString(fileNames);
 
     // then
-    assertThat(resultString).isEqualTo(result);
+    assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);
+    assertThat(result.getMissingOffsets()).isEqualTo(expectedMissingOffsets);
   }
 
   public static Stream<Arguments> testData() {
@@ -185,29 +183,29 @@ public static Stream<Arguments> testData() {
     String filePrefix = filePrefix(TestUtils.TEST_CONNECTOR_NAME, tableName, "topic", partition);
 
     return Stream.of(
-        Arguments.of(
-            Arrays.asList(
-                fileName(filePrefix, 0, 10),
-                fileName(filePrefix, 11, 20),
-                fileName(filePrefix, 21, 100),
-                fileName(filePrefix, 101, 1991)
-            ),
-            ", offset range: [[0,10][11,20][21,100][101,1991]]"
-        ),
-        Arguments.of(
-            Arrays.asList(
-                fileName(filePrefix, 0, 10),
-                fileName(filePrefix, 11, 20),
-                fileName(filePrefix, 21, 100),
-                fileName(filePrefix, 101, 1991),
-                fileName(filePrefix, 1996, 2000),
-                fileName(filePrefix, 2001, 2024)
-
-            ),
-            ", offset range: [[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]], missing offset ranges :[[1992,1995]]"
-        )
-    );
+        Arguments.of(Collections.emptyList(), "[]", "[]"),
+        Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"),
+        Arguments.of(
+            Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)),
+            "[[0,10][100,2137]]",
+            "[[11,99]]"),
+        Arguments.of(
+            Arrays.asList(
+                fileName(filePrefix, 0, 10),
+                fileName(filePrefix, 11, 20),
+                fileName(filePrefix, 21, 100),
+                fileName(filePrefix, 101, 1991)),
+            "[[0,10][11,20][21,100][101,1991]]",
+            "[]"),
+        Arguments.of(
+            Arrays.asList(
+                fileName(filePrefix, 0, 10),
+                fileName(filePrefix, 11, 20),
+                fileName(filePrefix, 21, 100),
+                fileName(filePrefix, 101, 1991),
+                fileName(filePrefix, 1996, 2000),
+                fileName(filePrefix, 2001, 2024)),
+            "[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
+            "[[1992,1995]]"));
   }
-
-
 }
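Putting the pieces together, the purge log line emitted by SnowflakeSinkServiceV1 would come out roughly as below; the pipe name, file count, and offset ranges are illustrative, and the connector's log formatter may add its own prefix:

Purging loaded files for pipe: my_pipe, loadedFileCount: 3, continuousOffsets: [[0,10][11,20][100,2137]], missingOffsets: [[21,99]]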