Skip to content

Commit

Permalink
Introduce hold_max field in ProgressIter
Browse files Browse the repository at this point in the history
On the first seek it gets enabled, displaying the progress as the max
of the last 10 updates.
If there ever are more than 5 consecutive reads and writes without seek
it gets disabled again, keeping the performance impact low.

This also fixes a pre-existing logic error in the async BufRead impl.
  • Loading branch information
djugei committed Sep 13, 2024
1 parent fde06b7 commit 16ba414
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 24 deletions.
162 changes: 141 additions & 21 deletions src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,97 @@ where
}
}

#[derive(Debug)]
struct RingBuf<const SIZE: usize = 10> {
vals: [u64; SIZE],
head: u8,
max: u64,
maxpos: u8,
}

impl<const SIZE: usize> RingBuf<SIZE> {
fn new() -> Self {
assert!(SIZE <= u8::MAX.into());
assert!(SIZE > 0);
Self {
vals: [0; SIZE],
head: 0,
maxpos: 0,
max: 0,
}
}

fn update(&mut self, new: u64) {
// % here is a purely for the optimizer
let head: usize = usize::from(self.head) % self.vals.len();
self.vals[head] = new;

if new > self.max {
// this is now the new maximum
self.maxpos = self.head;
self.max = new;
} else if self.maxpos == self.head && new < self.max {
// this was the maximum and may not be anymore
let (idx, val) = self
.vals
.iter()
.enumerate()
.max()
.expect("array has fixded size > 0");
self.maxpos = idx.try_into().unwrap();
self.max = *val;
}

self.head = (self.head + 1) % (self.vals.len() as u8);
}

fn max(&self) -> u64 {
self.max
}
}

#[derive(Debug)]
pub(crate) struct RingBufWrap<const RESET: u8 = 5, const BUFSIZE: usize = 10> {
buf: Option<(Box<RingBuf<BUFSIZE>>, u8)>,
}

impl<const RESET: u8, const BUFSIZE: usize> RingBufWrap<RESET, BUFSIZE> {
pub(crate) fn new() -> Self {
Self { buf: None }
}
fn update_seq(&mut self, oldpos: u64, delta: u64) -> u64 {
let newpos = oldpos + delta;
if let Some((buf, seq)) = &mut self.buf {
*seq += 1;
if *seq >= RESET {
self.buf = None;
newpos
} else {
buf.update(newpos);
buf.max()
}
} else {
newpos
}
}

fn update_seek(&mut self, newpos: u64) -> u64 {
let (b, seq) = self.buf.get_or_insert_with(|| {
let b = Box::new(RingBuf::<BUFSIZE>::new());
(b, 0)
});
*seq = 0;
b.update(newpos);
b.max()
}
}

/// Wraps an iterator to display its progress.
#[derive(Debug)]
pub struct ProgressBarIter<T> {
pub(crate) it: T,
pub progress: ProgressBar,
pub(crate) hold_max: RingBufWrap,
}

impl<T> ProgressBarIter<T> {
Expand Down Expand Up @@ -155,25 +241,37 @@ impl<T: FusedIterator> FusedIterator for ProgressBarIter<T> {}
impl<R: io::Read> io::Read for ProgressBarIter<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let inc = self.it.read(buf)?;
self.progress.inc(inc as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), inc as u64),
);
Ok(inc)
}

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
let inc = self.it.read_vectored(bufs)?;
self.progress.inc(inc as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), inc as u64),
);
Ok(inc)
}

fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
let inc = self.it.read_to_string(buf)?;
self.progress.inc(inc as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), inc as u64),
);
Ok(inc)
}

fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
self.it.read_exact(buf)?;
self.progress.inc(buf.len() as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), buf.len() as u64),
);
Ok(())
}
}
Expand All @@ -185,14 +283,17 @@ impl<R: io::BufRead> io::BufRead for ProgressBarIter<R> {

fn consume(&mut self, amt: usize) {
self.it.consume(amt);
self.progress.inc(amt as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), amt.try_into().unwrap()),
);
}
}

impl<S: io::Seek> io::Seek for ProgressBarIter<S> {
fn seek(&mut self, f: io::SeekFrom) -> io::Result<u64> {
self.it.seek(f).map(|pos| {
self.progress.set_position(pos);
self.progress.set_position(self.hold_max.update_seek(pos));
pos
})
}
Expand All @@ -213,7 +314,9 @@ impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for ProgressBarIter
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.it).poll_write(cx, buf).map(|poll| {
poll.map(|inc| {
self.progress.inc(inc as u64);
let oldprog = self.progress.position();
let newprog = self.hold_max.update_seq(oldprog, inc.try_into().unwrap());
self.progress.set_position(newprog);
inc
})
})
Expand All @@ -237,12 +340,14 @@ impl<W: tokio::io::AsyncRead + Unpin> tokio::io::AsyncRead for ProgressBarIter<W
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let prev_len = buf.filled().len() as u64;
if let Poll::Ready(e) = Pin::new(&mut self.it).poll_read(cx, buf) {
self.progress.inc(buf.filled().len() as u64 - prev_len);
Poll::Ready(e)
} else {
Poll::Pending
let poll = Pin::new(&mut self.it).poll_read(cx, buf);
if let Poll::Ready(_e) = &poll {
let inc = buf.filled().len() as u64 - prev_len;
let oldprog = self.progress.position();
let newprog = self.hold_max.update_seq(oldprog, inc);
self.progress.set_position(newprog);
}
poll
}
}

Expand All @@ -254,7 +359,13 @@ impl<W: tokio::io::AsyncSeek + Unpin> tokio::io::AsyncSeek for ProgressBarIter<W
}

fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
Pin::new(&mut self.it).poll_complete(cx)
let poll = Pin::new(&mut self.it).poll_complete(cx);
if let Poll::Ready(Ok(pos)) = &poll {
let newpos = self.hold_max.update_seek(*pos);
self.progress.set_position(newpos);
}

poll
}
}

Expand All @@ -265,15 +376,14 @@ impl<W: tokio::io::AsyncBufRead + Unpin + tokio::io::AsyncRead> tokio::io::Async
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let this = self.get_mut();
let result = Pin::new(&mut this.it).poll_fill_buf(cx);
if let Poll::Ready(Ok(buf)) = &result {
this.progress.inc(buf.len() as u64);
}
result
Pin::new(&mut this.it).poll_fill_buf(cx)
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut self.it).consume(amt);
let oldprog = self.progress.position();
let newprog = self.hold_max.update_seq(oldprog, amt.try_into().unwrap());
self.progress.set_position(newprog);
}
}

Expand All @@ -300,14 +410,20 @@ impl<S: futures_core::Stream + Unpin> futures_core::Stream for ProgressBarIter<S
impl<W: io::Write> io::Write for ProgressBarIter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.it.write(buf).map(|inc| {
self.progress.inc(inc as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), inc as u64),
);
inc
})
}

fn write_vectored(&mut self, bufs: &[io::IoSlice]) -> io::Result<usize> {
self.it.write_vectored(bufs).map(|inc| {
self.progress.inc(inc as u64);
self.progress.set_position(
self.hold_max
.update_seq(self.progress.position(), inc as u64),
);
inc
})
}
Expand All @@ -323,7 +439,11 @@ impl<W: io::Write> io::Write for ProgressBarIter<W> {

impl<S, T: Iterator<Item = S>> ProgressIterator for T {
fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self> {
ProgressBarIter { it: self, progress }
ProgressBarIter {
it: self,
progress,
hold_max: RingBufWrap::new(),
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use once_cell::sync::Lazy;
use crate::draw_target::ProgressDrawTarget;
use crate::state::{AtomicPosition, BarState, ProgressFinish, Reset, TabExpandedString};
use crate::style::ProgressStyle;
use crate::{ProgressBarIter, ProgressIterator, ProgressState};
use crate::{iter, ProgressBarIter, ProgressIterator, ProgressState};

/// A progress bar or spinner
///
Expand Down Expand Up @@ -447,6 +447,7 @@ impl ProgressBar {
ProgressBarIter {
progress: self.clone(),
it: read,
hold_max: iter::RingBufWrap::new(),
}
}

Expand All @@ -468,6 +469,7 @@ impl ProgressBar {
ProgressBarIter {
progress: self.clone(),
it: write,
hold_max: iter::RingBufWrap::new(),
}
}

Expand All @@ -494,6 +496,7 @@ impl ProgressBar {
ProgressBarIter {
progress: self.clone(),
it: write,
hold_max: iter::RingBufWrap::new(),
}
}

Expand All @@ -517,6 +520,7 @@ impl ProgressBar {
ProgressBarIter {
progress: self.clone(),
it: read,
hold_max: iter::RingBufWrap::new(),
}
}

Expand All @@ -539,6 +543,7 @@ impl ProgressBar {
ProgressBarIter {
progress: self.clone(),
it: stream,
hold_max: iter::RingBufWrap::new(),
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/rayon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rayon::iter::plumbing::{Consumer, Folder, Producer, ProducerCallback, UnindexedConsumer};
use rayon::iter::{IndexedParallelIterator, ParallelIterator};

use crate::{ProgressBar, ProgressBarIter};
use crate::{iter, ProgressBar, ProgressBarIter};

/// Wraps a Rayon parallel iterator.
///
Expand Down Expand Up @@ -41,7 +41,11 @@ where

impl<S: Send, T: ParallelIterator<Item = S>> ParallelProgressIterator for T {
fn progress_with(self, progress: ProgressBar) -> ProgressBarIter<Self> {
ProgressBarIter { it: self, progress }
ProgressBarIter {
it: self,
progress,
hold_max: iter::RingBufWrap::new(),
}
}
}

Expand Down Expand Up @@ -99,6 +103,7 @@ impl<T, P: Producer<Item = T>> Producer for ProgressProducer<P> {
ProgressBarIter {
it: self.base.into_iter(),
progress: self.progress,
hold_max: iter::RingBufWrap::new(),
}
}

Expand Down

0 comments on commit 16ba414

Please sign in to comment.