Skip to content

Commit

Permalink
clippy fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Nov 7, 2024
1 parent 822ce24 commit d521829
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/datasource/kafka/kafka_stream_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl PartitionStream for KafkaStreamRead {
max_timestamp,
offsets_read,
};
let _ = state_backend
state_backend
.as_ref()
.put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap());
debug!("checkpointed offsets {:?}", off);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl GroupedWindowAggStream {
context,
epoch: 0,
partition,
channel_tag: channel_tag,
channel_tag,
receiver,
state_backend,
};
Expand All @@ -184,7 +184,7 @@ impl GroupedWindowAggStream {
.collect();
let _ = stream.ensure_window_frames_for_ranges(&ranges);
state.frames.iter().for_each(|f| {
let _ = stream.update_accumulators_for_frame(f.window_start_time, &f);
let _ = stream.update_accumulators_for_frame(f.window_start_time, f);
});
let state_watermark = state.watermark.unwrap();
stream.process_watermark(RecordBatchWatermark {
Expand Down Expand Up @@ -387,7 +387,7 @@ impl GroupedWindowAggStream {

let watermark = {
let watermark_lock = self.latest_watermark.lock().unwrap();
watermark_lock.clone()
*watermark_lock
};

let checkpointed_state = CheckpointedGroupedWindowAggStream {
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/simple_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<()> {
]))
.await?;

let _ctx = Context::new()?
Context::new()?
.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
.await
.from_topic(source_topic)
Expand Down

0 comments on commit d521829

Please sign in to comment.