update logic
sfc-gh-tzhang committed Feb 14, 2024
1 parent de39f7e commit 1df51b4
Showing 9 changed files with 44 additions and 107 deletions.
--------------------------------------------------------------------------------
@@ -28,8 +28,7 @@ enum TelemetryType {
STREAMING_INGEST_LATENCY_IN_SEC("streaming_ingest_latency_in_ms"),
STREAMING_INGEST_CLIENT_FAILURE("streaming_ingest_client_failure"),
STREAMING_INGEST_THROUGHPUT_BYTES_PER_SEC("streaming_ingest_throughput_bytes_per_sec"),
- STREAMING_INGEST_CPU_MEMORY_USAGE("streaming_ingest_cpu_memory_usage"),
- STREAMING_INGEST_BATCH_OFFSET_MISMATCH("streaming_ingest_batch_offset_mismatch");
+ STREAMING_INGEST_CPU_MEMORY_USAGE("streaming_ingest_cpu_memory_usage");

private final String name;

@@ -123,21 +122,6 @@ public void reportCpuMemoryUsage(Histogram cpuUsage) {
}
}

- public void reportBatchOffsetMismatch(
-     String channelName,
-     String prevBatchEndOffset,
-     String startOffset,
-     String endOffset,
-     long rowCount) {
-   ObjectNode msg = MAPPER.createObjectNode();
-   msg.put("channel_name", channelName);
-   msg.put("prev_batch_end_offset", prevBatchEndOffset);
-   msg.put("start_offset", startOffset);
-   msg.put("end_offset", endOffset);
-   msg.put("row_count", rowCount);
-   send(TelemetryType.STREAMING_INGEST_BATCH_OFFSET_MISMATCH, msg);
- }

/** Send log to Snowflake asynchronously through JDBC client telemetry */
void send(TelemetryType type, ObjectNode msg) {
try {
--------------------------------------------------------------------------------
@@ -150,6 +150,7 @@ public InsertValidationResponse insertRows(
InsertValidationResponse response = new InsertValidationResponse();
float rowsSizeInBytes = 0F;
int rowIndex = 0;
+ int prevRowCount = rowBuffer.bufferedRowCount;
for (Map<String, Object> row : rows) {
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, rowIndex);
@@ -173,9 +174,7 @@
rowIndex++;
}
checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
- checkOffsetMismatch(
-     rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, rowIndex);
- rowBuffer.channelState.setOffsetToken(endOffsetToken);
+ rowBuffer.channelState.updateOffsetToken(startOffsetToken, endOffsetToken, prevRowCount);
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
@@ -210,15 +209,14 @@ public InsertValidationResponse insertRows(
moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
- rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
rowBuffer.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName))));
- checkOffsetMismatch(
-     rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, tempRowCount);
- rowBuffer.channelState.setOffsetToken(endOffsetToken);
+ rowBuffer.channelState.updateOffsetToken(
+     startOffsetToken, endOffsetToken, rowBuffer.bufferedRowCount);
+ rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
@@ -265,15 +263,14 @@ public InsertValidationResponse insertRows(
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
moveTempRowsToActualBuffer(tempRowCount);
rowsSizeInBytes = tempRowsSizeInBytes;
- rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
rowBuffer.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName))));
- checkOffsetMismatch(
-     rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, rowIndex);
- rowBuffer.channelState.setOffsetToken(endOffsetToken);
+ rowBuffer.channelState.updateOffsetToken(
+     startOffsetToken, endOffsetToken, rowBuffer.bufferedRowCount);
+ rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
}
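
Note the reordering in the two hunks above: the bufferedRowCount increment now runs after updateOffsetToken, because the third argument must be the row count from before the batch (the first code path snapshots it as prevRowCount for the same reason). A tiny self-contained sketch of the pattern, with hypothetical names rather than SDK code:

public class OrderingDemo {
  static int bufferedRowCount = 0;
  static String startToken;

  // Mirrors the new updateOffsetToken contract: rowCount is the pre-batch row count.
  static void updateOffsetToken(String start, String end, int rowCount) {
    if (rowCount == 0) {
      startToken = start;
    }
  }

  public static void main(String[] args) {
    int tempRowCount = 10;
    // Correct order, as in this commit: pass the count before incrementing it.
    updateOffsetToken("0", "9", bufferedRowCount);
    bufferedRowCount += tempRowCount;
    System.out.println(startToken); // prints "0": the empty buffer's start token is captured
  }
}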
@@ -316,17 +313,13 @@ public InsertValidationResponse insertRows(
// Buffer parameters that are set at the owning client level
final ClientBufferParameters clientBufferParameters;

- // Telemetry service use to report telemetry to SF
- private final TelemetryService telemetryService;
-
AbstractRowBuffer(
OpenChannelRequest.OnErrorOption onErrorOption,
ZoneId defaultTimezone,
String fullyQualifiedChannelName,
Consumer<Float> rowSizeMetric,
ChannelRuntimeState channelRuntimeState,
-     ClientBufferParameters clientBufferParameters,
-     TelemetryService telemetryService) {
+     ClientBufferParameters clientBufferParameters) {
this.onErrorOption = onErrorOption;
this.defaultTimezone = defaultTimezone;
this.rowSizeMetric = rowSizeMetric;
@@ -337,7 +330,6 @@ public InsertValidationResponse insertRows(
this.bufferedRowCount = 0;
this.bufferSize = 0F;
this.clientBufferParameters = clientBufferParameters;
- this.telemetryService = telemetryService;

// Initialize empty stats
this.statsMap = new HashMap<>();
@@ -492,7 +484,7 @@ public ChannelData<T> flush(final String filePath) {
oldRowCount = this.bufferedRowCount;
oldBufferSize = this.bufferSize;
oldRowSequencer = this.channelState.incrementAndGetRowSequencer();
- oldOffsetToken = this.channelState.getOffsetToken();
+ oldOffsetToken = this.channelState.getEndOffsetToken();
oldColumnEps = new HashMap<>(this.statsMap);
oldMinMaxInsertTimeInMs =
new Pair<>(
@@ -677,42 +669,6 @@ private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) {
}
}

- /**
-  * We verify some expected offset behavior and report to SF if there is a mismatch. Note that
-  * there are false positives because the input could give us a batch with offset gaps. For
-  * example in Kafka Connector, we could have gaps if some of the offsets are filtered out by SMT.
-  */
- private void checkOffsetMismatch(
-     String prevEndOffset, String curStartOffset, String curEndOffset, int rowCount) {
-   if (telemetryService != null && curStartOffset != null) {
-     boolean reportMismatch = false;
-     try {
-       long curStart = Long.parseLong(curStartOffset);
-       long curEnd = Long.parseLong(curEndOffset);
-
-       // We verify that end_offset - start_offset + 1 = row_count
-       if (curEnd - curStart + 1 != rowCount) {
-         reportMismatch = true;
-       }
-
-       // We verify that start_offset_of_current_batch = end_offset_of_previous_batch + 1
-       if (prevEndOffset != null) {
-         long prevEnd = Long.parseLong(prevEndOffset);
-         if (curStart != prevEnd + 1) {
-           reportMismatch = true;
-         }
-       }
-     } catch (NumberFormatException ignored) {
-       // Do nothing since we can't compare the offsets
-     }
-
-     if (reportMismatch) {
-       this.telemetryService.reportBatchOffsetMismatch(
-           channelFullyQualifiedName, prevEndOffset, curStartOffset, curEndOffset, rowCount);
-     }
-   }
- }
-
/** Create the ingestion strategy based on the channel on_error option */
IngestionStrategy<T> createIngestionStrategy(OpenChannelRequest.OnErrorOption onErrorOption) {
switch (onErrorOption) {
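
For reference, the dropped checkOffsetMismatch reduced to two arithmetic invariants over the numeric offset tokens, and the removed javadoc above notes why they were unreliable: input batches can legitimately contain offset gaps (e.g. offsets filtered out by an SMT in the Kafka Connector). A condensed standalone sketch of those invariants, with a hypothetical class name, not part of the SDK:

public class OffsetInvariants {

  // end_offset - start_offset + 1 = row_count
  static boolean batchSizeMatches(long start, long end, long rowCount) {
    return end - start + 1 == rowCount;
  }

  // start_offset_of_current_batch = end_offset_of_previous_batch + 1
  static boolean batchesAreContiguous(long prevEnd, long curStart) {
    return curStart == prevEnd + 1;
  }

  public static void main(String[] args) {
    // Offsets 5..9 carry 5 rows, so the size check holds.
    System.out.println(batchSizeMatches(5, 9, 5)); // true
    // A gap (e.g. filtered offsets) trips the contiguity check even though
    // nothing is wrong: the false positive that motivated this removal.
    System.out.println(batchesAreContiguous(9, 12)); // false
  }
}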
--------------------------------------------------------------------------------
@@ -11,8 +11,11 @@ class ChannelRuntimeState {
// Indicates whether the channel is still valid
private volatile boolean isValid;

- // The channel's current offset token
- private volatile String offsetToken;
+ // The channel's current start offset token
+ private volatile String startOffsetToken;
+
+ // The channel's current end offset token
+ private volatile String endOffsetToken;

// The channel's current row sequencer
private final AtomicLong rowSequencer;
@@ -21,8 +24,8 @@ class ChannelRuntimeState {
private Long firstInsertInMs;
private Long lastInsertInMs;

- ChannelRuntimeState(String offsetToken, long rowSequencer, boolean isValid) {
-   this.offsetToken = offsetToken;
+ ChannelRuntimeState(String endOffsetToken, long rowSequencer, boolean isValid) {
+   this.endOffsetToken = endOffsetToken;
this.rowSequencer = new AtomicLong(rowSequencer);
this.isValid = isValid;
}
@@ -41,9 +44,14 @@ void invalidate() {
isValid = false;
}

- /** @return current offset token */
- String getOffsetToken() {
-   return offsetToken;
+ /** @return current end offset token */
+ String getEndOffsetToken() {
+   return endOffsetToken;
+ }
+
+ /** @return current start offset token */
+ String getStartOffsetToken() {
+   return startOffsetToken;
}

/** @return current offset token after first incrementing it by one. */
@@ -57,12 +65,16 @@ long getRowSequencer() {
}

/**
- * Updates the channel's offset token.
+ * Updates the channel's start and end offset tokens.
*
- * @param offsetToken new offset token
+ * @param startOffsetToken new start offset token of the batch
+ * @param endOffsetToken new end offset token
*/
- void setOffsetToken(String offsetToken) {
-   this.offsetToken = offsetToken;
+ void updateOffsetToken(String startOffsetToken, String endOffsetToken, int rowCount) {
+   if (rowCount == 0) {
+     this.startOffsetToken = startOffsetToken;
+   }
+   this.endOffsetToken = endOffsetToken;
}

/** Update the insert stats for the current row buffer whenever needed */
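
The rowCount argument of updateOffsetToken is the number of rows already buffered in the channel before the batch, so the start offset token is captured only when the buffer was empty (e.g. for the first batch after a flush), while the end offset token always advances. A minimal sketch of that behavior, using hypothetical names outside the SDK:

class OffsetTokenState {
  private volatile String startOffsetToken;
  private volatile String endOffsetToken;

  void updateOffsetToken(String startOffsetToken, String endOffsetToken, int rowCount) {
    if (rowCount == 0) { // buffer was empty: this batch defines the current start
      this.startOffsetToken = startOffsetToken;
    }
    this.endOffsetToken = endOffsetToken; // always advance to the latest batch's end
  }

  public static void main(String[] args) {
    OffsetTokenState state = new OffsetTokenState();
    state.updateOffsetToken("0", "9", 0);    // first batch: start "0", end "9"
    state.updateOffsetToken("10", "19", 10); // later batch: start stays "0", end becomes "19"
    System.out.println(state.startOffsetToken + " .. " + state.endOffsetToken); // 0 .. 19
  }
}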
--------------------------------------------------------------------------------
@@ -67,8 +67,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
fullyQualifiedChannelName,
rowSizeMetric,
channelRuntimeState,
-     clientBufferParameters,
-     telemetryService);
+     clientBufferParameters);
this.fieldIndex = new HashMap<>();
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
--------------------------------------------------------------------------------
@@ -103,7 +103,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
String dbName,
String schemaName,
String tableName,
- String offsetToken,
+ String endOffsetToken,
Long channelSequencer,
Long rowSequencer,
SnowflakeStreamingIngestClientInternal<T> client,
@@ -117,7 +117,7 @@
this.channelFlushContext =
new ChannelFlushContext(
name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
- this.channelState = new ChannelRuntimeState(offsetToken, rowSequencer, true);
+ this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true);
this.rowBuffer =
AbstractRowBuffer.createRowBuffer(
onErrorOption,
--------------------------------------------------------------------------------
@@ -801,7 +801,7 @@ List<SnowflakeStreamingIngestChannelInternal<?>> verifyChannelsAreFullyCommitted
channelStatus.getStatusCode(),
channel.getChannelSequencer(),
rowSequencer,
-     channel.getChannelState().getOffsetToken(),
+     channel.getChannelState().getEndOffsetToken(),
channelStatus.getPersistedRowSequencer(),
channelStatus.getPersistedOffsetToken());
if (channelStatus.getStatusCode() != RESPONSE_SUCCESS) {
--------------------------------------------------------------------------------
@@ -73,18 +73,4 @@ public void testReportCpuMemoryUsage() {
// Make sure there is no exception thrown
telemetryService.reportCpuMemoryUsage(cpuHistogram);
}

- @Test
- public void testReportBatchOffsetMismatch() {
-   CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class);
-
-   TelemetryService telemetryService =
-       Mockito.spy(
-           new TelemetryService(
-               httpClient, "testReportClientFailure", "snowflake.dev.local:8082"));
-   Mockito.doNothing().when(telemetryService).send(Mockito.any(), Mockito.any());
-
-   // Make sure there is no exception thrown
-   telemetryService.reportBatchOffsetMismatch("channel", "0", "1", "2", 1);
- }
}
--------------------------------------------------------------------------------
@@ -143,7 +143,7 @@ public void testChannelFactorySuccess() {
Assert.assertEquals(dbName, channel.getDBName());
Assert.assertEquals(schemaName, channel.getSchemaName());
Assert.assertEquals(tableName, channel.getTableName());
- Assert.assertEquals(offsetToken, channel.getChannelState().getOffsetToken());
+ Assert.assertEquals(offsetToken, channel.getChannelState().getEndOffsetToken());
Assert.assertEquals(channelSequencer, channel.getChannelSequencer());
Assert.assertEquals(rowSequencer + 1L, channel.getChannelState().incrementAndGetRowSequencer());
Assert.assertEquals(
--------------------------------------------------------------------------------
@@ -460,7 +460,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception {
ChannelMetadata.builder()
.setOwningChannelFromContext(channel.getChannelContext())
.setRowSequencer(channel.getChannelState().incrementAndGetRowSequencer())
-     .setOffsetToken(channel.getChannelState().getOffsetToken())
+     .setOffsetToken(channel.getChannelState().getEndOffsetToken())
.build();

Map<String, RowBufferStats> columnEps = new HashMap<>();
@@ -514,25 +514,25 @@ private Pair<List<BlobMetadata>, Set<ChunkRegisterStatus>> getRetryBlobMetadata(
ChannelMetadata.builder()
.setOwningChannelFromContext(channel1.getChannelContext())
.setRowSequencer(channel1.getChannelState().incrementAndGetRowSequencer())
-     .setOffsetToken(channel1.getChannelState().getOffsetToken())
+     .setOffsetToken(channel1.getChannelState().getEndOffsetToken())
.build();
ChannelMetadata channelMetadata2 =
ChannelMetadata.builder()
.setOwningChannelFromContext(channel2.getChannelContext())
.setRowSequencer(channel2.getChannelState().incrementAndGetRowSequencer())
-     .setOffsetToken(channel2.getChannelState().getOffsetToken())
+     .setOffsetToken(channel2.getChannelState().getEndOffsetToken())
.build();
ChannelMetadata channelMetadata3 =
ChannelMetadata.builder()
.setOwningChannelFromContext(channel3.getChannelContext())
.setRowSequencer(channel3.getChannelState().incrementAndGetRowSequencer())
-     .setOffsetToken(channel3.getChannelState().getOffsetToken())
+     .setOffsetToken(channel3.getChannelState().getEndOffsetToken())
.build();
ChannelMetadata channelMetadata4 =
ChannelMetadata.builder()
.setOwningChannelFromContext(channel4.getChannelContext())
.setRowSequencer(channel4.getChannelState().incrementAndGetRowSequencer())
-     .setOffsetToken(channel4.getChannelState().getOffsetToken())
+     .setOffsetToken(channel4.getChannelState().getEndOffsetToken())
.build();

List<BlobMetadata> blobs = new ArrayList<>();
