Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce buffer alignment #1106

Merged
merged 3 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 76 additions & 13 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ impl fmt::Display for AckStatus {
}

/*
* Provides a shared Buffer that Read operations will write into.
* Provides a strongly-owned Buffer that Read operations will write into.
*
* Originally BytesMut was used here, but it didn't guarantee that memory was
* shared between cloned BytesMut objects. Additionally, we added the idea of
Expand All @@ -1496,6 +1496,11 @@ impl fmt::Display for AckStatus {
* Because persistence is required, ownership will always come from the
* Downstairs (or other `BlockIO` implementations that persist ownership
* information) and be propagated "up".
*
* The `Buffer` is a block-oriented data structure: any functions which read or
* write to it's data must be block-aligned and block sized. Otherwise, these
* functions will panic. Any function which panics also notes those conditions
* in its docstring.
*/
#[must_use]
#[derive(Debug, PartialEq, Default)]
Expand Down Expand Up @@ -1551,20 +1556,42 @@ impl Buffer {
self.data.is_empty()
}

pub fn write(&mut self, offset: usize, data: &[u8]) {
/// Writes data to the buffer, setting `owned` to `true`
///
/// # Panics
/// - The offset must be block-aligned
/// - The data length must be divisible by block size
/// - Data cannot exceed the buffer's length
///
/// If any of these conditions are not met, the function will panic.
pub(crate) fn write(&mut self, offset: usize, data: &[u8]) {
assert!(offset + data.len() <= self.data.len());
assert_eq!(offset % self.block_size, 0);
assert_eq!(data.len() % self.block_size, 0);

self.data[offset..][..data.len()].copy_from_slice(data);
self.owned[offset..][..data.len()].fill(true);
}

pub fn write_with_ownership(
/// Writes both data and ownership to the buffer
///
/// # Panics
/// - The offset must be block-aligned
/// - The data length must be divisible by block size
/// - Data cannot exceed the buffer's length
/// - `data` and `owned` must be the same size
///
/// If any of these conditions are not met, the function will panic.
pub(crate) fn write_with_ownership(
&mut self,
offset: usize,
data: &[u8],
owned: &[bool],
) {
assert!(offset + data.len() <= self.data.len());
assert_eq!(data.len(), owned.len());
assert_eq!(data.len() % self.block_size, 0);
assert_eq!(offset % self.block_size, 0);
for i in 0..data.len() {
if owned[i] {
self.data[offset + i] = data[i];
Expand All @@ -1573,34 +1600,67 @@ impl Buffer {
}
}

pub fn write_read_response(
/// Writes a `ReadResponse` into the buffer, setting `owned` to true
///
/// # Panics
/// - The offset must be block-aligned
/// - The response data length must be divisible by block size
/// - Data cannot exceed the buffer's length
///
/// If any of these conditions are not met, the function will panic.
pub(crate) fn write_read_response(
&mut self,
offset: usize,
response: &ReadResponse,
) {
assert!(offset + response.data.len() <= self.data.len());
assert_eq!(offset % self.block_size, 0);
assert_eq!(response.data.len(), self.block_size);
if !response.block_contexts.is_empty() {
self.data[offset..][..response.data.len()]
.copy_from_slice(&response.data);
self.owned[offset..][..response.data.len()].fill(true);
}
}

pub fn read(&self, offset: usize, data: &mut [u8]) {
/// Reads buffer data into the given array
///
/// Values without `self.owned` are left unmodified
///
/// # Panics
/// - The offset must be block-aligned
/// - The response data length must be divisible by block size
/// - Data cannot exceed the buffer's length
///
/// If any of these conditions are not met, the function will panic.
pub(crate) fn read(&self, offset: usize, data: &mut [u8]) {
assert!(offset + data.len() <= self.data.len());
assert_eq!(offset % self.block_size, 0);
assert_eq!(data.len() % self.block_size, 0);
for (i, d) in data.iter_mut().enumerate() {
if self.owned[offset + i] {
*d = self.data[offset + i];
}
}
}

/// Consumes the buffer and returns a `Bytes` object
///
/// The allocation from the `Vec<u8>` is reused for efficiency
pub fn into_bytes(self) -> Bytes {
Bytes::from(self.data)
}

/// Consume and layer buffer contents on top of this one
pub fn eat(&mut self, offset: usize, buffer: &mut Buffer) {
///
/// # Panics
/// - The offset must be block-aligned
/// - Both buffers must have the same block size
///
/// If either of these conditions is not met, the function will panic
pub(crate) fn eat(&mut self, offset: usize, buffer: &mut Buffer) {
assert_eq!(offset % self.block_size, 0);
assert_eq!(self.block_size, buffer.block_size);
for (i, (data, owned)) in
std::iter::zip(&buffer.data, &buffer.owned).enumerate()
{
Expand Down Expand Up @@ -1666,8 +1726,9 @@ fn test_buffer_len_over_block_size() {

#[test]
fn test_buffer_writes() {
const READ_SIZE: usize = 512;
let mut data = Buffer::new(1, READ_SIZE);
const BLOCK_COUNT: usize = 8;
const BLOCK_SIZE: usize = 64;
let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); // 512 bytes

assert_eq!(&data[..], &vec![0u8; 512]);

Expand All @@ -1687,25 +1748,27 @@ fn test_buffer_writes() {

#[test]
fn test_buffer_eats() {
const READ_SIZE: usize = 512;
let mut data = Buffer::new(1, READ_SIZE);
// We use an artificially low BLOCK_SIZE here for ease of alignment
const BLOCK_COUNT: usize = 8;
const BLOCK_SIZE: usize = 64;
let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); // 512 bytes

assert_eq!(&data[..], &vec![0u8; 512]);

let mut buffer = Buffer::new(1, READ_SIZE);
let mut buffer = Buffer::new(BLOCK_COUNT, BLOCK_SIZE);
buffer.eat(0, &mut data);

assert_eq!(&buffer[..], &vec![0u8; 512]);

let mut data = Buffer::new(1, READ_SIZE);
let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE);
data.write(64, &[1u8; 64]);
buffer.eat(0, &mut data);

assert_eq!(&buffer[0..64], &vec![0u8; 64]);
assert_eq!(&buffer[64..128], &vec![1u8; 64]);
assert_eq!(&buffer[128..], &vec![0u8; 512 - 64 - 64]);

let mut data = Buffer::new(1, READ_SIZE);
let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE);
data.write(128, &[7u8; 128]);
buffer.eat(0, &mut data);

Expand Down
69 changes: 65 additions & 4 deletions upstairs/src/pseudo_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,76 @@ impl IOSpan {
}

#[instrument]
pub fn read_from_blocks_into_buffer(&self, data: &mut [u8]) {
pub fn read_from_blocks_into_buffer(&self, mut data: &mut [u8]) {
assert_eq!(data.len(), self.sz as usize);
self.buffer.read(self.phase as usize, data);

if self.phase % self.block_size == 0
&& data.len() % self.block_size as usize == 0
{
self.buffer.read(self.phase as usize, data);
} else {
let mut pos = self.phase as usize;
let bs = self.block_size as usize;
while !data.is_empty() {
if pos % bs == 0 && data.len() >= bs {
// Read as many integral blocks as we can get
let n = (data.len() / bs) * bs;
let (chunk, next) = data.split_at_mut(n);
self.buffer.read(pos, chunk);
data = next;
pos += n;
} else {
let mut scratch = vec![0u8; bs];
self.buffer.read((pos / bs) * bs, &mut scratch);

// Patch the relevant chunk from the incoming data
let n = (self.sz as usize - (pos - self.phase as usize))
.min(bs - (pos % bs));
let (chunk, next) = data.split_at_mut(n);
chunk.copy_from_slice(&scratch[pos % bs..][..n]);
data = next;
pos += n;
}
}
}
}

#[instrument]
pub fn write_from_buffer_into_blocks(&mut self, data: &[u8]) {
pub fn write_from_buffer_into_blocks(&mut self, mut data: &[u8]) {
assert_eq!(data.len(), self.sz as usize);
self.buffer.write(self.phase as usize, data);
if self.phase % self.block_size == 0
&& data.len() % self.block_size as usize == 0
{
self.buffer.write(self.phase as usize, data);
} else {
let mut pos = self.phase as usize;
let bs = self.block_size as usize;
while !data.is_empty() {
if pos % bs == 0 && data.len() >= bs {
// Write as many integral blocks as we can get
let n = (data.len() / bs) * bs;
let (chunk, next) = data.split_at(n);
self.buffer.write(pos, chunk);
data = next;
pos += n;
} else {
// Read the relevant block from the buffer
let mut scratch = vec![0u8; bs];
self.buffer.read((pos / bs) * bs, &mut scratch);

// Patch the relevant chunk from the incoming data
let n = (self.sz as usize - (pos - self.phase as usize))
.min(bs - (pos % bs));
let (chunk, next) = data.split_at(n);
scratch[pos % bs..][..n].copy_from_slice(chunk);

// Write it back to the buffer
self.buffer.write((pos / bs) * bs, &scratch);
data = next;
pos += n;
}
}
}
}
}

Expand Down
Loading