diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 26dc488..9d484ee 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -253,7 +253,7 @@ impl PartitionStream for KafkaStreamRead { max_timestamp, offsets_read, }; - let _ = state_backend + state_backend .as_ref() .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap()); debug!("checkpointed offsets {:?}", off); diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs index 4114584..da43430 100644 --- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs +++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs @@ -168,7 +168,7 @@ impl GroupedWindowAggStream { context, epoch: 0, partition, - channel_tag: channel_tag, + channel_tag, receiver, state_backend, }; @@ -184,7 +184,7 @@ impl GroupedWindowAggStream { .collect(); let _ = stream.ensure_window_frames_for_ranges(&ranges); state.frames.iter().for_each(|f| { - let _ = stream.update_accumulators_for_frame(f.window_start_time, &f); + let _ = stream.update_accumulators_for_frame(f.window_start_time, f); }); let state_watermark = state.watermark.unwrap(); stream.process_watermark(RecordBatchWatermark { @@ -387,7 +387,7 @@ impl GroupedWindowAggStream { let watermark = { let watermark_lock = self.latest_watermark.lock().unwrap(); - watermark_lock.clone() + *watermark_lock }; let checkpointed_state = CheckpointedGroupedWindowAggStream { diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 2fe1306..d785d65 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -32,7 +32,7 @@ async fn main() -> Result<()> { ])) .await?; - let _ctx = Context::new()? + Context::new()? .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) .await .from_topic(source_topic)