Skip to content

Commit

Permalink
feat: add multipart uploading support
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Jun 20, 2024
1 parent 9701b63 commit a18c10d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
26 changes: 25 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ pub struct LoadTesterArgs {
#[serde(default)]
pub count: Option<Byte>,

#[arg(
long,
env = "SOS_MULTIPART_THRESHOLD",
value_name = "BYTES",
default_value_t = LoadTesterArgs::default_multipart_threshold(),
)]
#[serde(default = "LoadTesterArgs::default_multipart_threshold")]
pub multipart_threshold: Byte,

#[arg(long, env = "SOS_SIZE", value_name = "BYTES", default_value_t = LoadTesterArgs::default_size())]
#[serde(default = "LoadTesterArgs::default_size")]
pub size: Byte,
Expand All @@ -127,13 +136,18 @@ impl Default for LoadTesterArgs {
fn default() -> Self {
Self {
count: None,
multipart_threshold: Self::default_multipart_threshold(),
size: Self::default_size(),
step: Self::default_step(),
}
}
}

impl LoadTesterArgs {
const fn default_multipart_threshold() -> Byte {
Byte::from_u64(8_000_000) // 8MB
}

const fn default_size() -> Byte {
Byte::from_u64(4_000_000) // 4MB
}
Expand All @@ -142,8 +156,17 @@ impl LoadTesterArgs {
Byte::from_u64(64)
}

pub const fn minimal_multipart_threshold() -> Byte {
Byte::from_u64(5_000_000) // 5MB
}

fn print(&self) {
let Self { count, size, step } = self;
let Self {
count,
multipart_threshold,
size,
step,
} = self;

info!(
"count: {count}",
Expand All @@ -152,6 +175,7 @@ impl LoadTesterArgs {
.map(ToString::to_string)
.unwrap_or_else(|| "None".into(),)
);
info!("multipart_threshold: {multipart_threshold}");
info!("size: {size}");
info!("step: {step}");
}
Expand Down
59 changes: 56 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use clap::Parser;
use futures::{stream::FuturesUnordered, FutureExt, TryFutureExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use s3::Bucket;
use s3::{serde_types::InitiateMultipartUploadResponse, Bucket};
use tokio::{spawn, task::JoinHandle, time::sleep};
use tracing::{error, info};

Expand Down Expand Up @@ -111,6 +111,7 @@ impl ObjectStorageSession {
} else {
let LoadTesterArgs {
count,
multipart_threshold: _,
size,
step: _,
} = args;
Expand Down Expand Up @@ -177,7 +178,13 @@ struct SessionTask {
impl SessionTask {
async fn try_loop_forever(self) -> Result<()> {
let Self {
args: LoadTesterArgs { count, size, step },
args:
LoadTesterArgs {
count,
multipart_threshold,
size,
step,
},
bucket,
counter,
duration,
Expand All @@ -188,9 +195,13 @@ impl SessionTask {

let instant = Instant::now();

let content_type = "application/octet-stream";
let count = count.map(|count| count.as_u64() as usize);
let multipart_minimal = LoadTesterArgs::minimal_multipart_threshold().as_u64() as usize;
let multipart_threshold = multipart_threshold.as_u64() as usize;
let size = size.as_u64() as usize;
let step = step.as_u64() as usize;
let use_multipart = size > multipart_threshold;

info!("Creating buffer map: {id}/{total_tasks}");
let mut buf = vec![0; size + step];
Expand Down Expand Up @@ -224,7 +235,49 @@ impl SessionTask {
};
let path = format!("/sample/{index:06}.bin");

bucket.put_object(&path, &buf[index..index + size]).await?;
let data = &buf[index..index + size];
if use_multipart {
let InitiateMultipartUploadResponse { upload_id, .. } = bucket
.initiate_multipart_upload(&path, content_type)
.await?;

let mut chunks = vec![];
{
let mut pos = 0;
let len = data.len();
while pos < len {
let pos_next = pos + multipart_threshold;
let remaining = len - pos_next;

let pos_next = if remaining >= multipart_minimal {
pos_next
} else {
len
};

let chunk = &data[pos..pos_next];
chunks.push(chunk);
pos = pos_next;
}
}

let mut parts = vec![];
for (part_number, reader) in chunks.iter_mut().enumerate() {
let part_number = (part_number + 1).try_into()?;

let part = bucket
.put_multipart_stream(reader, &path, part_number, &upload_id, content_type)
.await?;
parts.push(part);
}

bucket
.complete_multipart_upload(&path, &upload_id, parts)
.await?;
} else {
let mut reader = data;
bucket.put_object_stream(&mut reader, &path).await?;
}
counter.fetch_add(1, Ordering::SeqCst);
}

Expand Down

0 comments on commit a18c10d

Please sign in to comment.