Skip to content

Commit

Permalink
Block size buffers (#1105)
Browse files Browse the repository at this point in the history
This PR changes the `struct Buffer` to enforce that it is always an exact
multiple of blocks.

The property is enforced at construction: the only way to build or resize a
`Buffer` is to provide a tuple of `(block_count, block_size)`.  Various
implementors of `BlockIO` also now check that their reads and writes are
block-aligned, returning the existing `CrucibleError::DataLenUnaligned` if
that's not the case.
  • Loading branch information
mkeeter authored Jan 24, 2024
1 parent 9525b3e commit 627f3c9
Show file tree
Hide file tree
Showing 19 changed files with 356 additions and 290 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ slog-async = { version = "2.8" }
slog-bunyan = "2.4.0"
slog-dtrace = "0.2"
slog-term = { version = "2.9" }
static_assertions = "1.1.0"
statistical = "1.0.0"
subprocess = "0.2.9"
tempfile = "3"
Expand Down
17 changes: 11 additions & 6 deletions crudd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async fn cmd_read<T: BlockIO + std::marker::Send + 'static>(
cmp::min(num_bytes, native_block_size - offset_misalignment);
if offset_misalignment != 0 {
// Read the full block
let mut buffer = Buffer::new(native_block_size as usize);
let mut buffer = Buffer::new(1, native_block_size as usize);
let block_idx = opt.byte_offset / native_block_size;
let offset = Block::new(block_idx, native_block_size.trailing_zeros());
crucible.read(offset, &mut buffer).await?;
Expand Down Expand Up @@ -207,9 +207,13 @@ async fn cmd_read<T: BlockIO + std::marker::Send + 'static>(
let cmd_count = num_bytes / (opt.iocmd_block_count * native_block_size);
let remainder = num_bytes % (opt.iocmd_block_count * native_block_size);

let buffer_size = (opt.iocmd_block_count * native_block_size) as usize;
let mut buffers: Vec<Buffer> = (0..opt.pipeline_length)
.map(|_| Buffer::new(buffer_size))
.map(|_| {
Buffer::new(
opt.iocmd_block_count as usize,
native_block_size as usize,
)
})
.collect();

// Issue all of our read commands
Expand Down Expand Up @@ -255,7 +259,8 @@ async fn cmd_read<T: BlockIO + std::marker::Send + 'static>(
// let block_remainder = remainder % native_block_size;
// round up
let blocks = (remainder + native_block_size - 1) / native_block_size;
let mut buffer = Buffer::new((blocks * native_block_size) as usize);
let mut buffer =
Buffer::new(blocks as usize, native_block_size as usize);
let block_idx = (cmd_count * opt.iocmd_block_count) + block_offset;
let offset = Block::new(block_idx, native_block_size.trailing_zeros());
crucible.read(offset, &mut buffer).await?;
Expand Down Expand Up @@ -309,7 +314,7 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
offset.value + uflow_blocks,
native_block_size.trailing_zeros(),
);
let mut uflow_r_buf = Buffer::new(native_block_size as usize);
let mut uflow_r_buf = Buffer::new(1, native_block_size as usize);
crucible.read(uflow_offset, &mut uflow_r_buf).await?;

// Copy it into w_buf
Expand Down Expand Up @@ -394,7 +399,7 @@ async fn cmd_write<T: BlockIO>(
// We need to read-modify-write here.

// Read the full block
let mut buffer = Buffer::new(native_block_size as usize);
let mut buffer = Buffer::new(1, native_block_size as usize);
let block_idx = opt.byte_offset / native_block_size;
let offset = Block::new(block_idx, native_block_size.trailing_zeros());
crucible.read(offset, &mut buffer).await?;
Expand Down
4 changes: 1 addition & 3 deletions crutest/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ async fn cli_read(
* Convert offset to its byte value.
*/
let offset = Block::new(block_index as u64, ri.block_size.trailing_zeros());
let length: usize = size * ri.block_size as usize;

let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data = crucible::Buffer::repeat(255, size, ri.block_size as usize);

println!("Read at block {:5}, len:{:7}", offset.value, data.len());
guest.read(offset, &mut data).await?;
Expand Down
47 changes: 24 additions & 23 deletions crutest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,8 +1120,11 @@ async fn verify_volume(
io_sz
};

let length: usize = next_io_blocks * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data = crucible::Buffer::repeat(
255,
next_io_blocks,
ri.block_size as usize,
);
guest.read(offset, &mut data).await?;

let dl = data.into_vec();
Expand Down Expand Up @@ -1359,8 +1362,8 @@ async fn balloon_workload(
guest.write(offset, data).await?;
guest.flush(None).await?;

let length: usize = size * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data =
crucible::Buffer::repeat(255, size, ri.block_size as usize);
guest.read(offset, &mut data).await?;

let dl = data.into_vec();
Expand Down Expand Up @@ -1576,8 +1579,8 @@ async fn generic_workload(
guest.write(offset, data).await?;
} else {
// Read (+ verify)
let length: usize = size * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data =
crucible::Buffer::repeat(255, size, ri.block_size as usize);
if !quiet {
match wtq {
WhenToQuit::Count { count } => {
Expand All @@ -1602,6 +1605,7 @@ async fn generic_workload(
}
guest.read(offset, &mut data).await?;

let data_len = data.len();
let dl = data.into_vec();
match validate_vec(
dl,
Expand All @@ -1611,7 +1615,7 @@ async fn generic_workload(
false,
) {
ValidateStatus::Bad | ValidateStatus::InRange => {
bail!("Verify Error at {} len:{}", block_index, length);
bail!("Verify Error at {block_index} len:{data_len}")
}
ValidateStatus::Good => {}
}
Expand Down Expand Up @@ -2065,8 +2069,9 @@ async fn perf_workload(
})
.collect();

let mut read_buffers: Vec<Buffer> =
(0..io_depth).map(|_| Buffer::new(io_size)).collect();
let mut read_buffers: Vec<Buffer> = (0..io_depth)
.map(|_| Buffer::new(blocks_per_io, ri.block_size as usize))
.collect();

let es = ri.extent_size.value;
let ec = ri.total_blocks as u64 / es;
Expand Down Expand Up @@ -2229,8 +2234,7 @@ async fn one_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {

guest.write(offset, data).await?;

let length: usize = size * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data = crucible::Buffer::repeat(255, size, ri.block_size as usize);

println!("Read at block {:5}, len:{:7}", offset.value, data.len());
guest.read(offset, &mut data).await?;
Expand Down Expand Up @@ -2383,8 +2387,8 @@ async fn write_flush_read_workload(

guest.flush(None).await?;

let length: usize = size * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data =
crucible::Buffer::repeat(255, size, ri.block_size as usize);
guest.read(offset, &mut data).await?;

let dl = data.into_vec();
Expand Down Expand Up @@ -2544,8 +2548,8 @@ async fn repair_workload(
guest.write(offset, data).await?;
} else {
// Read
let length: usize = size * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data =
crucible::Buffer::repeat(255, size, ri.block_size as usize);
println!(
"{:>0width$}/{:>0width$} Read \
block {:>bw$} len {:>sw$}",
Expand Down Expand Up @@ -2626,13 +2630,12 @@ async fn demo_workload(
futures.push(future);
} else {
// Read
let length: usize = size * ri.block_size as usize;

let block_size = ri.block_size as usize;
let future = {
let guest = guest.clone();
tokio::spawn(async move {
let mut data =
crucible::Buffer::from_vec(vec![255; length]);
crucible::Buffer::repeat(255, size, block_size);
guest.read(offset, &mut data).await?;
Ok(data)
})
Expand Down Expand Up @@ -2699,8 +2702,7 @@ async fn span_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
println!("Sending a flush");
guest.flush(None).await?;

let length: usize = 2 * ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![99; length]);
let mut data = crucible::Buffer::repeat(99, 2, ri.block_size as usize);

println!("Sending a read spanning two extents");
guest.read(offset, &mut data).await?;
Expand Down Expand Up @@ -2739,8 +2741,7 @@ async fn big_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {

guest.flush(None).await?;

let length: usize = ri.block_size as usize;
let mut data = crucible::Buffer::from_vec(vec![255; length]);
let mut data = crucible::Buffer::repeat(255, 1, ri.block_size as usize);
guest.read(offset, &mut data).await?;

let dl = data.into_vec();
Expand Down Expand Up @@ -2871,7 +2872,7 @@ async fn dep_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
futures.push(future);
} else {
let mut data =
crucible::Buffer::from_vec(vec![0; ri.block_size as usize]);
crucible::Buffer::repeat(0, 1, ri.block_size as usize);

println!(
"Loop:{} send read {} @ offset:{} len:{}",
Expand Down
Loading

0 comments on commit 627f3c9

Please sign in to comment.