Skip to content

Commit

Permalink
Fix the timeout poll overflow (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
ameyc authored Oct 14, 2024
1 parent 35d4d80 commit e299f87
Showing 1 changed file with 1 addition and 6 deletions.
7 changes: 1 addition & 6 deletions crates/core/src/datasource/kafka/kafka_stream_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,7 @@ impl PartitionStream for KafkaStreamRead {
let start_time = datafusion::common::instant::Instant::now();

while start_time.elapsed() < batch_timeout {
match tokio::time::timeout(
batch_timeout - start_time.elapsed(),
consumer.recv(),
)
.await
{
match tokio::time::timeout(batch_timeout, consumer.recv()).await {
Ok(Ok(m)) => {
let payload = m.payload().expect("Message payload is empty");
decoder.push_to_buffer(payload.to_owned());
Expand Down

0 comments on commit e299f87

Please sign in to comment.