diff --git a/src/main/java/net/snowflake/ingest/connection/TelemetryService.java b/src/main/java/net/snowflake/ingest/connection/TelemetryService.java index bcf98b712..ee1e1353d 100644 --- a/src/main/java/net/snowflake/ingest/connection/TelemetryService.java +++ b/src/main/java/net/snowflake/ingest/connection/TelemetryService.java @@ -28,8 +28,7 @@ enum TelemetryType { STREAMING_INGEST_LATENCY_IN_SEC("streaming_ingest_latency_in_ms"), STREAMING_INGEST_CLIENT_FAILURE("streaming_ingest_client_failure"), STREAMING_INGEST_THROUGHPUT_BYTES_PER_SEC("streaming_ingest_throughput_bytes_per_sec"), - STREAMING_INGEST_CPU_MEMORY_USAGE("streaming_ingest_cpu_memory_usage"), - STREAMING_INGEST_BATCH_OFFSET_MISMATCH("streaming_ingest_batch_offset_mismatch"); + STREAMING_INGEST_CPU_MEMORY_USAGE("streaming_ingest_cpu_memory_usage"); private final String name; @@ -123,21 +122,6 @@ public void reportCpuMemoryUsage(Histogram cpuUsage) { } } - public void reportBatchOffsetMismatch( - String channelName, - String prevBatchEndOffset, - String startOffset, - String endOffset, - long rowCount) { - ObjectNode msg = MAPPER.createObjectNode(); - msg.put("channel_name", channelName); - msg.put("prev_batch_end_offset", prevBatchEndOffset); - msg.put("start_offset", startOffset); - msg.put("end_offset", endOffset); - msg.put("row_count", rowCount); - send(TelemetryType.STREAMING_INGEST_BATCH_OFFSET_MISMATCH, msg); - } - /** Send log to Snowflake asynchronously through JDBC client telemetry */ void send(TelemetryType type, ObjectNode msg) { try { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index c884043f0..a786f1264 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -150,6 +150,7 @@ public InsertValidationResponse insertRows( InsertValidationResponse response = new InsertValidationResponse(); float rowsSizeInBytes = 0F; int rowIndex = 0; + int prevRowCount = rowBuffer.bufferedRowCount; for (Map row : rows) { InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex); @@ -173,9 +174,7 @@ public InsertValidationResponse insertRows( rowIndex++; } checkBatchSizeRecommendedMaximum(rowsSizeInBytes); - checkOffsetMismatch( - rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, rowIndex); - rowBuffer.channelState.setOffsetToken(endOffsetToken); + rowBuffer.channelState.updateOffsetToken(startOffsetToken, endOffsetToken, prevRowCount); rowBuffer.bufferSize += rowsSizeInBytes; rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); return response; @@ -210,15 +209,14 @@ public InsertValidationResponse insertRows( moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> rowBuffer.statsMap.put( colName, RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)))); - checkOffsetMismatch( - rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, tempRowCount); - rowBuffer.channelState.setOffsetToken(endOffsetToken); + rowBuffer.channelState.updateOffsetToken( + startOffsetToken, endOffsetToken, rowBuffer.bufferedRowCount); + rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.bufferSize += rowsSizeInBytes; rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); return response; @@ -265,15 +263,14 @@ public InsertValidationResponse insertRows( checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> rowBuffer.statsMap.put( colName, RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)))); - checkOffsetMismatch( - rowBuffer.channelState.getOffsetToken(), startOffsetToken, endOffsetToken, rowIndex); - rowBuffer.channelState.setOffsetToken(endOffsetToken); + rowBuffer.channelState.updateOffsetToken( + startOffsetToken, endOffsetToken, rowBuffer.bufferedRowCount); + rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.bufferSize += rowsSizeInBytes; rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); } @@ -316,17 +313,13 @@ public InsertValidationResponse insertRows( // Buffer parameters that are set at the owning client level final ClientBufferParameters clientBufferParameters; - // Telemetry service use to report telemetry to SF - private final TelemetryService telemetryService; - AbstractRowBuffer( OpenChannelRequest.OnErrorOption onErrorOption, ZoneId defaultTimezone, String fullyQualifiedChannelName, Consumer rowSizeMetric, ChannelRuntimeState channelRuntimeState, - ClientBufferParameters clientBufferParameters, - TelemetryService telemetryService) { + ClientBufferParameters clientBufferParameters) { this.onErrorOption = onErrorOption; this.defaultTimezone = defaultTimezone; this.rowSizeMetric = rowSizeMetric; @@ -337,7 +330,6 @@ public InsertValidationResponse insertRows( this.bufferedRowCount = 0; this.bufferSize = 0F; this.clientBufferParameters = clientBufferParameters; - this.telemetryService = telemetryService; // Initialize empty stats this.statsMap = new HashMap<>(); @@ -492,7 +484,7 @@ public ChannelData flush(final String filePath) { oldRowCount = this.bufferedRowCount; oldBufferSize = this.bufferSize; oldRowSequencer = this.channelState.incrementAndGetRowSequencer(); - oldOffsetToken = this.channelState.getOffsetToken(); + oldOffsetToken = this.channelState.getEndOffsetToken(); oldColumnEps = new HashMap<>(this.statsMap); oldMinMaxInsertTimeInMs = new Pair<>( @@ -677,42 +669,6 @@ private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) { } } - /** - * We verify some offset expect behavior and report to SF if there is a mismatch. Note that there - * are false positives because the input could give us a batch with offset gaps. For example in - * Kafka Connector, we could have gaps if some of the offsets are filtered out by SMT. - */ - private void checkOffsetMismatch( - String prevEndOffset, String curStartOffset, String curEndOffset, int rowCount) { - if (telemetryService != null && curStartOffset != null) { - boolean reportMismatch = false; - try { - long curStart = Long.parseLong(curStartOffset); - long curEnd = Long.parseLong(curEndOffset); - - // We verify that the end_offset - start_offset + 1 = row_count - if (curEnd - curStart + 1 != rowCount) { - reportMismatch = true; - } - - // We verify that start_offset_of_current_batch = end_offset_of_previous_batch+1 - if (prevEndOffset != null) { - long prevEnd = Long.parseLong(prevEndOffset); - if (curStart != prevEnd + 1) { - reportMismatch = true; - } - } - } catch (NumberFormatException ignored) { - // Do nothing since we can't compare the offset - } - - if (reportMismatch) { - this.telemetryService.reportBatchOffsetMismatch( - channelFullyQualifiedName, prevEndOffset, curStartOffset, curEndOffset, rowCount); - } - } - } - /** Create the ingestion strategy based on the channel on_error option */ IngestionStrategy createIngestionStrategy(OpenChannelRequest.OnErrorOption onErrorOption) { switch (onErrorOption) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelRuntimeState.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelRuntimeState.java index 26314db3e..8fa7720d3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelRuntimeState.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelRuntimeState.java @@ -11,8 +11,11 @@ class ChannelRuntimeState { // Indicates whether the channel is still valid private volatile boolean isValid; - // The channel's current offset token - private volatile String offsetToken; + // The channel's current start offset token + private volatile String startOffsetToken; + + // The channel's current end offset token + private volatile String endOffsetToken; // The channel's current row sequencer private final AtomicLong rowSequencer; @@ -21,8 +24,8 @@ class ChannelRuntimeState { private Long firstInsertInMs; private Long lastInsertInMs; - ChannelRuntimeState(String offsetToken, long rowSequencer, boolean isValid) { - this.offsetToken = offsetToken; + ChannelRuntimeState(String endOffsetToken, long rowSequencer, boolean isValid) { + this.endOffsetToken = endOffsetToken; this.rowSequencer = new AtomicLong(rowSequencer); this.isValid = isValid; } @@ -41,9 +44,14 @@ void invalidate() { isValid = false; } - /** @return current offset token */ - String getOffsetToken() { - return offsetToken; + /** @return current end offset token */ + String getEndOffsetToken() { + return endOffsetToken; + } + + /** @return current start offset token */ + String getStartOffsetToken() { + return startOffsetToken; } /** @return current offset token after first incrementing it by one. */ @@ -57,12 +65,16 @@ long getRowSequencer() { } /** - * Updates the channel's offset token. + * Updates the channel's start and end offset token. * - * @param offsetToken new offset token + * @param startOffsetToken new start offset token of the batch + * @param endOffsetToken new end offset token */ - void setOffsetToken(String offsetToken) { - this.offsetToken = offsetToken; + void updateOffsetToken(String startOffsetToken, String endOffsetToken, int rowCount) { + if (rowCount == 0) { + this.startOffsetToken = startOffsetToken; + } + this.endOffsetToken = endOffsetToken; } /** Update the insert stats for the current row buffer whenever needed */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 5fa5b5c44..0cba70c05 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -67,8 +67,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer { fullyQualifiedChannelName, rowSizeMetric, channelRuntimeState, - clientBufferParameters, - telemetryService); + clientBufferParameters); this.fieldIndex = new HashMap<>(); this.metadata = new HashMap<>(); this.data = new ArrayList<>(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 0b7820fb3..9104fdb14 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -103,7 +103,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn String dbName, String schemaName, String tableName, - String offsetToken, + String endOffsetToken, Long channelSequencer, Long rowSequencer, SnowflakeStreamingIngestClientInternal client, @@ -117,7 +117,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn this.channelFlushContext = new ChannelFlushContext( name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); - this.channelState = new ChannelRuntimeState(offsetToken, rowSequencer, true); + this.channelState = new ChannelRuntimeState(endOffsetToken, rowSequencer, true); this.rowBuffer = AbstractRowBuffer.createRowBuffer( onErrorOption, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 071f5f7c4..effa357eb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -801,7 +801,7 @@ List> verifyChannelsAreFullyCommitted channelStatus.getStatusCode(), channel.getChannelSequencer(), rowSequencer, - channel.getChannelState().getOffsetToken(), + channel.getChannelState().getEndOffsetToken(), channelStatus.getPersistedRowSequencer(), channelStatus.getPersistedOffsetToken()); if (channelStatus.getStatusCode() != RESPONSE_SUCCESS) { diff --git a/src/test/java/net/snowflake/ingest/connection/TelemetryServiceTest.java b/src/test/java/net/snowflake/ingest/connection/TelemetryServiceTest.java index 91a8449a7..962aa1a81 100644 --- a/src/test/java/net/snowflake/ingest/connection/TelemetryServiceTest.java +++ b/src/test/java/net/snowflake/ingest/connection/TelemetryServiceTest.java @@ -73,18 +73,4 @@ public void testReportCpuMemoryUsage() { // Make sure there is no exception thrown telemetryService.reportCpuMemoryUsage(cpuHistogram); } - - @Test - public void testReportBatchOffsetMismatch() { - CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); - - TelemetryService telemetryService = - Mockito.spy( - new TelemetryService( - httpClient, "testReportClientFailure", "snowflake.dev.local:8082")); - Mockito.doNothing().when(telemetryService).send(Mockito.any(), Mockito.any()); - - // Make sure there is no exception thrown - telemetryService.reportBatchOffsetMismatch("channel", "0", "1", "2", 1); - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 15bc52374..362178812 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -143,7 +143,7 @@ public void testChannelFactorySuccess() { Assert.assertEquals(dbName, channel.getDBName()); Assert.assertEquals(schemaName, channel.getSchemaName()); Assert.assertEquals(tableName, channel.getTableName()); - Assert.assertEquals(offsetToken, channel.getChannelState().getOffsetToken()); + Assert.assertEquals(offsetToken, channel.getChannelState().getEndOffsetToken()); Assert.assertEquals(channelSequencer, channel.getChannelSequencer()); Assert.assertEquals(rowSequencer + 1L, channel.getChannelState().incrementAndGetRowSequencer()); Assert.assertEquals( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 0eece5f02..1107aad89 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -460,7 +460,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { ChannelMetadata.builder() .setOwningChannelFromContext(channel.getChannelContext()) .setRowSequencer(channel.getChannelState().incrementAndGetRowSequencer()) - .setOffsetToken(channel.getChannelState().getOffsetToken()) + .setOffsetToken(channel.getChannelState().getEndOffsetToken()) .build(); Map columnEps = new HashMap<>(); @@ -514,25 +514,25 @@ private Pair, Set> getRetryBlobMetadata( ChannelMetadata.builder() .setOwningChannelFromContext(channel1.getChannelContext()) .setRowSequencer(channel1.getChannelState().incrementAndGetRowSequencer()) - .setOffsetToken(channel1.getChannelState().getOffsetToken()) + .setOffsetToken(channel1.getChannelState().getEndOffsetToken()) .build(); ChannelMetadata channelMetadata2 = ChannelMetadata.builder() .setOwningChannelFromContext(channel2.getChannelContext()) .setRowSequencer(channel2.getChannelState().incrementAndGetRowSequencer()) - .setOffsetToken(channel2.getChannelState().getOffsetToken()) + .setOffsetToken(channel2.getChannelState().getEndOffsetToken()) .build(); ChannelMetadata channelMetadata3 = ChannelMetadata.builder() .setOwningChannelFromContext(channel3.getChannelContext()) .setRowSequencer(channel3.getChannelState().incrementAndGetRowSequencer()) - .setOffsetToken(channel3.getChannelState().getOffsetToken()) + .setOffsetToken(channel3.getChannelState().getEndOffsetToken()) .build(); ChannelMetadata channelMetadata4 = ChannelMetadata.builder() .setOwningChannelFromContext(channel4.getChannelContext()) .setRowSequencer(channel4.getChannelState().incrementAndGetRowSequencer()) - .setOffsetToken(channel4.getChannelState().getOffsetToken()) + .setOffsetToken(channel4.getChannelState().getEndOffsetToken()) .build(); List blobs = new ArrayList<>();