diff --git a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java index c3de6efe3..ce594f033 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java @@ -213,16 +213,11 @@ static OffsetContinuityRanges searchForMissingOffsets(List filenames) { Pair.of( FileNameUtils.fileNameToStartOffset(file), FileNameUtils.fileNameToEndOffset(file))) + .sorted() .collect(Collectors.toList()); - for (int i = 0; i < continuousOffsets.size(); i++) { + for (int i = 1; i < continuousOffsets.size(); i++) { Pair current = continuousOffsets.get(i); - - // The first range is skipped - if (i == 0) { - continue; - } - Pair previous = continuousOffsets.get(i - 1); if (previous.getRight() + 1 != current.getLeft()) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java index 8cdebf436..01c0f476f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java @@ -15,14 +15,14 @@ public OffsetContinuityRanges( } public String getContinuousOffsets() { - return parseList(continuousOffsets); + return serializeList(continuousOffsets); } public String getMissingOffsets() { - return parseList(missingOffsets); + return serializeList(missingOffsets); } - private static String parseList(List> list) { + private static String serializeList(List> list) { return list.stream() .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]") .collect(Collectors.joining("", "[", "]")); diff --git a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java index 3c5c6e2fc..1cddc5a06 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java @@ -206,6 +206,16 @@ public static Stream testData() { fileName(filePrefix, 1996, 2000), fileName(filePrefix, 2001, 2024)), "[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]", + "[[1992,1995]]"), + Arguments.of( + Arrays.asList( + fileName(filePrefix, 1996, 2000), + fileName(filePrefix, 11, 20), + fileName(filePrefix, 21, 100), + fileName(filePrefix, 2001, 2024), + fileName(filePrefix, 101, 1991), + fileName(filePrefix, 0, 10)), + "[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]", "[[1992,1995]]")); } }