From e299f87f8b38bdfc0579d269587080b7be484a86 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Mon, 14 Oct 2024 14:48:49 -0700 Subject: [PATCH] Fix the timeout poll overflow (#46) --- crates/core/src/datasource/kafka/kafka_stream_read.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 4fd9b6e..12267e5 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -182,12 +182,7 @@ impl PartitionStream for KafkaStreamRead { let start_time = datafusion::common::instant::Instant::now(); while start_time.elapsed() < batch_timeout { - match tokio::time::timeout( - batch_timeout - start_time.elapsed(), - consumer.recv(), - ) - .await - { + match tokio::time::timeout(batch_timeout, consumer.recv()).await { Ok(Ok(m)) => { let payload = m.payload().expect("Message payload is empty"); decoder.push_to_buffer(payload.to_owned());