Skip to content

Commit

Permalink
feat: add progress bar support
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Jun 18, 2024
1 parent b6e188e commit 9701b63
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ clap = { version = "4.5", features = ["derive", "env"] }
dotenv = { version = "0.15" }
duration-string = { version = "0.4", features = ["serde"] }
futures = { version = "0.3" }
indicatif = { version = "0.17", features = ["futures"] }
rand = { version = "0.8" }
rust-s3 = { version = "0.34", default-features = false, features = [
"fail-on-err",
Expand Down
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ ARG PACKAGE="sos"
# Be ready for serving
FROM docker.io/library/debian:${DEBIAN_VERSION} as server

# Configure server environment variables
ENV SOS_NO_PROGRESS_BAR="true"

# Server Configuration
EXPOSE 80/tcp
WORKDIR /usr/local/bin
Expand Down
18 changes: 17 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

use byte_unit::Byte;
use clap::Parser;
use clap::{ArgAction, Parser};
use duration_string::DurationString;
use s3::{creds::Credentials, Region};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -165,6 +165,15 @@ pub struct LoadTesterJobArgs {
#[serde(default)]
pub duration: Option<DurationString>,

#[arg(
long,
env = "SOS_NO_PROGRESS_BAR",
action = ArgAction::SetTrue,
default_value_t = LoadTesterJobArgs::default_no_progress_bar(),
)]
#[serde(default = "LoadTesterJobArgs::default_no_progress_bar")]
pub no_progress_bar: bool,

#[arg(
long,
env = "SOS_THREADS_MAX",
Expand All @@ -179,19 +188,25 @@ impl Default for LoadTesterJobArgs {
fn default() -> Self {
Self {
duration: None,
no_progress_bar: Self::default_no_progress_bar(),
threads_max: Self::default_threads_max(),
}
}
}

impl LoadTesterJobArgs {
const fn default_no_progress_bar() -> bool {
false
}

const fn default_threads_max() -> usize {
8
}

fn print(&self) {
let Self {
duration,
no_progress_bar,
threads_max,
} = self;

Expand All @@ -202,6 +217,7 @@ impl LoadTesterJobArgs {
.map(ToString::to_string)
.unwrap_or_else(|| "None".into(),)
);
info!("no_progress_bar: {no_progress_bar}");
info!("threads_max: {threads_max}");
}
}
Expand Down
94 changes: 84 additions & 10 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
use std::{
convert::identity,
fmt::Write,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};

use anyhow::{anyhow, bail, Result};
use ark_core::signal::FunctionSignal;
use byte_unit::{Byte, UnitType};
use clap::Parser;
use futures::{stream::FuturesUnordered, FutureExt, TryFutureExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use s3::Bucket;
use tokio::{spawn, task::JoinHandle};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::{error, info};

use crate::args::{Args, LoadTesterArgs, LoadTesterJobArgs};
Expand Down Expand Up @@ -74,16 +81,19 @@ impl ObjectStorageSession {
load_tester_job:
LoadTesterJobArgs {
duration,
no_progress_bar,
threads_max,
},
} = self;

let duration = duration.map(Into::into);
let counter = Arc::<AtomicU64>::default();

(0..threads_max)
let task_handler = (0..threads_max)
.map(|id| SessionTask {
args: args.clone(),
bucket: bucket.clone(),
counter: counter.clone(),
duration,
id,
signal: signal.clone(),
Expand All @@ -94,15 +104,70 @@ impl ObjectStorageSession {
.map(|result| result.map_err(Into::into).and_then(identity))
})
.collect::<FuturesUnordered<_>>()
.try_collect()
.and_then(|()| cleanup(bucket))
.await
.try_collect();

if no_progress_bar {
task_handler.await
} else {
let LoadTesterArgs {
count,
size,
step: _,
} = args;

fn write_eta(state: &ProgressState, w: &mut dyn Write) {
let eta = state.eta().as_secs_f64();
write!(w, "{eta:.1}s").unwrap()
}

fn write_speed(state: &ProgressState, w: &mut dyn Write) {
match Byte::from_f64(state.per_sec())
.map(|byte| byte.get_appropriate_unit(UnitType::Decimal))
{
Some(byte) => write!(w, "{byte:.1}/s").unwrap(),
None => write!(w, "UNK").unwrap(),
}
}

let pb = match count {
Some(count) => ProgressBar::new(count.as_u64() * size.as_u64()),
None => ProgressBar::new_spinner(),
};

let style = ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta} | {speed})"
)?
.with_key("eta", write_eta)
.with_key("speed", write_speed)
.progress_chars("#>-");
pb.set_style(style);

loop {
let progressed = counter.load(Ordering::SeqCst);
pb.set_position(progressed * size.as_u64());

let is_finished = count
.as_ref()
.map(|count| count.as_u64() == progressed)
.unwrap_or_default();
if is_finished || signal.is_terminating() {
if is_finished {
pb.finish();
}
task_handler.await?;
break cleanup(bucket, signal).await;
}

sleep(Duration::from_millis(50)).await;
}
}
}
}

struct SessionTask {
args: LoadTesterArgs,
bucket: Bucket,
counter: Arc<AtomicU64>,
duration: Option<Duration>,
id: usize,
signal: FunctionSignal,
Expand All @@ -114,6 +179,7 @@ impl SessionTask {
let Self {
args: LoadTesterArgs { count, size, step },
bucket,
counter,
duration,
id,
signal,
Expand All @@ -122,14 +188,20 @@ impl SessionTask {

let instant = Instant::now();

let mut rng = SmallRng::from_entropy();
let count = count.map(|count| count.as_u64() as usize);
let size = size.as_u64() as usize;
let step = step.as_u64() as usize;

let mut buf = vec![0; size];
info!("Creating buffer map: {id}/{total_tasks}");
let mut buf = vec![0; size + step];
{
let mut rng = SmallRng::from_entropy();
rng.fill_bytes(&mut buf);
}

let mut index = id;

info!("Starting task: {id}/{total_tasks}");
loop {
if signal.is_terminating() {
break;
Expand All @@ -152,14 +224,16 @@ impl SessionTask {
};
let path = format!("/sample/{index:06}.bin");

rng.fill_bytes(&mut buf);
bucket.put_object(&path, &buf).await?;
bucket.put_object(&path, &buf[index..index + size]).await?;
counter.fetch_add(1, Ordering::SeqCst);
}

info!("Stopped task: {id}/{total_tasks}");
Ok(())
}
}

async fn cleanup(bucket: Bucket) -> Result<()> {
async fn cleanup(bucket: Bucket, signal: FunctionSignal) -> Result<()> {
info!("Cleaning up...");

let delimeter = "/".into();
Expand Down

0 comments on commit 9701b63

Please sign in to comment.