Skip to content

Commit

Permalink
current not current+1
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Nov 1, 2023
1 parent 12da4ed commit b254308
Showing 1 changed file with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
// Accept the incoming record only if we don't have a valid offset token at server side, or the
// incoming record offset is 1 + the processed offset
if (currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
|| kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset + 1) {
|| kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset) {
StreamingBuffer copiedStreamingBuffer = null;
bufferLock.lock();
try {
Expand Down Expand Up @@ -403,7 +403,7 @@ private boolean shouldIgnoreAddingRecordToBuffer(
}

// Don't ignore if we see the expected offset; otherwise log and skip
if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
if (kafkaSinkRecord.kafkaOffset() == currentProcessedOffset) {
LOGGER.debug(
"Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
kafkaSinkRecord.kafkaOffset(),
Expand Down Expand Up @@ -957,7 +957,7 @@ private void resetChannelMetadataAfterRecovery(
// Need to update the in memory processed offset otherwise if same offset is send again, it
// might get rejected.
this.offsetPersistedInSnowflake.set(offsetRecoveredFromSnowflake);
this.processedOffset.set(offsetRecoveredFromSnowflake);
this.processedOffset.set(offsetToResetInKafka);

// State that there was some exception and only clear that state when we have received offset
// starting from offsetRecoveredFromSnowflake
Expand Down

0 comments on commit b254308

Please sign in to comment.