diff --git a/Cargo.toml b/Cargo.toml index 3098aff..b61d948 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ clap = { version = "4.5", features = ["derive", "env"] } dotenv = { version = "0.15" } duration-string = { version = "0.4", features = ["serde"] } futures = { version = "0.3" } +indicatif = { version = "0.17", features = ["futures"] } rand = { version = "0.8" } rust-s3 = { version = "0.34", default-features = false, features = [ "fail-on-err", diff --git a/Dockerfile b/Dockerfile index 0e33725..23cec56 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,6 +9,9 @@ ARG PACKAGE="sos" # Be ready for serving FROM docker.io/library/debian:${DEBIAN_VERSION} as server +# Configure server environment variables +ENV SOS_NO_PROGRESS_BAR="true" + # Server Configuration EXPOSE 80/tcp WORKDIR /usr/local/bin diff --git a/src/args.rs b/src/args.rs index 383bb90..15dbb31 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,7 +1,7 @@ use std::fmt; use byte_unit::Byte; -use clap::Parser; +use clap::{ArgAction, Parser}; use duration_string::DurationString; use s3::{creds::Credentials, Region}; use serde::{Deserialize, Serialize}; @@ -165,6 +165,15 @@ pub struct LoadTesterJobArgs { #[serde(default)] pub duration: Option, + #[arg( + long, + env = "SOS_NO_PROGRESS_BAR", + action = ArgAction::SetTrue, + default_value_t = LoadTesterJobArgs::default_no_progress_bar(), + )] + #[serde(default = "LoadTesterJobArgs::default_no_progress_bar")] + pub no_progress_bar: bool, + #[arg( long, env = "SOS_THREADS_MAX", @@ -179,12 +188,17 @@ impl Default for LoadTesterJobArgs { fn default() -> Self { Self { duration: None, + no_progress_bar: Self::default_no_progress_bar(), threads_max: Self::default_threads_max(), } } } impl LoadTesterJobArgs { + const fn default_no_progress_bar() -> bool { + false + } + const fn default_threads_max() -> usize { 8 } @@ -192,6 +206,7 @@ impl LoadTesterJobArgs { fn print(&self) { let Self { duration, + no_progress_bar, threads_max, } = self; @@ -202,6 +217,7 @@ impl LoadTesterJobArgs { .map(ToString::to_string) .unwrap_or_else(|| "None".into(),) ); + info!("no_progress_bar: {no_progress_bar}"); info!("threads_max: {threads_max}"); } } diff --git a/src/session.rs b/src/session.rs index ae239f4..92b67db 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,15 +1,22 @@ use std::{ convert::identity, + fmt::Write, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }; use anyhow::{anyhow, bail, Result}; use ark_core::signal::FunctionSignal; +use byte_unit::{Byte, UnitType}; use clap::Parser; use futures::{stream::FuturesUnordered, FutureExt, TryFutureExt, TryStreamExt}; +use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use rand::{rngs::SmallRng, RngCore, SeedableRng}; use s3::Bucket; -use tokio::{spawn, task::JoinHandle}; +use tokio::{spawn, task::JoinHandle, time::sleep}; use tracing::{error, info}; use crate::args::{Args, LoadTesterArgs, LoadTesterJobArgs}; @@ -74,16 +81,19 @@ impl ObjectStorageSession { load_tester_job: LoadTesterJobArgs { duration, + no_progress_bar, threads_max, }, } = self; let duration = duration.map(Into::into); + let counter = Arc::::default(); - (0..threads_max) + let task_handler = (0..threads_max) .map(|id| SessionTask { args: args.clone(), bucket: bucket.clone(), + counter: counter.clone(), duration, id, signal: signal.clone(), @@ -94,15 +104,70 @@ impl ObjectStorageSession { .map(|result| result.map_err(Into::into).and_then(identity)) }) .collect::>() - .try_collect() - .and_then(|()| cleanup(bucket)) - .await + .try_collect(); + + if no_progress_bar { + task_handler.await + } else { + let LoadTesterArgs { + count, + size, + step: _, + } = args; + + fn write_eta(state: &ProgressState, w: &mut dyn Write) { + let eta = state.eta().as_secs_f64(); + write!(w, "{eta:.1}s").unwrap() + } + + fn write_speed(state: &ProgressState, w: &mut dyn Write) { + match Byte::from_f64(state.per_sec()) + .map(|byte| byte.get_appropriate_unit(UnitType::Decimal)) + { + Some(byte) => write!(w, "{byte:.1}/s").unwrap(), + None => write!(w, "UNK").unwrap(), + } + } + + let pb = match count { + Some(count) => ProgressBar::new(count.as_u64() * size.as_u64()), + None => ProgressBar::new_spinner(), + }; + + let style = ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta} | {speed})" + )? + .with_key("eta", write_eta) + .with_key("speed", write_speed) + .progress_chars("#>-"); + pb.set_style(style); + + loop { + let progressed = counter.load(Ordering::SeqCst); + pb.set_position(progressed * size.as_u64()); + + let is_finished = count + .as_ref() + .map(|count| count.as_u64() == progressed) + .unwrap_or_default(); + if is_finished || signal.is_terminating() { + if is_finished { + pb.finish(); + } + task_handler.await?; + break cleanup(bucket, signal).await; + } + + sleep(Duration::from_millis(50)).await; + } + } } } struct SessionTask { args: LoadTesterArgs, bucket: Bucket, + counter: Arc, duration: Option, id: usize, signal: FunctionSignal, @@ -114,6 +179,7 @@ impl SessionTask { let Self { args: LoadTesterArgs { count, size, step }, bucket, + counter, duration, id, signal, @@ -122,14 +188,20 @@ impl SessionTask { let instant = Instant::now(); - let mut rng = SmallRng::from_entropy(); let count = count.map(|count| count.as_u64() as usize); let size = size.as_u64() as usize; let step = step.as_u64() as usize; - let mut buf = vec![0; size]; + info!("Creating buffer map: {id}/{total_tasks}"); + let mut buf = vec![0; size + step]; + { + let mut rng = SmallRng::from_entropy(); + rng.fill_bytes(&mut buf); + } + let mut index = id; + info!("Starting task: {id}/{total_tasks}"); loop { if signal.is_terminating() { break; @@ -152,14 +224,16 @@ impl SessionTask { }; let path = format!("/sample/{index:06}.bin"); - rng.fill_bytes(&mut buf); - bucket.put_object(&path, &buf).await?; + bucket.put_object(&path, &buf[index..index + size]).await?; + counter.fetch_add(1, Ordering::SeqCst); } + + info!("Stopped task: {id}/{total_tasks}"); Ok(()) } } -async fn cleanup(bucket: Bucket) -> Result<()> { +async fn cleanup(bucket: Bucket, signal: FunctionSignal) -> Result<()> { info!("Cleaning up..."); let delimeter = "/".into();