Skip to content

Commit

Permalink
NO-SNOW Rewritten to streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Dec 10, 2024
1 parent 813a66f commit a4dbded
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import com.google.common.base.Strings;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.utils.Crc32C;

public class FileNameUtils {
Expand Down Expand Up @@ -202,35 +203,32 @@ private static String readFromFileName(String fileName, int index) {
* @param files list of files
* @return string that must be logged
*/
static String prepareFilesOffsetsLogString(
List<String> files
) {
StringBuilder logString = new StringBuilder();
logString.append(", offset range: [");
long[][] offsetArray = new long[files.size()][2];
String file;
for (int i =0; i < files.size(); i++) {
file = files.get(i);
offsetArray[i][0] = FileNameUtils.fileNameToStartOffset(file);
offsetArray[i][1] = FileNameUtils.fileNameToEndOffset(file);
}
Arrays.sort(offsetArray, Comparator.comparing(a -> a[0]));
StringBuilder missingRangeString = new StringBuilder().append("[");
long nextExpectedStartOffset = offsetArray[0][0];
for (long[] offsetRange : offsetArray) {
logString.append("[").append(offsetRange[0]).append(",").append(offsetRange[1]).append("]");
if (offsetRange[0] != nextExpectedStartOffset) {
missingRangeString.append("[").append(nextExpectedStartOffset)
.append(",").append(offsetRange[0] -1).append("]");
static OffsetScanResult prepareFilesOffsetsLogString(List<String> files) {
List<Pair<Long, Long>> missingOffsets = new ArrayList<>();

List<Pair<Long, Long>> continousOffsets =
files.stream()
.map(
file ->
Pair.of(
FileNameUtils.fileNameToStartOffset(file),
FileNameUtils.fileNameToEndOffset(file)))
.collect(Collectors.toList());

for (int i = 0; i < continousOffsets.size(); i++) {
Pair<Long, Long> current = continousOffsets.get(i);

// The first range is skipped
if (i == 0) {
continue;
}
nextExpectedStartOffset = offsetRange[1] + 1;
}
logString.append("]");
missingRangeString.append("]");

if (missingRangeString.length() > 2) {
logString.append(", missing offset ranges :").append(missingRangeString);
Pair<Long, Long> previous = continousOffsets.get(i - 1);

if (previous.getRight() + 1 != current.getLeft()) {
missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1));
}
}
return logString.toString();
return new OffsetScanResult(continousOffsets, missingOffsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,4 @@ private String getFormattedLogMessage(String format, Object... vars) {

return Utils.formatLogMessage(format, vars);
}

/**
 * Reports whether this instance's logger currently accepts DEBUG or TRACE messages.
 *
 * @return {@code true} if either the DEBUG level or the TRACE level is enabled
 */
public boolean isDebugOrTraceEnabled() {
  // DEBUG is checked first; TRACE is only consulted when DEBUG is disabled.
  if (this.logger.isDebugEnabled()) {
    return true;
  }
  return this.logger.isTraceEnabled();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.snowflake.kafka.connector.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Holds two lists of {@code (left, right)} pairs and renders each list as text for logging.
 *
 * <p>Each pair is rendered as {@code [left,right]}; the rendered pairs are concatenated without
 * a separator and wrapped in an outer pair of brackets, e.g. {@code [[0,10][11,20]]}. An empty
 * list renders as {@code []}.
 *
 * <p>The lists are copied on construction, so later changes to the lists passed in do not affect
 * what this object renders.
 */
public class OffsetScanResult {
  private final List<Pair<Long, Long>> continuousOffsets;
  private final List<Pair<Long, Long>> missingOffsets;

  /**
   * Renders a list of pairs as {@code [[l1,r1][l2,r2]...]}.
   *
   * @param ranges pairs to render
   * @return the bracketed text form; {@code []} when {@code ranges} is empty
   */
  private static String formatRanges(List<Pair<Long, Long>> ranges) {
    return ranges.stream()
        .map(range -> "[" + range.getLeft() + "," + range.getRight() + "]")
        .collect(Collectors.joining("", "[", "]"));
  }

  /**
   * Takes an unmodifiable snapshot of {@code source} so the stored list cannot be changed through
   * the caller's reference.
   */
  private static List<Pair<Long, Long>> snapshot(List<Pair<Long, Long>> source, String name) {
    Objects.requireNonNull(source, name);
    return Collections.unmodifiableList(new ArrayList<>(source));
  }

  /**
   * @param continuousOffsets pairs rendered by {@link #getContinuousOffsets()}; must not be null
   * @param missingOffsets pairs rendered by {@link #getMissingOffsets()}; must not be null
   * @throws NullPointerException if either list is null
   */
  public OffsetScanResult(
      List<Pair<Long, Long>> continuousOffsets, List<Pair<Long, Long>> missingOffsets) {
    this.continuousOffsets = snapshot(continuousOffsets, "continuousOffsets");
    this.missingOffsets = snapshot(missingOffsets, "missingOffsets");
  }

  /** @return the continuous-offsets list rendered as {@code [[l1,r1][l2,r2]...]} */
  public String getContinuousOffsets() {
    return formatRanges(continuousOffsets);
  }

  /** @return the missing-offsets list rendered as {@code [[l1,r1][l2,r2]...]} */
  public String getMissingOffsets() {
    return formatRanges(missingOffsets);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1086,18 +1086,28 @@ private void filterResultFromSnowpipeScan(

private void purge(List<String> files) {
if (!files.isEmpty()) {
LOGGER.info("Purging loaded files for pipe: {}, loadedFileCount: {}, offsets: {}", pipeName, files.size(), prepareFilesOffsetsLogString(
files));
OffsetScanResult offsets = prepareFilesOffsetsLogString(files);
LOGGER.info(
"Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {},"
+ " missingOffsets: {}",
pipeName,
files.size(),
offsets.getContinuousOffsets(),
offsets.getMissingOffsets());
conn.purgeStage(stageName, files);
}
}

private void moveToTableStage(List<String> failedFiles) {
if (!failedFiles.isEmpty()) {
OffsetScanResult offsets = prepareFilesOffsetsLogString(failedFiles);
LOGGER.info(
"Moving failed files for pipe: {} to tableStage failedFileCount: {}, offsets: {}",
pipeName, failedFiles.size(), prepareFilesOffsetsLogString(failedFiles)
);
"Moving failed files for pipe: {} to tableStage failedFileCount: {}, continuousOffsets:"
+ " {}, missingOffsets: {}",
pipeName,
failedFiles.size(),
offsets.getContinuousOffsets(),
offsets.getMissingOffsets());
conn.moveToTableStage(tableName, stageName, failedFiles);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@
import static com.snowflake.kafka.connector.internal.FileNameUtils.prepareFilesOffsetsLogString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;

import com.google.common.collect.Lists;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class FileNameUtilsTest {

Expand Down Expand Up @@ -171,12 +167,14 @@ public void testFileNameWontSupportMoreThan32767Partitions() {

@ParameterizedTest
@MethodSource("testData")
public void testPrepareFilesOffsetsLogString(List<String> fileNames, String result) {
public void testPrepareFilesOffsetsLogString(
List<String> fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) {
// when
String resultString = prepareFilesOffsetsLogString(fileNames);
OffsetScanResult result = prepareFilesOffsetsLogString(fileNames);

// then
assertThat(resultString).isEqualTo(result);
assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);
assertThat(result.getMissingOffsets()).isEqualTo(expectedMissingOffsets);
}

public static Stream<Arguments> testData() {
Expand All @@ -185,29 +183,29 @@ public static Stream<Arguments> testData() {
String filePrefix = filePrefix(TestUtils.TEST_CONNECTOR_NAME, tableName, "topic", partition);

return Stream.of(
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991)
),
", offset range: [[0,10][11,20][21,100][101,1991]]"
),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 2001, 2024)

),
", offset range: [[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]], missing offset ranges :[[1992,1995]]"
)
);
Arguments.of(Collections.emptyList(), "[]", "[]"),
Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"),
Arguments.of(
Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)),
"[[0,10][100,2137]]",
"[[11,99]]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991)),
"[[0,10][11,20][21,100][101,1991]]",
"[]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 2001, 2024)),
"[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
"[[1992,1995]]"));
}


}

0 comments on commit a4dbded

Please sign in to comment.