Skip to content

Commit

Permalink
NO-SNOW Code polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Dec 10, 2024
1 parent a4dbded commit b354f54
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,37 +198,37 @@ private static String readFromFileName(String fileName, int index) {
}

/**
* construct a log string that contains offset-range of input files
* Find gaps in offset ranges.
*
* @param files list of files
* @return string that must be logged
* @param filenames list of files
* @return continuous and missing offsets for given filenames
*/
static OffsetScanResult prepareFilesOffsetsLogString(List<String> files) {
static OffsetContinuityRanges searchForMissingOffsets(List<String> filenames) {
List<Pair<Long, Long>> missingOffsets = new ArrayList<>();

List<Pair<Long, Long>> continousOffsets =
files.stream()
List<Pair<Long, Long>> continuousOffsets =
filenames.stream()
.map(
file ->
Pair.of(
FileNameUtils.fileNameToStartOffset(file),
FileNameUtils.fileNameToEndOffset(file)))
.collect(Collectors.toList());

for (int i = 0; i < continousOffsets.size(); i++) {
Pair<Long, Long> current = continousOffsets.get(i);
for (int i = 0; i < continuousOffsets.size(); i++) {
Pair<Long, Long> current = continuousOffsets.get(i);

// The first range is skipped
if (i == 0) {
continue;
}

Pair<Long, Long> previous = continousOffsets.get(i - 1);
Pair<Long, Long> previous = continuousOffsets.get(i - 1);

if (previous.getRight() + 1 != current.getLeft()) {
missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1));
}
}
return new OffsetScanResult(continousOffsets, missingOffsets);
return new OffsetContinuityRanges(continuousOffsets, missingOffsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

public class OffsetScanResult {
public class OffsetContinuityRanges {
private final List<Pair<Long, Long>> continuousOffsets;
private final List<Pair<Long, Long>> missingOffsets;

private static String parseList(List<Pair<Long, Long>> list) {
return list.stream()
.map(range -> "[" + range.getLeft() + "," + range.getRight() + "]")
.collect(Collectors.joining("", "[", "]"));
}

public OffsetScanResult(
public OffsetContinuityRanges(
List<Pair<Long, Long>> continuousOffsets, List<Pair<Long, Long>> missingOffsets) {
this.continuousOffsets = continuousOffsets;
this.missingOffsets = missingOffsets;
Expand All @@ -27,4 +21,10 @@ public String getContinuousOffsets() {
public String getMissingOffsets() {
return parseList(missingOffsets);
}

/**
 * Renders a list of offset ranges as a single bracketed string.
 *
 * <p>Each range is written as {@code [left,right]}; the ranges are concatenated without any
 * separator and the whole sequence is wrapped in one more pair of brackets, e.g. {@code
 * [[0,9][20,29]]}. An empty list yields {@code []}.
 *
 * @param list offset ranges to render
 * @return textual form of the given ranges
 */
private static String parseList(List<Pair<Long, Long>> list) {
  StringBuilder rendered = new StringBuilder("[");
  for (Pair<Long, Long> range : list) {
    rendered
        .append("[")
        .append(range.getLeft())
        .append(",")
        .append(range.getRight())
        .append("]");
  }
  return rendered.append("]").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
Expand Down Expand Up @@ -1086,7 +1086,7 @@ private void filterResultFromSnowpipeScan(

private void purge(List<String> files) {
if (!files.isEmpty()) {
OffsetScanResult offsets = prepareFilesOffsetsLogString(files);
OffsetContinuityRanges offsets = searchForMissingOffsets(files);
LOGGER.info(
"Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {},"
+ " missingOffsets: {}",
Expand All @@ -1100,7 +1100,7 @@ private void purge(List<String> files) {

private void moveToTableStage(List<String> failedFiles) {
if (!failedFiles.isEmpty()) {
OffsetScanResult offsets = prepareFilesOffsetsLogString(failedFiles);
OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles);
LOGGER.info(
"Moving failed files for pipe: {} to tableStage failedFileCount: {}, continuousOffsets:"
+ " {}, missingOffsets: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.FileNameUtils.*;
import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -167,10 +167,10 @@ public void testFileNameWontSupportMoreThan32767Partitions() {

@ParameterizedTest
@MethodSource("testData")
public void testPrepareFilesOffsetsLogString(
public void testSearchForMissingOffsets(
List<String> fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) {
// when
OffsetScanResult result = prepareFilesOffsetsLogString(fileNames);
OffsetContinuityRanges result = searchForMissingOffsets(fileNames);

// then
assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);
Expand Down

0 comments on commit b354f54

Please sign in to comment.