NO-SNOW Improve offset logging for Snowpipe (#1027)
Co-authored-by: Wojciech Trefon <[email protected]>
sfc-gh-mbobowski and sfc-gh-wtrefon authored Dec 11, 2024
1 parent 768d9fa commit c8e6df9
Showing 5 changed files with 154 additions and 8 deletions.
@@ -3,8 +3,12 @@
import com.google.common.base.Strings;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.utils.Crc32C;

public class FileNameUtils {
@@ -192,4 +196,34 @@ private static String readFromFileName(String fileName, int index) {

return matcher.group(index);
}

/**
* Find gaps in offset ranges.
*
* @param filenames list of files
* @return continuous and missing offsets for given filenames
*/
static OffsetContinuityRanges searchForMissingOffsets(List<String> filenames) {
List<Pair<Long, Long>> missingOffsets = new ArrayList<>();

List<Pair<Long, Long>> continuousOffsets =
filenames.stream()
.map(
file ->
Pair.of(
FileNameUtils.fileNameToStartOffset(file),
FileNameUtils.fileNameToEndOffset(file)))
.sorted()
.collect(Collectors.toList());

for (int i = 1; i < continuousOffsets.size(); i++) {
Pair<Long, Long> current = continuousOffsets.get(i);
Pair<Long, Long> previous = continuousOffsets.get(i - 1);

if (previous.getRight() + 1 != current.getLeft()) {
missingOffsets.add(Pair.of(previous.getRight() + 1, current.getLeft() - 1));
}
}
return new OffsetContinuityRanges(continuousOffsets, missingOffsets);
}
}
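Note on the scan above: because the (start, end) pairs are sorted by start offset first, any neighboring pair where previous end + 1 != current start contributes a missing range [previousEnd + 1, currentStart - 1]. A minimal standalone sketch of the same scan with hard-coded ranges instead of parsed file names (the class name and offset values are illustrative, not part of this commit):

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

public class OffsetGapScanSketch {
  public static void main(String[] args) {
    // (startOffset, endOffset) ranges, already sorted by start offset.
    List<Pair<Long, Long>> ranges =
        Arrays.asList(Pair.of(0L, 10L), Pair.of(11L, 20L), Pair.of(100L, 2137L));

    for (int i = 1; i < ranges.size(); i++) {
      long previousEnd = ranges.get(i - 1).getRight();
      long currentStart = ranges.get(i).getLeft();
      if (previousEnd + 1 != currentStart) {
        // No file covers offsets 21..99, so that interval is reported as missing.
        System.out.println("missing: [" + (previousEnd + 1) + "," + (currentStart - 1) + "]");
      }
    }
  }
}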
@@ -95,6 +95,10 @@ public void error(String format, Object... vars) {
}
}

public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}

private String getFormattedLogMessage(String format, Object... vars) {
if (prependMdcContext) {
String connCtx = MDC.get(MDC_CONN_CTX_KEY);
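The isDebugEnabled() passthrough added above lets call sites keep routine log lines compact and attach verbose arguments only when debug logging is actually enabled (see the purge/moveToTableStage changes further down). A sketch of that pattern against a plain SLF4J logger, since the wrapper class name is not visible in this hunk (names are illustrative):

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebugGateSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(DebugGateSketch.class);

  static void logSummary(String summaryLine, List<String> failedFiles) {
    // Always log the summary; append the potentially huge file list only under debug.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.info("{}, failedFiles: {}", summaryLine, failedFiles);
    } else {
      LOGGER.info(summaryLine);
    }
  }
}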
@@ -0,0 +1,30 @@
package com.snowflake.kafka.connector.internal;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;

public class OffsetContinuityRanges {
private final List<Pair<Long, Long>> continuousOffsets;
private final List<Pair<Long, Long>> missingOffsets;

public OffsetContinuityRanges(
List<Pair<Long, Long>> continuousOffsets, List<Pair<Long, Long>> missingOffsets) {
this.continuousOffsets = continuousOffsets;
this.missingOffsets = missingOffsets;
}

public String getContinuousOffsets() {
return serializeList(continuousOffsets);
}

public String getMissingOffsets() {
return serializeList(missingOffsets);
}

private static String serializeList(List<Pair<Long, Long>> list) {
return list.stream()
.map(range -> "[" + range.getLeft() + "," + range.getRight() + "]")
.collect(Collectors.joining("", "[", "]"));
}
}
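A quick illustration of the serialized form (assumes the same java.util and commons-lang3 Pair imports as the class above; the offset values are made up):

OffsetContinuityRanges ranges =
    new OffsetContinuityRanges(
        Arrays.asList(Pair.of(0L, 10L), Pair.of(12L, 20L)),
        Collections.singletonList(Pair.of(11L, 11L)));

// ranges.getContinuousOffsets() returns "[[0,10][12,20]]"
// ranges.getMissingOffsets()    returns "[[11,11]]"
// Empty input lists serialize to "[]".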
@@ -2,6 +2,7 @@

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_SINGLE_TABLE_MULTIPLE_TOPICS_FIX_ENABLED;
import static com.snowflake.kafka.connector.config.TopicToTableModeExtractor.determineTopic2TableMode;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SIZE_BYTES;
import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.BUFFER_SUB_DOMAIN;
@@ -1085,22 +1086,35 @@ private void filterResultFromSnowpipeScan(

   private void purge(List<String> files) {
     if (!files.isEmpty()) {
-      LOGGER.debug(
-          "Purging loaded files for pipe:{}, loadedFileCount:{}, loadedFiles:{}",
+      OffsetContinuityRanges offsets = searchForMissingOffsets(files);
+      LOGGER.info(
+          "Purging loaded files for pipe: {}, loadedFileCount: {}, continuousOffsets: {},"
+              + " missingOffsets: {}",
           pipeName,
           files.size(),
-          Arrays.toString(files.toArray()));
+          offsets.getContinuousOffsets(),
+          offsets.getMissingOffsets());
+      LOGGER.debug("Purging files: {}", files);
       conn.purgeStage(stageName, files);
     }
   }

   private void moveToTableStage(List<String> failedFiles) {
     if (!failedFiles.isEmpty()) {
-      LOGGER.debug(
-          "Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}",
-          pipeName,
-          failedFiles.size(),
-          Arrays.toString(failedFiles.toArray()));
+      OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles);
+      String baseLog =
+          String.format(
+              "Moving failed files for pipe: %s to tableStage failedFileCount: %d,"
+                  + " continuousOffsets: %s, missingOffsets: %s",
+              pipeName,
+              failedFiles.size(),
+              offsets.getContinuousOffsets(),
+              offsets.getMissingOffsets());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.info("{}, failedFiles: {}", baseLog, failedFiles);
+      } else {
+        LOGGER.info(baseLog);
+      }
       conn.moveToTableStage(tableName, stageName, failedFiles);
     }
   }
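With this change a routine purge is summarized at INFO level by offset ranges rather than by raw file names, and the full file list moves to a separate DEBUG line. Roughly, the INFO line renders as follows (pipe name, count, and offsets are illustrative):

Purging loaded files for pipe: my_pipe, loadedFileCount: 3, continuousOffsets: [[0,10][11,20][100,2137]], missingOffsets: [[21,99]]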
@@ -1,11 +1,21 @@
package com.snowflake.kafka.connector.internal;

import static com.snowflake.kafka.connector.internal.FileNameUtils.*;
import static com.snowflake.kafka.connector.internal.FileNameUtils.searchForMissingOffsets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class FileNameUtilsTest {

@Test
public void testFileNameFunctions() throws InterruptedException {
int partition = 123;
Expand Down Expand Up @@ -154,4 +164,58 @@ public void testFileNameWontSupportMoreThan32767Partitions() {
endOffset))
.isInstanceOf(IllegalArgumentException.class);
}

@ParameterizedTest
@MethodSource("testData")
public void testSearchForMissingOffsets(
List<String> fileNames, String expectedContinuousOffsets, String expectedMissingOffsets) {
// when
OffsetContinuityRanges result = searchForMissingOffsets(fileNames);

// then
assertThat(result.getContinuousOffsets()).isEqualTo(expectedContinuousOffsets);
assertThat(result.getMissingOffsets()).isEqualTo(expectedMissingOffsets);
}

public static Stream<Arguments> testData() {
int partition = 123;
String tableName = "test_table";
String filePrefix = filePrefix(TestUtils.TEST_CONNECTOR_NAME, tableName, "topic", partition);

return Stream.of(
Arguments.of(Collections.emptyList(), "[]", "[]"),
Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"),
Arguments.of(
Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)),
"[[0,10][100,2137]]",
"[[11,99]]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991)),
"[[0,10][11,20][21,100][101,1991]]",
"[]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 0, 10),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 2001, 2024)),
"[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
"[[1992,1995]]"),
Arguments.of(
Arrays.asList(
fileName(filePrefix, 1996, 2000),
fileName(filePrefix, 11, 20),
fileName(filePrefix, 21, 100),
fileName(filePrefix, 2001, 2024),
fileName(filePrefix, 101, 1991),
fileName(filePrefix, 0, 10)),
"[[0,10][11,20][21,100][101,1991][1996,2000][2001,2024]]",
"[[1992,1995]]"));
}
}
