diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 7d60306bf..db4256402 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -1475,7 +1475,7 @@ impl fmt::Display for AckStatus { } /* - * Provides a shared Buffer that Read operations will write into. + * Provides a strongly-owned Buffer that Read operations will write into. * * Originally BytesMut was used here, but it didn't guarantee that memory was * shared between cloned BytesMut objects. Additionally, we added the idea of @@ -1496,6 +1496,11 @@ impl fmt::Display for AckStatus { * Because persistence is required, ownership will always come from the * Downstairs (or other `BlockIO` implementations that persist ownership * information) and be propagated "up". + * + * The `Buffer` is a block-oriented data structure: any functions which read or + * write to its data must be block-aligned and block-sized. Otherwise, these + * functions will panic. Any function which panics also notes those conditions + * in its docstring. */ #[must_use] #[derive(Debug, PartialEq, Default)] @@ -1551,20 +1556,42 @@ impl Buffer { self.data.is_empty() } - pub fn write(&mut self, offset: usize, data: &[u8]) { + /// Writes data to the buffer, setting `owned` to `true` + /// + /// # Panics + /// - The offset must be block-aligned + /// - The data length must be divisible by block size + /// - Data cannot exceed the buffer's length + /// + /// If any of these conditions are not met, the function will panic. 
+ pub(crate) fn write(&mut self, offset: usize, data: &[u8]) { assert!(offset + data.len() <= self.data.len()); + assert_eq!(offset % self.block_size, 0); + assert_eq!(data.len() % self.block_size, 0); self.data[offset..][..data.len()].copy_from_slice(data); self.owned[offset..][..data.len()].fill(true); } - pub fn write_with_ownership( + /// Writes both data and ownership to the buffer + /// + /// # Panics + /// - The offset must be block-aligned + /// - The data length must be divisible by block size + /// - Data cannot exceed the buffer's length + /// - `data` and `owned` must be the same size + /// + /// If any of these conditions are not met, the function will panic. + pub(crate) fn write_with_ownership( &mut self, offset: usize, data: &[u8], owned: &[bool], ) { assert!(offset + data.len() <= self.data.len()); + assert_eq!(data.len(), owned.len()); + assert_eq!(data.len() % self.block_size, 0); + assert_eq!(offset % self.block_size, 0); for i in 0..data.len() { if owned[i] { self.data[offset + i] = data[i]; @@ -1573,12 +1600,22 @@ impl Buffer { } } - pub fn write_read_response( + /// Writes a `ReadResponse` into the buffer, setting `owned` to true + /// + /// # Panics + /// - The offset must be block-aligned + /// - The response data length must equal the block size + /// - Data cannot exceed the buffer's length + /// + /// If any of these conditions are not met, the function will panic. 
+ pub(crate) fn write_read_response( &mut self, offset: usize, response: &ReadResponse, ) { assert!(offset + response.data.len() <= self.data.len()); + assert_eq!(offset % self.block_size, 0); + assert_eq!(response.data.len(), self.block_size); if !response.block_contexts.is_empty() { self.data[offset..][..response.data.len()] .copy_from_slice(&response.data); @@ -1586,8 +1623,20 @@ impl Buffer { } } - pub fn read(&self, offset: usize, data: &mut [u8]) { + /// Reads buffer data into the given slice + /// + /// Bytes whose `owned` flag is not set are left unmodified in `data` + /// + /// # Panics + /// - The offset must be block-aligned + /// - The data length must be divisible by block size + /// - Data cannot exceed the buffer's length + /// + /// If any of these conditions are not met, the function will panic. + pub(crate) fn read(&self, offset: usize, data: &mut [u8]) { assert!(offset + data.len() <= self.data.len()); + assert_eq!(offset % self.block_size, 0); + assert_eq!(data.len() % self.block_size, 0); for (i, d) in data.iter_mut().enumerate() { if self.owned[offset + i] { *d = self.data[offset + i]; @@ -1595,12 +1644,23 @@ impl Buffer { } } + /// Consumes the buffer and returns a `Bytes` object + /// + /// The allocation from the `Vec` is reused for efficiency pub fn into_bytes(self) -> Bytes { Bytes::from(self.data) } /// Consume and layer buffer contents on top of this one - pub fn eat(&mut self, offset: usize, buffer: &mut Buffer) { + /// + /// # Panics + /// - The offset must be block-aligned + /// - Both buffers must have the same block size + /// + /// If either of these conditions is not met, the function will panic. + pub(crate) fn eat(&mut self, offset: usize, buffer: &mut Buffer) { + assert_eq!(offset % self.block_size, 0); + assert_eq!(self.block_size, buffer.block_size); for (i, (data, owned)) in std::iter::zip(&buffer.data, &buffer.owned).enumerate() { @@ -1666,8 +1726,9 @@ fn test_buffer_len_over_block_size() { #[test] fn test_buffer_writes() { - const 
READ_SIZE: usize = 512; - let mut data = Buffer::new(1, READ_SIZE); + const BLOCK_COUNT: usize = 8; + const BLOCK_SIZE: usize = 64; + let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); // 512 bytes assert_eq!(&data[..], &vec![0u8; 512]); @@ -1687,17 +1748,19 @@ fn test_buffer_writes() { #[test] fn test_buffer_eats() { - const READ_SIZE: usize = 512; - let mut data = Buffer::new(1, READ_SIZE); + // We use an artificially low BLOCK_SIZE here for ease of alignment + const BLOCK_COUNT: usize = 8; + const BLOCK_SIZE: usize = 64; + let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); // 512 bytes assert_eq!(&data[..], &vec![0u8; 512]); - let mut buffer = Buffer::new(1, READ_SIZE); + let mut buffer = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); buffer.eat(0, &mut data); assert_eq!(&buffer[..], &vec![0u8; 512]); - let mut data = Buffer::new(1, READ_SIZE); + let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); data.write(64, &[1u8; 64]); buffer.eat(0, &mut data); @@ -1705,7 +1768,7 @@ fn test_buffer_eats() { assert_eq!(&buffer[64..128], &vec![1u8; 64]); assert_eq!(&buffer[128..], &vec![0u8; 512 - 64 - 64]); - let mut data = Buffer::new(1, READ_SIZE); + let mut data = Buffer::new(BLOCK_COUNT, BLOCK_SIZE); data.write(128, &[7u8; 128]); buffer.eat(0, &mut data); diff --git a/upstairs/src/pseudo_file.rs b/upstairs/src/pseudo_file.rs index c98f40d2c..5327750d6 100644 --- a/upstairs/src/pseudo_file.rs +++ b/upstairs/src/pseudo_file.rs @@ -102,15 +102,76 @@ impl IOSpan { } #[instrument] - pub fn read_from_blocks_into_buffer(&self, data: &mut [u8]) { + pub fn read_from_blocks_into_buffer(&self, mut data: &mut [u8]) { assert_eq!(data.len(), self.sz as usize); - self.buffer.read(self.phase as usize, data); + + if self.phase % self.block_size == 0 + && data.len() % self.block_size as usize == 0 + { + self.buffer.read(self.phase as usize, data); + } else { + let mut pos = self.phase as usize; + let bs = self.block_size as usize; + while !data.is_empty() { + if pos % bs == 0 && data.len() >= 
bs { + // Read as many integral blocks as we can get + let n = (data.len() / bs) * bs; + let (chunk, next) = data.split_at_mut(n); + self.buffer.read(pos, chunk); + data = next; + pos += n; + } else { + let mut scratch = vec![0u8; bs]; + self.buffer.read((pos / bs) * bs, &mut scratch); + + // Copy the relevant chunk out to the caller's slice. NOTE(review): unowned bytes arrive as the scratch zeroes here, whereas the aligned path leaves them untouched -- confirm this is intended + let n = (self.sz as usize - (pos - self.phase as usize)) + .min(bs - (pos % bs)); + let (chunk, next) = data.split_at_mut(n); + chunk.copy_from_slice(&scratch[pos % bs..][..n]); + data = next; + pos += n; + } + } + } } #[instrument] - pub fn write_from_buffer_into_blocks(&mut self, data: &[u8]) { + pub fn write_from_buffer_into_blocks(&mut self, mut data: &[u8]) { assert_eq!(data.len(), self.sz as usize); - self.buffer.write(self.phase as usize, data); + if self.phase % self.block_size == 0 + && data.len() % self.block_size as usize == 0 + { + self.buffer.write(self.phase as usize, data); + } else { + let mut pos = self.phase as usize; + let bs = self.block_size as usize; + while !data.is_empty() { + if pos % bs == 0 && data.len() >= bs { + // Write as many integral blocks as we can get + let n = (data.len() / bs) * bs; + let (chunk, next) = data.split_at(n); + self.buffer.write(pos, chunk); + data = next; + pos += n; + } else { + // Read the relevant block from the buffer + let mut scratch = vec![0u8; bs]; + self.buffer.read((pos / bs) * bs, &mut scratch); + + // Patch the relevant chunk from the incoming data + let n = (self.sz as usize - (pos - self.phase as usize)) + .min(bs - (pos % bs)); + let (chunk, next) = data.split_at(n); + scratch[pos % bs..][..n].copy_from_slice(chunk); + + // Write it back to the buffer + self.buffer.write((pos / bs) * bs, &scratch); + data = next; + pos += n; + } + } + } } }