diff --git a/utils/buffer/README.md b/utils/buffer/README.md index 39ac180e2..1e0f0132f 100644 --- a/utils/buffer/README.md +++ b/utils/buffer/README.md @@ -1,106 +1,151 @@ -# BufferPool +# `buffer_sv2` -This crate provides a `Write` trait used to replace `std::io::Write` in a non_std environment a `Buffer` -trait and two implementations of `Buffer`: `BufferFromSystemMemory` and `BufferPool`. +[![crates.io](https://img.shields.io/crates/v/buffer_sv2.svg)](https://crates.io/crates/buffer_sv2) +[![docs.rs](https://docs.rs/buffer_sv2/badge.svg)](https://docs.rs/buffer_sv2) +[![rustc+](https://img.shields.io/badge/rustc-1.75.0%2B-lightgrey.svg)](https://blog.rust-lang.org/2023/12/28/Rust-1.75.0.html) +[![license](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](https://github.com/stratum-mining/stratum/blob/main/LICENSE.md) +[![codecov](https://codecov.io/gh/stratum-mining/stratum/branch/main/graph/badge.svg?flag=buffer_sv2-coverage)](https://codecov.io/gh/stratum-mining/stratum) -## Intro -`BufferPool` is useful whenever we need to work with buffers sequentially (fill a buffer, get -the filled buffer, fill a new buffer, get the filled buffer, and so on). +`buffer_sv2` handles memory management for Stratum V2 (Sv2) roles. It provides a memory-efficient +buffer pool that minimizes allocations and deallocations for high-throughput message frame +processing in Sv2 roles. Memory allocation overhead is minimized by reusing large buffers, +improving performance and reducing latency. The buffer pool tracks the usage of memory slices, +using shared state tracking to safely manage memory across multiple threads. -To fill a buffer `BufferPool` returns an `&mut [u8]` with the requested len (the filling part). -When the buffer is filled and the owner needs to be changed, `BufferPool` returns a `Slice` that -implements `Send` and `AsMut` (the get part). 
+## Main Components -`BufferPool` pre-allocates a user-defined capacity in the heap and use it to allocate the buffers, -when a `Slice` is dropped `BufferPool` acknowledge it and reuse the freed space in the pre-allocated -memory. +- **Buffer Trait**: An interface for working with memory buffers. This trait has two implementations + (`BufferPool` and `BufferFromSystemMemory`). The crate also includes a `Write` trait to replace + `std::io::Write` in `no_std` environments. +- **BufferPool**: A thread-safe pool of reusable memory buffers for high-throughput applications. +- **BufferFromSystemMemory**: Manages a dynamically growing buffer in system memory for applications + where performance is not a concern. +- **Slice**: A contiguous block of memory, either preallocated or dynamically allocated. -## Implementation -The crate is `[no_std]` and lock-free, so, to synchronize the state of the pre-allocated memory -(taken or free) between different contexts an `AtomicU8` is used. -Each bit of the `u8` represent a memory slot, if the bit is 0 the memory slot is free if is -1 is taken. Whenever `BufferPool` creates a `Slice` a bit is set to 1 and whenever a `Slice` is -dropped a bit is set to 0. +## Usage -## Use case -`BufferPool` has been developed to be used in proxies with thousand of connection, each connection -must parse a particular data format via a `Decoder` each decoder use 1 or 2 `Buffer` for each received -message. With `BufferPool` each connection can be instantiated with its own `BufferPool` and reuse -the space freed by old messages for new ones. +To include this crate in your project, run: -## Unsafe -There are 5 unsafes: -buffer_pool/mod.rs 550 -slice.rs 8 -slice.rs 27 +```bash +cargo add buffer_sv2 +``` + +This crate can be built with the following feature flags: + +- `debug`: Provides additional tracking for debugging memory management issues. +- `fuzz`: Enables support for fuzz testing.
+- `with_serde`: builds the [`binary_sv2`](https://crates.io/crates/binary_sv2) and + [`buffer_sv2`](https://crates.io/crates/buffer_sv2) crates with `serde`-based encoding and + decoding. Note that this feature flag is only used for the Message Generator, and is deprecated + for any other kind of usage. It will likely be fully deprecated in the future. + +### Unsafe Code +There are four instances of unsafe code: + +- `buffer_pool/mod.rs`: `fn get_writable_(&mut self, len: usize, shared_state: u8, without_check: bool) -> &mut [u8] { .. }` in the `impl BufferPool` +- `slice.rs`: + - `unsafe impl Send for Slice {}` + - `fn as_mut(&mut self) -> &mut [u8] { .. }` in the `impl AsMut<[u8]> for Slice` + - `fn as_ref(&self) -> &[u8] { .. }` in the `impl AsRef<[u8]> for Slice` + +### Examples + +This crate provides three examples demonstrating how the memory is managed: + +1. **[Basic Usage Example](https://github.com/stratum-mining/stratum/blob/main/utils/buffer/examples/basic_buffer_pool.rs)**: + Creates a buffer pool, writes to it, and retrieves the data from it. + +2. **[Buffer Pool Exhaustion Example](https://github.com/stratum-mining/stratum/blob/main/utils/buffer/examples/buffer_pool_exhaustion.rs)**: + Demonstrates how data is added to a buffer pool and how it is allocated directly on the heap + once the buffer pool's capacity has been exhausted. -## Write -Waiting for `Write` in `core` a compatible trait is used so that it can be replaced. +3. **[Variable Sized Messages Example](https://github.com/stratum-mining/stratum/blob/main/utils/buffer/examples/variable_sized_messages.rs)**: + Writes messages of variable sizes to the buffer pool. -## Buffer -The `Buffer` trait has been written to work with `codec_sv2::Decoder`. +## `Buffer` Trait -`codec_sv2::Decoder` works by: -1. fill a buffer of the size of the header of the protocol that is decoding -2. parse the filled bytes and compute the message length -3. fill a buffer of the size of the message -4.
use the header and the message to construct a `frame_sv2::Frame` +The `Buffer` trait is designed to work with the +[`codec_sv2`](https://docs.rs/codec_sv2/1.3.0/codec_sv2/index.html) decoders, which operate by: -To fill the buffer `Decoder` must pass a reference of the buffer to a filler. In order -to construct a `Frame` the `Decoder` must pass the ownership of the buffer to `Frame`. +1. Filling a buffer of the size of the protocol header being decoded. +2. Parsing the filled bytes to compute the message length. +3. Filling a buffer of the size of the message. +4. Using the header and message to construct a + [`framing_sv2::framing::Frame`](https://docs.rs/framing_sv2/2.0.0/framing_sv2/framing/enum.Frame.html). + +To fill the buffer, the `codec_sv2` decoder must pass a reference to the buffer to a filler. To +construct a `Frame`, the decoder must pass ownership of the buffer to the `Frame`. ```rust -get_writable(&mut self, len: usize) -> &mut [u8] +fn get_writable(&mut self, len: usize) -> &mut [u8]; ``` -Return a mutable reference to the buffer, starting at buffer length and ending at buffer length + `len`. -and set buffer len at previous len + len. + +This `get_writable` method returns a mutable reference to the buffer, starting at the current +length and ending at the current length plus `len`, and sets the buffer length to that new end. ```rust -get_data_owned(&mut self) -> Slice { +fn get_data_owned(&mut self) -> Slice; ``` -It returns `Slice`: something that implements `AsMut[u8]` and `Send`, and sets the buffer len to 0. -## BufferFromSystemMemory -Is the simplest implementation of a `Buffer`: each time that a new buffer is needed it create a new -`Vec`. +`get_data_owned` returns a `Slice` that implements `AsMut<[u8]>` and `Send`, and sets the buffer length to `0`. -`get_writable(..)` returns mutable references to the inner vector. +The `Buffer` trait is implemented for `BufferFromSystemMemory` and `BufferPool`.
The crate also includes a +`Write` trait to replace `std::io::Write` in `no_std` environments. -`get_data_owned(..)` returns the inner vector. +## `BufferFromSystemMemory` +`BufferFromSystemMemory` is a simple implementation of the `Buffer` trait. Each time a new buffer is +needed, it creates a new `Vec`. +- `get_writable(..)` returns mutable references to the inner vector. +- `get_data_owned(..)` returns the inner vector. -## BufferPool -Usually `BufferFromSystemMemory` should be enough, but sometimes it is better to use something faster. +## `BufferPool` +While `BufferFromSystemMemory` is sufficient for many cases, `BufferPool` offers a more efficient +solution for high-performance applications, such as proxies and pools with thousands of connections. -For each Sv2 connection, there is a `Decoder` and for each decoder, there are 1 or 2 buffers. +When created, `BufferPool` preallocates a user-defined capacity of bytes in the heap using a +`Vec`. When `get_data_owned(..)` is called, it creates a `Slice` that contains a view into the +preallocated memory. `BufferPool` guarantees that slices never overlap and maintains unique +ownership of each `Slice`. -Proxies and pools with thousands of connections should use `Decoder` rather than -`Decoder` +`Slice` implements `Drop`, allowing the view into the preallocated memory to be reused once the +`Slice` is dropped. -`BufferPool` when created preallocate a user-defined capacity of bytes in the heap using a -`Vec`, then when `get_data_owned(..)` is called it create a `Slice` that contains a view into -the preallocated memory. `BufferPool` guarantees that slices never overlap and the uniqueness of -the `Slice` ownership. +### Buffer Management and Allocation -`Slice` implements `Drop` so that the view into the preallocated memory can be reused. +`BufferPool` is useful for working with sequentially processed buffers, such as filling a buffer, +retrieving it, and then reusing it as needed.
`BufferPool` optimizes for memory reuse by providing +pre-allocated memory that can be used in one of three modes: -### Fragmentation overflow and optimization -`BufferPool` can allocate a maximum of 8 `Slices` (cause it uses an `AtomicU8` to keep track of the -used and freed slots) and at maximum `capacity` bytes. Whenever all the 8 slots are tacked or there -is no more space on the preallocated memory `BufferPool` failover to a `BufferFromSystemMemory`. +1. **Back Mode**: Default mode where allocations start from the back of the buffer. +2. **Front Mode**: Used when slots at the back are full but memory can still be reused by moving to + the front. +3. **Alloc Mode**: Falls back to system memory allocation (`BufferFromSystemMemory`) when both back + and front sections are full, providing additional capacity but with reduced performance. -Usually, a message is decoded then a response is sent then a new message is decoded, etc. -So `BufferPool` is optimized for use all the slots then check if the first slot has been dropped -If so use it, then check if the second slot has been dropped, and so on. -`BufferPool` is also optimized to drop all the slices. -`BufferPool` is also optimized to drop the last slice. +`BufferPool` can only be fragmented between the front and back and between back and end. -Below a graphical representation of the most optimized cases: -``` -A number [0f] means that the slot is taken, the minus symbol (-) means the slot is free -There are 8 slot +#### Fragmentation, Overflow, and Optimization +`BufferPool` can allocate a maximum of `8` `Slice`s (as it uses an `AtomicU8` to track used and +freed slots) and up to the defined capacity in bytes. If all `8` slots are taken or there is no more +space in the preallocated memory, `BufferPool` falls back to `BufferFromSystemMemory`. + +Typically, `BufferPool` is used to process messages sequentially (decode, respond, decode). 
It is +optimized to check for any freed slots starting from the beginning, then reuse these before +considering further allocation. It is also optimized for two release patterns: all slices being +dropped, and the last slice being dropped. Handling these two cases efficiently reduces memory +fragmentation. + +The following cases illustrate typical memory usage patterns within `BufferPool`: +1. Slots fill from back to front, switching as each area reaches capacity. +2. Pool resets upon full usage, then reuses back slots. +3. After filling the back, front slots are used when they become available. -CASE 1 +Below is a graphical representation of the most optimized cases. A hexadecimal digit means that the +slot is taken; the minus symbol (`-`) means the slot is free. There are `8` slots. + +Case 1: Buffer pool exhaustion +``` -------- BACK MODE 1------- BACK MODE 12------ BACK MODE @@ -109,14 +154,14 @@ CASE 1 12345--- BACK MODE 123456-- BACK MODE 1234567- BACK MODE -12345678 BACK MODE -12345698 BACK MODE -1234569a BACK MODE -123456ba BACK MODE -123456ca BACK MODE +12345678 BACK MODE (buffer is now full) +12345678 ALLOC MODE (new bytes being allocated in a new space in the heap) +12345678 ALLOC MODE (new bytes being allocated in a new space in the heap) .....
and so on +``` -CASE 2 +Case 2: Buffer pool reset to remain in back mode +``` -------- BACK MODE 1------- BACK MODE 12------ BACK MODE @@ -125,52 +170,83 @@ CASE 2 12345--- BACK MODE 123456-- BACK MODE 1234567- BACK MODE -12345678 BACK MODE +12345678 BACK MODE (buffer is now full) -------- RESET +9------- BACK MODE 9a------ BACK MODE -9ab----- BACK MODE +``` + +Case 3: Buffer pool switches from back to front to back modes +``` +-------- BACK MODE +1------- BACK MODE +12------ BACK MODE +123----- BACK MODE +1234---- BACK MODE +12345--- BACK MODE +123456-- BACK MODE +1234567- BACK MODE +12345678 BACK MODE (buffer is now full) +--345678 Consume first two data bytes from the buffer +-9345678 SWITCH TO FRONT MODE +a9345678 FRONT MODE (buffer is now full) +a93456-- Consume last two data bytes from the buffer +a93456b- SWITCH TO BACK MODE +a93456bc BACK MODE (buffer is now full) +``` -CASE 3 --------- BACK MODE -1------- BACK MODE -12------ BACK MODE -123----- BACK MODE -1234---- BACK MODE -12345--- BACK MODE -123456-- BACK MODE -1234567- BACK MODE -12345678 BACK MODE -12345678 BACK MODE ---345678 SWITCH TO FRONT MODE -92345678 FRONT MODE -9a345678 FRONT MODE -9a3456-- SWITCH TO BACK MODE -9a3456b- BACK MODE -9a3456bc BACK MODE +## Benchmarks and Performance +To run benchmarks, execute: + +``` +cargo bench --features criterion ``` -`BufferPool` can operate in three modalities: -1. Back: it allocates in the back of the inner vector -2. Front: it allocates in the front of the inner vector -3. Alloc: failover to `BufferFromSystemMemory` +## Benchmark Comparisons -`BufferPool` can be fragmented only between front and back and between back and end. +`BufferPool` is benchmarked against `BufferFromSystemMemory` and two additional structures for +reference: `PPool` (a hashmap-based pool) and `MaxEfficeincy` (a highly optimized but unrealistic +control implementation written such that the benchmarks do not panic and the compiler does not +complain).
`BufferPool` generally provides better performance and lower latency than `PPool` and +`BufferFromSystemMemory`. -### Performance +**Note**: Both `PPool` and `MaxEfficeincy` are completely broken and are only useful as references +for the benchmarks. + +### `BENCHES.md` Benchmarks +In all three scenarios below, `BufferPool` outperforms `PPool` (hashmap-based pool) and the solution +without a pool. + +Executed for 2,000 samples: + +``` +* single thread with `BufferPool`: ---------------------------------- 7.5006 ms +* single thread with `BufferFromSystemMemory`: ---------------------- 10.274 ms +* single thread with `PPoll`: --------------------------------------- 32.593 ms +* single thread with `MaxEfficeincy`: ------------------------------- 1.2618 ms +* multi-thread with `BufferPool`: ---------------------------------- 34.660 ms +* multi-thread with `BufferFromSystemMemory`: ---------------------- 142.23 ms +* multi-thread with `PPoll`: --------------------------------------- 49.790 ms +* multi-thread with `MaxEfficeincy`: ------------------------------- 18.201 ms +* multi-thread 2 with `BufferPool`: ---------------------------------- 80.869 ms +* multi-thread 2 with `BufferFromSystemMemory`: ---------------------- 192.24 ms +* multi-thread 2 with `PPoll`: --------------------------------------- 101.75 ms +* multi-thread 2 with `MaxEfficeincy`: ------------------------------- 66.972 ms +``` -To run the benchmarks `cargo bench --features criterion`. +### Single Thread Benchmarks -To have an idea of the performance gains, `BufferPool` is benchmarked against -`BufferFromSystemMemory` and two control structures `PPool` and `MaxEfficeincy`. +If the buffer is not sent to another context, `BufferPool` is 1.4 times faster than no pool, 4.3 +times faster than `PPool`, and about 5.9 times slower than max efficiency.
-`PPool` is a buffer pool implemented with a hashmap and `MaxEfficeincy` is a `Buffer` implemented in the -fastest possible way so that the benches do not panic and the compiler does not complain. Btw they are -both completely broken, useful only as references for the benchmarks. +Average times for 1,000 operations: -The benchmarks are: +- `BufferPool`: 7.5 ms +- `BufferFromSystemMemory`: 10.27 ms +- `PPool`: 32.59 ms +- `MaxEfficiency`: 1.26 ms -#### Single thread ``` for 0..1000: add random bytes to the buffer @@ -178,9 +254,18 @@ for 0..1000: add random bytes to the buffer get the buffer drop the 2 buffer - ``` +``` + +### Multi-Threaded Benchmarks (most similar to actual use case) + +If the buffer is sent to other contexts, `BufferPool` is about 4 times faster than no pool, about 1.4 +times faster than `PPool`, and about 1.9 times slower than max efficiency. + +- `BufferPool`: 34.66 ms +- `BufferFromSystemMemory`: 142.23 ms +- `PPool`: 49.79 ms +- `MaxEfficiency`: 18.20 ms -#### Multi threads (this is the most similar to the actual use case IMHO) ``` for 0..1000: add random bytes to the buffer @@ -189,9 +274,9 @@ for 0..1000: add random bytes to the buffer get the buffer send the buffer to another thread -> wait 1 ms and then drop it - ``` +``` -#### Multi threads 2 +### Multi-Threaded Benchmarks 2 ``` for 0..1000: add random bytes to the buffer @@ -201,54 +286,35 @@ for 0..1000: get the buffer send the buffer to another thread -> wait 1 ms and then drop it wait for the 2 buffer to be dropped - ``` - -#### Test -Some failing cases from fuzz.
- -#### From the benchmark in BENCHES.md executed for 2000 samples: -``` -* single thread with `BufferPool`: ---------------------------------- 7.5006 ms -* single thread with `BufferFromSystemMemory`: ---------------------- 10.274 ms -* single thread with `PPoll`: --------------------------------------- 32.593 ms -* single thread with `MaxEfficeincy`: ------------------------------- 1.2618 ms -* multi-thread with `BufferPool`: ---------------------------------- 34.660 ms -* multi-thread with `BufferFromSystemMemory`: ---------------------- 142.23 ms -* multi-thread with `PPoll`: --------------------------------------- 49.790 ms -* multi-thread with `MaxEfficeincy`: ------------------------------- 18.201 ms -* multi-thread 2 with `BufferPool`: ---------------------------------- 80.869 ms -* multi-thread 2 with `BufferFromSystemMemory`: ---------------------- 192.24 ms -* multi-thread 2 with `PPoll`: --------------------------------------- 101.75 ms -* multi-thread 2 with `MaxEfficeincy`: ------------------------------- 66.972 ms ``` -From the above numbers, it results that `BufferPool` always outperform the hashmap buffer pool and -the solution without a pool: - -#### Single thread -If the buffer is not sent to another context `BufferPool` is 1.4 times faster than no pool and 4.3 time -faster than the hashmap pool and 5.7 times slower than max efficiency. +## Fuzz Testing +Install `cargo-fuzz` with: -#### Multi threads -If the buffer is sent to other contexts `BufferPool` is 4 times faster than no pool, 0.6 times faster -than the hashmap pool and 1.8 times slower than max efficiency. 
+```bash +cargo install cargo-fuzz +``` -### Fuzzy tests -Install cargo fuzz with `cargo install cargo-fuzz` +Run the fuzz tests: -Then do `cd ./fuzz` +```bash +cd ./fuzz +cargo fuzz run slower -- -rss_limit_mb=5000000000 +cargo fuzz run faster -- -rss_limit_mb=5000000000 +``` + +The tests must be run with `-rss_limit_mb=5000000000` because they check `BufferPool` with +capacities from `0` to `2^32`. -Run them with `cargo fuzz run slower -- -rss_limit_mb=5000000000` and -`cargo fuzz run faster -- -rss_limit_mb=5000000000` +`BufferPool` is fuzz-tested to ensure memory reliability across different scenarios, including +delayed memory release and cross-thread access. The tests check whether slices created by `BufferPool` +still contain the same bytes they held at creation time after a random amount of time and after they +have been sent to other threads. -`BufferPool` is fuzzy tested with `cargo fuzzy`. The test checks if slices created by `BufferPool` -still contain the same bytes contained at creation time after a random amount of time and after been -sent to other threads. There are 2 fuzzy test, the first (faster) it map a smaller input space to -test the most likely inputs, the second (slower) it have a bigger input space to pick "all" the -corner case. The slower also forces the buffer to be sent to different cores. I run both for several -hours without crashes. +Two main fuzz tests are provided: -The test must be run with `-rss_limit_mb=5000000000` cause they check `BufferPool` with capacities -from 0 to 2^32. +1. Faster: Maps a smaller input space to test the most likely inputs. +2. Slower: Has a bigger input space to explore "all" the edge cases. It also forces the buffer to be + sent to different cores. -(1) TODO check if is always true +Both tests have been run for several hours without crashes.
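The README text above says that `BufferPool` tracks at most `8` slices through an `AtomicU8`, and that dropping a `Slice` frees its slot. A minimal, self-contained sketch of that idea follows. It is not the crate's implementation: `SlotMap` and `SlotGuard` are hypothetical names, the sketch relies on `std` (the crate itself is `no_std`), and it tracks only slot occupancy, not the byte ranges behind each slot.

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the pool's shared state: one bit per slot,
/// where 1 means taken and 0 means free.
#[derive(Clone, Default)]
struct SlotMap(Arc<AtomicU8>);

/// Hypothetical stand-in for `Slice`: owns one slot and frees it on drop.
struct SlotGuard {
    map: SlotMap,
    index: u8,
}

impl SlotMap {
    /// Claims the lowest free slot, or returns `None` when all 8 are taken.
    fn take(&self) -> Option<SlotGuard> {
        let mut current = self.0.load(Ordering::Acquire);
        loop {
            // Number of consecutive set bits from bit 0 = index of the lowest free slot.
            let index = current.trailing_ones() as u8;
            if index == 8 {
                return None;
            }
            let wanted = current | (1u8 << index);
            match self
                .0
                .compare_exchange_weak(current, wanted, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Some(SlotGuard { map: self.clone(), index }),
                // Another thread changed the bitmap (or the CAS failed spuriously): retry.
                Err(observed) => current = observed,
            }
        }
    }

    /// Number of slots currently taken.
    fn taken(&self) -> u32 {
        self.0.load(Ordering::Acquire).count_ones()
    }
}

impl Drop for SlotGuard {
    fn drop(&mut self) {
        // Clear this guard's bit so the slot can be reused.
        self.map.0.fetch_and(!(1u8 << self.index), Ordering::Release);
    }
}

fn main() {
    let map = SlotMap::default();
    let mut guards: Vec<SlotGuard> = (0..8).map(|_| map.take().expect("free slot")).collect();

    // All 8 slots are taken; a real pool would fall back to system memory here.
    assert!(map.take().is_none());

    // Dropping a guard on another thread frees its slot for reuse.
    let first = guards.remove(0);
    std::thread::spawn(move || drop(first)).join().unwrap();
    assert_eq!(map.taken(), 7);

    let reused = map.take().expect("slot 0 is free again");
    assert_eq!(reused.index, 0);
}
```

Tying the release to `Drop` means the owner of a slice never has to notify the pool explicitly, which fits the described flow of handing slices to other threads and reusing their space once they go out of scope.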
diff --git a/utils/buffer/examples/basic_buffer_pool.rs b/utils/buffer/examples/basic_buffer_pool.rs new file mode 100644 index 000000000..ca559039a --- /dev/null +++ b/utils/buffer/examples/basic_buffer_pool.rs @@ -0,0 +1,40 @@ +// # Simple `BufferPool` Usage +// +// This example shows how to: +// 1. Create a `BufferPool`. +// 2. Obtain a writable buffer. +// 3. Write data into the buffer. +// 4. Retrieve the data as a referenced slice. +// 5. Retrieve the data as an owned slice. +// +// # Run +// +// ``` +// cargo run --example basic_buffer_pool +// ``` + +use buffer_sv2::{Buffer, BufferPool}; + +fn main() { + // Create a new BufferPool with a capacity of 32 bytes + let mut buffer_pool = BufferPool::new(32); + + // Get a writable buffer from the pool + let data_to_write = b"Ciao, mundo!"; // 12 bytes + let writable = buffer_pool.get_writable(data_to_write.len()); + + // Write data (12 bytes) into the buffer. + writable.copy_from_slice(data_to_write); + assert_eq!(buffer_pool.len(), 12); + + // Retrieve the data as a referenced slice + let _data_slice = buffer_pool.get_data_by_ref(12); + assert_eq!(buffer_pool.len(), 12); + + // Retrieve the data as an owned slice + let data_slice = buffer_pool.get_data_owned(); + assert_eq!(buffer_pool.len(), 0); + + let expect = [67, 105, 97, 111, 44, 32, 109, 117, 110, 100, 111, 33]; // "Ciao, mundo!" ASCII + assert_eq!(data_slice.as_ref(), expect); +} diff --git a/utils/buffer/examples/buffer_pool_exhaustion.rs b/utils/buffer/examples/buffer_pool_exhaustion.rs new file mode 100644 index 000000000..b52f271ae --- /dev/null +++ b/utils/buffer/examples/buffer_pool_exhaustion.rs @@ -0,0 +1,80 @@ +// # Handling Buffer Pool Exhaustion and Heap Allocation +// +// This example demonstrates how a buffer pool is filled. The back slots of the buffer pool are +// exhausted first, followed by the front of the buffer pool. Once both the back and front are +// exhausted, data is allocated on the heap at a performance cost.
+// +// 1. Fills up the back slots of the buffer pool until they’re exhausted. +// 2. Releases one slot to allow the buffer pool to switch to front mode. +// 3. Fully fills the front slots of the buffer pool. +// 4. Switches to alloc mode for direct heap allocation when both the buffer pool's back and front +// slots are at capacity. +// +// Below is a visual representation of how the buffer pool evolves as the example progresses: +// +// -------- BACK MODE +// a------- BACK MODE (add "a" via loop) +// aa------ BACK MODE (add "a" via loop) +// aaa----- BACK MODE (add "a" via loop) +// aaaa---- BACK MODE (add "a" via loop) +// -aaa---- BACK MODE (pop front) +// -aaab--- BACK MODE (add "b") +// -aaabc-- BACK MODE (add "c" via loop) +// -aaabcc- BACK MODE (add "c" via loop) +// -aaabccc BACK MODE (add "c" via loop) +// caaabccc FRONT MODE (add "c" via loop, which gets added at the front) +// caaabccc ALLOC MODE (add "d", allocated in a new space in the heap) +// +// # Run +// +// ``` +// cargo run --example buffer_pool_exhaustion +// ``` + +use buffer_sv2::{Buffer, BufferPool}; +use std::collections::VecDeque; + +fn main() { + // 8 byte capacity + let mut buffer_pool = BufferPool::new(8); + let mut slices = VecDeque::new(); + + // Write data to fill back slots + for _ in 0..4 { + let data_bytes = b"a"; // 1 byte + let writable = buffer_pool.get_writable(data_bytes.len()); // Mutable slice to internal + // buffer + writable.copy_from_slice(data_bytes); + let data_slice = buffer_pool.get_data_owned(); // Take ownership of allocated segment + slices.push_back(data_slice); + } + assert!(buffer_pool.is_back_mode()); + + // Release one slice and add another in the back (one slice in back mode must be free to switch + // to front mode) + slices.pop_front(); // Free the slice's associated segment in the buffer pool + let data_bytes = b"b"; // 1 byte + let writable = buffer_pool.get_writable(data_bytes.len()); + writable.copy_from_slice(data_bytes); + let data_slice =
buffer_pool.get_data_owned(); + slices.push_back(data_slice); + assert!(buffer_pool.is_back_mode()); // Still in back mode + + // Write data to switch to front mode + for _ in 0..4 { + let data_bytes = b"c"; // 1 byte + let writable = buffer_pool.get_writable(data_bytes.len()); + writable.copy_from_slice(data_bytes); + let data_slice = buffer_pool.get_data_owned(); + slices.push_back(data_slice); + } + assert!(buffer_pool.is_front_mode()); // Confirm front mode + + // Add another slice, causing a switch to alloc mode + let data_bytes = b"d"; // 1 byte + let writable = buffer_pool.get_writable(data_bytes.len()); + writable.copy_from_slice(data_bytes); + let data_slice = buffer_pool.get_data_owned(); + slices.push_back(data_slice); + assert!(buffer_pool.is_alloc_mode()); +} diff --git a/utils/buffer/examples/variable_sized_messages.rs b/utils/buffer/examples/variable_sized_messages.rs new file mode 100644 index 000000000..922ec7fd3 --- /dev/null +++ b/utils/buffer/examples/variable_sized_messages.rs @@ -0,0 +1,58 @@ +// # Handling Variable-Sized Messages +// +// This example demonstrates how the `BufferPool` handles messages of varying sizes.
+// +// # Run +// +// ``` +// cargo run --example variable_sized_messages +// ``` + +use buffer_sv2::{Buffer, BufferPool}; +use std::collections::VecDeque; + +fn main() { + // Initialize a BufferPool with a capacity of 32 bytes + let mut buffer_pool = BufferPool::new(32); + let mut slices = VecDeque::new(); + + // Function to write data to the buffer pool and store the slice + let write_data = |pool: &mut BufferPool<_>, data: &[u8], slices: &mut VecDeque<_>| { + let writable = pool.get_writable(data.len()); + writable.copy_from_slice(data); + let data_slice = pool.get_data_owned(); + slices.push_back(data_slice); + println!("{:?}", &pool); + println!(""); + }; + + // Write a small message to the first slot + let small_message = b"Hello"; + write_data(&mut buffer_pool, small_message, &mut slices); + assert!(buffer_pool.is_back_mode()); + assert_eq!(slices.back().unwrap().as_ref(), small_message); + + // Write a medium-sized message to the second slot + let medium_message = b"Rust programming"; + write_data(&mut buffer_pool, medium_message, &mut slices); + assert!(buffer_pool.is_back_mode()); + assert_eq!(slices.back().unwrap().as_ref(), medium_message); + + // Write a large message that exceeds the remaining pool capacity + let large_message = b"This message is larger than the remaining buffer pool capacity."; + write_data(&mut buffer_pool, large_message, &mut slices); + assert!(buffer_pool.is_alloc_mode()); + assert_eq!(slices.back().unwrap().as_ref(), large_message); + + while let Some(slice) = slices.pop_front() { + drop(slice); + } + + // Write another small message + let another_small_message = b"Hi"; + write_data(&mut buffer_pool, another_small_message, &mut slices); + assert_eq!(slices.back().unwrap().as_ref(), another_small_message); + + // Verify that the buffer pool has returned to back mode for the last write + assert!(buffer_pool.is_back_mode()); +} diff --git a/utils/buffer/src/buffer.rs b/utils/buffer/src/buffer.rs index f4b0dab50..0a9a020c1 100644 --- 
a/utils/buffer/src/buffer.rs +++ b/utils/buffer/src/buffer.rs @@ -1,15 +1,41 @@ +// # Buffer from System Memory +// +// Provides memory management for encoding and transmitting message frames between Sv2 roles when +// buffer pools have been exhausted. +// +// `BufferFromSystemMemory` serves as a fallback when a `BufferPool` is full or unable to allocate +// memory fast enough. Instead of relying on pre-allocated memory, it dynamically allocates memory +// on the heap using a `Vec`, ensuring that message frames can still be processed. +// +// This fallback mechanism allows the buffer to resize dynamically based on data needs, making it +// suitable for scenarios where message sizes vary. However, it introduces performance trade-offs +// such as slower allocation, increased memory fragmentation, and higher system overhead compared +// to using pre-allocated buffers. + use crate::Buffer; use aes_gcm::aead::Buffer as AeadBuffer; use alloc::vec::Vec; +/// Manages a dynamically growing buffer in system memory using an internal [`Vec`]. +/// +/// Operates on a dynamically sized buffer and provides utilities for writing, reading, and +/// manipulating data. It tracks the current position where data is written, and resizes the buffer +/// as needed. #[derive(Debug)] pub struct BufferFromSystemMemory { + // Underlying buffer storing the data. inner: Vec, + + // Current cursor indicating where the next byte should be written. cursor: usize, + + // Starting index for the buffer. Useful for scenarios where part of the buffer is skipped or + // invalid. start: usize, } impl BufferFromSystemMemory { + /// Creates a new buffer with no initial data. pub fn new(_: usize) -> Self { Self { inner: Vec::new(), @@ -20,6 +46,7 @@ impl BufferFromSystemMemory { } impl Default for BufferFromSystemMemory { + // Creates a new buffer with no initial data. 
fn default() -> Self { Self::new(0) } @@ -28,57 +55,81 @@ impl Default for BufferFromSystemMemory { impl Buffer for BufferFromSystemMemory { type Slice = Vec; + // Dynamically allocates or resizes the internal `Vec` to ensure there is enough space for + // writing. #[inline] fn get_writable(&mut self, len: usize) -> &mut [u8] { let cursor = self.cursor; + + // Reserve space in the buffer for writing based on the requested `len` let len = self.cursor + len; + // If the internal buffer is not large enough to hold the new data, resize it if len > self.inner.len() { self.inner.resize(len, 0) }; self.cursor = len; + // Portion of the buffer where data can be written &mut self.inner[cursor..len] } + // Splits off the written portion of the buffer, returning it as a new `Vec`. Swaps the + // internal buffer with a newly allocated empty one, effectively returning ownership of the + // written data while resetting the internal buffer for future use. #[inline] fn get_data_owned(&mut self) -> Vec { + // Split the internal buffer at the cursor position let mut tail = self.inner.split_off(self.cursor); + + // Swap the data after the cursor (tail) with the remaining buffer core::mem::swap(&mut tail, &mut self.inner); + + // Move ownership of the buffer content up to the cursor, resetting the internal buffer + // state for future writes let head = tail; self.cursor = 0; head } + // Returns a mutable reference to the written portion of the internal buffer that has been + // filled up with data, up to the specified length (`len`). #[inline] fn get_data_by_ref(&mut self, len: usize) -> &mut [u8] { &mut self.inner[..usize::min(len, self.cursor)] } + // Returns an immutable reference to the written portion of the internal buffer that has been + // filled up with data, up to the specified length (`len`). 
#[inline] fn get_data_by_ref_(&self, len: usize) -> &[u8] { &self.inner[..usize::min(len, self.cursor)] } + // Returns the current write position (cursor) in the buffer, representing how much of the + // internal buffer has been filled with data. #[inline] fn len(&self) -> usize { self.cursor } + // Sets the start index for the buffer, adjusting where reads and writes begin. Used to discard + // part of the buffer by adjusting the starting point for future operations. #[inline] fn danger_set_start(&mut self, index: usize) { self.start = index; } + // Indicates that the buffer is always safe to drop, as `Vec` manages memory internally. #[inline] fn is_droppable(&self) -> bool { true } } +// Used to test if `BufferPool` tries to allocate from system memory. #[cfg(test)] -// Used to test if BufferPool try to allocate from system memory pub struct TestBufferFromMemory(pub Vec); #[cfg(test)] @@ -96,6 +147,7 @@ impl Buffer for TestBufferFromMemory { fn get_data_by_ref(&mut self, _len: usize) -> &mut [u8] { &mut self.0[0..0] } + fn get_data_by_ref_(&self, _len: usize) -> &[u8] { &self.0[0..0] } @@ -103,32 +155,47 @@ impl Buffer for TestBufferFromMemory { fn len(&self) -> usize { 0 } + fn danger_set_start(&mut self, _index: usize) { todo!() } + fn is_droppable(&self) -> bool { true } } impl AsRef<[u8]> for BufferFromSystemMemory { + /// Returns a reference to the internal buffer as a byte slice, starting from the specified + /// `start` index. Provides an immutable view into the buffer's contents, allowing it to be + /// used as a regular slice for reading. fn as_ref(&self) -> &[u8] { let start = self.start; &self.get_data_by_ref_(Buffer::len(self))[start..] } } + impl AsMut<[u8]> for BufferFromSystemMemory { + /// Returns a mutable reference to the internal buffer as a byte slice, starting from the + /// specified `start` index. Allows direct modification of the buffer's contents, while + /// restricting access to the data after the `start` index. 
fn as_mut(&mut self) -> &mut [u8] { let start = self.start; self.get_data_by_ref(Buffer::len(self))[start..].as_mut() } } + impl AeadBuffer for BufferFromSystemMemory { + /// Extends the internal buffer by appending the given byte slice. Dynamically resizes the + /// internal buffer to accommodate the new data and copies the contents of `other` into it. fn extend_from_slice(&mut self, other: &[u8]) -> aes_gcm::aead::Result<()> { self.get_writable(other.len()).copy_from_slice(other); Ok(()) } + /// Truncates the internal buffer to the specified length, adjusting for the `start` index. + /// Resets the buffer cursor to reflect the new size, effectively discarding any data beyond + /// the truncated length. fn truncate(&mut self, len: usize) { let len = len + self.start; self.cursor = len; diff --git a/utils/buffer/src/buffer_pool/mod.rs b/utils/buffer/src/buffer_pool/mod.rs index 705facfd8..280b128ac 100644 --- a/utils/buffer/src/buffer_pool/mod.rs +++ b/utils/buffer/src/buffer_pool/mod.rs @@ -1,3 +1,36 @@ +// # Buffer Pool +// +// A memory-efficient buffer pool that minimizes allocations and deallocations for high-throughput +// message frame processing in Sv2 roles. +// +// Provides primitives for reusing memory buffers, clearing old memory chunks, and switching +// between different modes (back, front, and alloc) to manage memory effectively and reduce +// expensive heap allocations. It uses atomic operations and shared state tracking to safely manage +// memory across multiple threads. +// +// Supports different allocation modes to optimize memory usage: +// +// - **Back Mode**: Allocates from the back of the buffer pool (default). +// - **Front Mode**: Allocates from the front when the back is full and the front has space. +// - **Alloc Mode**: Falls back to heap allocation when the buffer pool cannot fulfill requests +// (with reduced performance). +// +// ## Usage +// +// When an incoming Sv2 message is received, it needs to be buffered for processing.
The pool first +// checks the back part (`PoolBack`) to see if there is enough memory available. If the back is +// full, the pool attempts to clear used memory chunks. If clearing fails, it switches to the front +// (`PoolFront`) and tries again, this time allocating memory from the front of the `BufferPool`. +// If both back and front are full and no memory can be cleared, the pool falls back to allocating +// fresh memory from the heap (`PoolMode::Alloc`) at a performance reduction. After processing the +// message, the memory can be cleared, and the buffer pool resets, making the memory available +// for future messages. + +// **Note**: To prevent leaks or deadlocks, ensure that memory slices are properly released after +// use by allowing them to go out of scope or explicitly dropping them. Avoid holding onto slices +// longer than necessary or cloning them. After processing, you can obtain ownership of the data +// using methods like `get_data_owned()` and then let the slice be dropped. + use alloc::{vec, vec::Vec}; use core::sync::atomic::Ordering; @@ -13,19 +46,39 @@ use std::time::SystemTime; use aes_gcm::aead::Buffer as AeadBuffer; -pub const POOL_CAPACITY: usize = 8; mod pool_back; pub use pool_back::PoolBack; +// Maximum number of memory slices the buffer pool can concurrently manage. +// +// This value limits the number of slices the `BufferPool` can track and manage at once. Once the +// pool reaches its capacity of `8` slices, it may need to free memory or switch modes (e.g., to +// system memory). The choice of `8` ensures a balance between performance and memory management. +// The use of `usize` allows for compatibility with platform-dependent memory operations. +pub const POOL_CAPACITY: usize = 8; + +// Manages the "front" section of the `BufferPool`. +// +// Handles the allocation of memory slices at the front of the buffer pool. 
It tracks the number of +// slices in use and attempts to free unused slices when necessary to maximize available memory. +// The front of the buffer pool is used if the back of the buffer pool is filled up. #[derive(Debug, Clone)] -/// Used by BufferPool to allocate in the front of the inner vector pub struct PoolFront { + // Starting index of the front section of the buffer pool. back_start: usize, + + // Maximum number of bytes that can be allocated in the front section of the buffer pool. + // + // Helps manage how much memory can be used before triggering memory clearing or switching + // `PoolMode`. byte_capacity: usize, + + // Number of allocated slices in the front section of the buffer pool. len: usize, } impl PoolFront { + // Initializes a new `PoolFront` with the specified byte capacity and back start position. #[inline(always)] fn new(byte_capacity: usize, back_start: usize) -> Self { Self { @@ -35,8 +88,11 @@ impl PoolFront { } } + // Attempts to clear unused memory slices at the tail of the front section. + // + // Returns `true` if slices were successfully cleared, otherwise `false` if no slices could be + // cleared or memory conditions prevent clearing. #[inline(always)] - // try to clear the tail of the head :D fn try_clear_tail(&mut self, memory: &mut InnerMemory, mut shared_state: u8) -> bool { #[cfg(feature = "fuzz")] assert!(self.len > 0); @@ -73,6 +129,12 @@ impl PoolFront { } } + // Clears the front memory slices if conditions allow and checks if the memory pool has + // capacity to allocate `len` bytes in the buffer. + // + // Returns `Ok` if memory was successfully cleared and there is sufficient capacity, otherwise + // an `Err(PoolMode::Back)` if the memory cannot be cleared or lacks capacity. This error + // indicates the `BufferPool` should attempt a transition to use the back of the buffer pool. 
#[inline(always)] fn clear( &mut self, @@ -91,6 +153,12 @@ impl PoolFront { } } + // Attempts to allocate a writable memory region in the front section of the buffer pool, + // returning a writable slice if successful, or transitioning to a new pool mode if necessary. + // + // Returns a pointer to the writable memory (`Ok(*mut u8)`) if successful, otherwise an + // `Err(PoolMode::Back)` if the memory cannot be cleared or lacks capacity. This error + // indicates the `BufferPool` should attempt a transition to use the back of the buffer pool. #[inline(always)] fn get_writable( &mut self, @@ -110,25 +178,54 @@ impl PoolFront { } } +/// Current mode of operation for the `BufferPool`. +/// +/// The pool operates in three modes based on memory availability: it first allocates from the +/// back, then from the front if the back is full, and finally from system memory (with reduced +/// performance) if both sections are exhausted. #[derive(Debug, Clone)] -/// Internal state of the BufferPool pub enum PoolMode { + /// The buffer pool is operating in "back" mode, where memory is allocated from the back of the + /// buffer pool. Back, + + /// The buffer pool is operating in "front" mode, where memory is allocated from the front of + /// the buffer pool. Used when the back is full. Front(PoolFront), + + /// The pool has exhausted its internal memory, and it is now allocating directly from the + /// system memory (heap). Alloc, } +// Internal memory management for the `BufferPool`. +// +// Handles allocating, tracking, and managing memory slices for manipulating memory offsets, +// copying data, and managing capacity. It uses a contiguous block of memory (`Vec`), tracking +// its usage through offsets (`raw_offset`, `raw_len`), and manages slice allocations through +// `slots`. Used by `BufferPool` to optimize memory reuse and minimize heap allocations.
#[derive(Debug, Clone)] -/// Internal memory of the BufferPool pub struct InnerMemory { + // Underlying contiguous block of memory to be managed. pool: Vec, + + // Current offset into the contiguous block of memory where the next write will occur. pub(crate) raw_offset: usize, + + // Length of the valid data within the contiguous block of memory, starting from `raw_offset`. pub(crate) raw_len: usize, + + // Tracks individual chunks of memory being used in the buffer pool by tracking the start and + // length of each allocated memory slice. slots: [(usize, usize); POOL_CAPACITY], + + // A pointer to the current slot. Represents how many slots are currently occupied. len: usize, } impl InnerMemory { + // Initializes a new `InnerMemory` with a specified size of the internal memory buffer + // (`capacity`), in bytes. fn new(capacity: usize) -> Self { let pool = vec![0; capacity]; Self { @@ -140,6 +237,7 @@ impl InnerMemory { } } + // Resets the internal memory pool, clearing all used memory and resetting the slot tracking. #[inline(always)] fn reset(&mut self) { self.raw_len = 0; @@ -147,12 +245,17 @@ impl InnerMemory { self.slots = [(0_usize, 0_usize); POOL_CAPACITY]; } + // Resets only the raw memory state, without affecting the slot tracking. #[inline(always)] fn reset_raw(&mut self) { self.raw_len = 0; self.raw_offset = 0; } + // Returns the capacity of the front portion of the buffer. + // + // Used to determine how much space is available in the front of the buffer pool, based on the + // `back_start` position. #[inline(always)] fn get_front_capacity(&self, back_start: usize) -> usize { #[cfg(feature = "fuzz")] @@ -167,6 +270,8 @@ impl InnerMemory { self.slots[back_start].0 } + // Calculates the offset for the next writable section of memory. Returns the offset based on + // the current memory length and slot usage. 
#[inline(always)] fn raw_offset(&self) -> usize { match self.len { @@ -187,6 +292,8 @@ impl InnerMemory { } } + // Calculates the offset for a specific length of memory. Returns the offset based on the + // provided length and the current state of the memory slots. #[inline(always)] fn raw_offset_from_len(&self, len: usize) -> usize { match len { @@ -207,6 +314,11 @@ impl InnerMemory { } } + // Moves the raw data to the front of the memory pool to avoid fragmentation, if necessary. + // + // Used to compact the raw data by moving all the active slices to the front of the memory + // pool, making the pool contiguous again. This process is only performed when needed to free + // up space for new allocations without increasing the total memory footprint. #[inline(always)] fn move_raw_at_front(&mut self) { match self.raw_len { @@ -219,6 +331,8 @@ impl InnerMemory { } } + // Tries to update the length and offset of the memory pool and moves the raw offset if there + // is enough capacity to accommodate new memory. Returns `true` if successful, otherwise false. #[inline(always)] fn try_change_len(&mut self, slot_len: usize, raw_len: usize) -> bool { let raw_offset = self.raw_offset_from_len(slot_len); @@ -233,6 +347,11 @@ impl InnerMemory { } } + // Moves the raw data to a specific offset within the memory pool to avoid fragmentation, if + // necessary. + // + // Misuse of this function can lead to undefined behavior, such as memory corruption or + // crashes, if it operates on out-of-bounds or misaligned memory. #[inline(always)] fn move_raw_at_offset_unchecked(&mut self, offset: usize) { match self.raw_len { @@ -245,6 +364,7 @@ impl InnerMemory { } } + // Inserts raw data at the front of the memory pool, adjusting the raw offset and length. #[inline(never)] fn prepend_raw_data(&mut self, raw_data: &[u8]) { self.raw_offset = 0; @@ -255,23 +375,31 @@ impl InnerMemory { dest.copy_from_slice(raw_data); } + // Copies the internal raw memory into another buffer. 
Used when transitioning memory between + // different pool modes. #[inline(never)] fn copy_into_buffer(&mut self, buffer: &mut impl Buffer) { let writable = buffer.get_writable(self.raw_len); writable.copy_from_slice(&self.pool[self.raw_offset..self.raw_offset + self.raw_len]); } + // Checks if there is enough capacity at the tail of the memory pool to accommodate `len` + // bytes. #[inline(always)] fn has_tail_capacity(&self, len: usize) -> bool { let end = self.raw_offset + self.raw_len; end + len <= self.pool.capacity() } + // Checks if there is enough capacity in the memory pool up to the specified offset to + // accommodate `len` bytes. #[inline(always)] fn has_capacity_until_offset(&self, len: usize, offset: usize) -> bool { self.raw_offset + self.raw_len + len <= offset } + // Returns a raw pointer to the writable memory region of the memory pool, marking the section + // as used. #[inline(always)] fn get_writable_raw_unchecked(&mut self, len: usize) -> *mut u8 { let writable_offset = self.raw_offset + self.raw_len; @@ -279,6 +407,17 @@ impl InnerMemory { self.pool[writable_offset..writable_offset + len].as_mut_ptr() } + /// Provides access to the raw memory slice containing the data written into the buffer, + /// returning it as a [`Slice`]. This method is the primary mechanism for making processed data + /// available from the buffer to the rest of the system. + /// + /// After this call, the buffer advances internally to a new slot, allowing new data to be + /// written into an unused portion of memory. This approach avoids memory duplication and + /// ensures efficient reuse of the buffer without transferring ownership of the memory. + /// + /// This method is typically used when the data in the buffer is ready for processing, sending, + /// or further manipulation. The returned `Slice` contains the data, while the buffer itself + /// remains ready to handle new incoming data by pointing to a fresh memory region. 
#[inline(always)] fn get_data_owned( &mut self, @@ -321,19 +460,46 @@ impl InnerMemory { } } +/// A pool of reusable memory buffers to optimize memory allocation for Sv2 message frames. +/// +/// Manages memory slices across three pool modes: back (default), front, and system memory. It +/// reuses preallocated memory slices, minimizing the need for frequent memory allocation. The pool +/// is thread-safe, using atomic state tracking. +/// +/// Type `T` implements the [`Buffer`] trait, which defines how memory is handled in the buffer +/// pool. [`BufferFromSystemMemory`] is used as the default system memory allocator. #[derive(Debug)] pub struct BufferPool { + // Manages memory allocation from the back section of the buffer pool. pool_back: PoolBack, + + /// Tracks the current mode of memory allocation (back, front, or system). pub mode: PoolMode, + + // Tracks the usage state of memory slices using atomic operations, ensuring memory is not + // prematurely reused and allowing safe concurrent access across threads. shared_state: SharedState, + + // Core memory area from which slices are allocated and reused. Manages the actual memory + // buffer used by the buffer pool. inner_memory: InnerMemory, + + // Allocates memory directly from system memory when the buffer pool is full, acting as a + // fallback when preallocated memory cannot satisfy buffer requests. system_memory: T, - // Used only when we need as_ref or as_mut, set the first element to the one with index equal - // to start + + // Tracks the starting index for buffer access, determining where data begins to be read or + // written in the buffer pool. Primarily used when `as_ref` or `as_mut` is called, ensuring + // that the buffer starts at the element specified by `start`. start: usize, } impl BufferPool { + /// Creates a new [`BufferPool`] with the specified memory capacity, in bytes. 
+ /// + /// Initializes the buffer pool with pre-allocated memory (`inner_memory`) and sets the pool + /// mode to the back. The buffer pool uses [`BufferFromSystemMemory`] as a fallback when the + /// buffer sections are full. pub fn new(capacity: usize) -> Self { Self { pool_back: PoolBack::new(), @@ -361,7 +527,13 @@ impl BufferPool { } } +// Defines methods specific to internal behavior and management of the `BufferPool`. impl BufferPool { + /// Checks if the buffer pool is operating in the front mode. + /// + /// This mode indicates that the back of the buffer pool has been filled and the system is now + /// using the front section for memory allocation. Returns `true` if the pool is in front mode, + /// otherwise `false`. pub fn is_front_mode(&self) -> bool { match self.mode { PoolMode::Back => false, @@ -370,6 +542,10 @@ impl BufferPool { } } + /// Checks if the buffer pool is operating in the back mode. + /// + /// The back mode is the default state, where the buffer pool first tries to allocate memory. + /// Returns `true` if the pool is in back mode, otherwise `false`. pub fn is_back_mode(&self) -> bool { match self.mode { PoolMode::Back => true, @@ -378,6 +554,11 @@ impl BufferPool { } } + /// Checks if the buffer pool is operating in the system memory allocation mode. + /// + /// This mode is used when both the back and front sections of the buffer pool are full, + /// leading the system to allocate memory from the heap, which has performance trade-offs. + /// Returns `true` if the pool is in alloc mode, otherwise `false`. pub fn is_alloc_mode(&self) -> bool { match self.mode { PoolMode::Back => false, @@ -386,6 +567,12 @@ impl BufferPool { } } + // Resets the buffer pool based on its current mode when the shared state indicates all slices + // have been dropped, preparing it for reuse. + // + // - In back or front mode, the internal memory is moved to the front, and the back is reset. 
+ // - In alloc mode, system memory is checked and, if smaller than pool capacity, transferred + // back into the pool, switching the mode to `Back`. #[inline(always)] fn reset(&mut self) { #[cfg(feature = "debug")] @@ -420,6 +607,13 @@ impl BufferPool { } } + // Allocates writable memory of the specified `len` from the heap when the buffer pool cannot + // fulfill the request. + // + // Determines whether to allocate memory from system memory or try to clear memory from the + // back section of the buffer pool. If clearing is unsuccessful, it may switch to alloc mode or + // remain in back or front pool modes. When `without_check` is `true`, the function bypasses + // memory checks and allocates directly from the heap. #[inline(never)] fn get_writable_from_system_memory( &mut self, @@ -458,11 +652,19 @@ impl BufferPool { } } + // Returns ownership of the heap-allocated buffer data by converting it into a `Slice` for + // further use or processing. #[inline(never)] fn get_data_owned_from_sytem_memory(&mut self) -> Slice { self.system_memory.get_data_owned().into() } + // Switches the buffer pool to a different mode of operation, adjusting based on the required + // memory size (`len`). + // + // Depending on the current and target modes (back, front, or alloc), this method adjusts the + // internal buffer pool's state and memory to ensure a smooth transition while allocating the + // necessary buffer space (`len`), ensuring no data is lost. #[inline(always)] fn change_mode(&mut self, mode: PoolMode, len: usize, shared_state: u8) { match (&mut self.mode, &mode) { @@ -543,6 +745,12 @@ impl BufferPool { } } + // Recursively attempts to allocate writable memory of the specified `len`, switching modes if + // necessary. + // + // First tries to allocate memory from the current mode (back, front, or alloc), and if + // unsuccessful, switches modes and retries, starting with the memory pool before resorting to + // system memory. 
#[inline(always)] fn get_writable_(&mut self, len: usize, shared_state: u8, without_check: bool) -> &mut [u8] { let writable = match &mut self.mode { @@ -576,18 +784,30 @@ impl BufferPool { impl Buffer for BufferPool { type Slice = Slice; + // Provides a mutable slice of length `len` for writing data into the buffer pool. + // + // Checks the current `shared_state` to determine if the buffer pool can be reset. If all + // slices have been dropped and there are allocated slices in `pool_back`, it resets the buffer + // pool to free up memory. It then attempts to allocate writable memory, which may switch + // between different modes as needed. #[inline(always)] fn get_writable(&mut self, len: usize) -> &mut [u8] { let shared_state = self.shared_state.load(Ordering::Relaxed); - // If all the slices have been dropped just reset the pool + // If all the slices have been dropped, reset the pool to free up memory if shared_state == 0 && self.pool_back.len() != 0 { self.reset(); } + // Attempt to allocate writable memory, potentially switching pool modes self.get_writable_(len, shared_state, false) } + // Transfers ownership of the written data as a `Slice`, handling different pool modes. + // + // Depending on the current mode (back, front, or alloc), it retrieves the data from the + // appropriate memory source. In `Back` or `Front` modes, it updates internal state + // accordingly. In alloc mode, it retrieves data from the heap-allocated system memory. 
#[inline(always)] fn get_data_owned(&mut self) -> Self::Slice { let shared_state = &mut self.shared_state; @@ -628,12 +848,14 @@ impl Buffer for BufferPool { #[cfg(not(feature = "debug"))] match &mut self.mode { PoolMode::Back => { + // Retrieve data and update state in Back mode let res = self.inner_memory.get_data_owned(shared_state); self.pool_back .set_len_from_inner_memory(self.inner_memory.len); res } PoolMode::Front(f) => { + // Retrieve data and update state in Front mode let res = self.inner_memory.get_data_owned(shared_state); f.len = self.inner_memory.len; res @@ -642,6 +864,9 @@ impl Buffer for BufferPool { } } + // Retrieves data differently based on the current buffer pool mode: + // - In alloc mode, it delegates to the system memory buffer. + // - In back or front modes, it returns a mutable slice of the internal memory buffer. fn get_data_by_ref(&mut self, len: usize) -> &mut [u8] { match self.mode { PoolMode::Alloc => self.system_memory.get_data_by_ref(len), @@ -652,6 +877,9 @@ impl Buffer for BufferPool { } } + // Retrieves data differently based on the current pool mode: + // - In back or front modes, it returns an immutable slice of the internal memory buffer. + // - In alloc mode, it delegates to the system memory buffer. fn get_data_by_ref_(&self, len: usize) -> &[u8] { match self.mode { PoolMode::Alloc => self.system_memory.get_data_by_ref_(len), @@ -662,6 +890,12 @@ impl Buffer for BufferPool { } } + // Returns the length of the written data in the buffer. + // + // The implementation checks the current pool mode to determine where to retrieve the length + // from: + // - In back or front modes, it returns the length from `inner_memory.raw_len`. + // - In alloc mode, it returns the length from the system memory buffer. fn len(&self) -> usize { match self.mode { PoolMode::Back => self.inner_memory.raw_len, @@ -670,10 +904,14 @@ impl Buffer for BufferPool { } } + // Sets the start index for the buffer, adjusting where reads and writes begin. 
Used to discard + // part of the buffer by adjusting the starting point for future operations. fn danger_set_start(&mut self, index: usize) { self.start = index; } + // Returns `true` if all memory slices have been released (`shared_state` is zero), indicating + // that no other threads or components are using the pool's memory. #[inline(always)] fn is_droppable(&self) -> bool { self.shared_state.load(Ordering::Relaxed) == 0 @@ -682,6 +920,8 @@ impl Buffer for BufferPool { #[cfg(not(test))] impl Drop for BufferPool { + // Waits until all slices are released before dropping the `BufferPool`. Will not drop the + // buffer pool while slices are still in use. fn drop(&mut self) { while self.shared_state.load(Ordering::Relaxed) != 0 { core::hint::spin_loop(); @@ -689,7 +929,13 @@ impl Drop for BufferPool { } } +// Allows `BufferPool` to be treated as a buffer. impl BufferPool { + /// Determines if the [`BufferPool`] can be safely dropped. + /// + /// Returns `true` if all memory slices managed by the buffer pool have been released (i.e., + /// the `shared_state` is zero), indicating that all the slices are dropped. This check helps + /// prevent dropping the buffer pool while it's still in use. pub fn droppable(&self) -> bool { self.shared_state.load(Ordering::Relaxed) == 0 } @@ -700,12 +946,14 @@ impl AsRef<[u8]> for BufferPool { &self.get_data_by_ref_(Buffer::len(self))[self.start..] 
} } + impl AsMut<[u8]> for BufferPool { fn as_mut(&mut self) -> &mut [u8] { let start = self.start; self.get_data_by_ref(Buffer::len(self))[start..].as_mut() } } + impl AeadBuffer for BufferPool { fn extend_from_slice(&mut self, other: &[u8]) -> aes_gcm::aead::Result<()> { self.get_writable(other.len()).copy_from_slice(other); diff --git a/utils/buffer/src/buffer_pool/pool_back.rs b/utils/buffer/src/buffer_pool/pool_back.rs index ad89bf169..86e8f746d 100644 --- a/utils/buffer/src/buffer_pool/pool_back.rs +++ b/utils/buffer/src/buffer_pool/pool_back.rs @@ -1,12 +1,45 @@ +// # Back of Buffer Pool +// +// Manages the "back" section of the buffer pool (`BufferPool`). +// +// The `PoolBack` struct is responsible for allocating, clearing, and managing memory slices from +// the back of the pool. It tracks the number of allocated slices and attempts to free up memory +// that is no longer in use, preventing unnecessary memory growth. +// +// Key functions in this module handle: +// - Clearing unused slices from the back of the pool to reclaim memory. +// - Managing slice allocation and ensuring enough capacity for new operations. +// - Switching between different pool modes, such as front or back, depending on memory state. +// +// By default, memory is always first allocated from the back of the `BufferPool`. If all of +// the initially allocated buffer memory is filled and a new memory request comes in, +// `BufferPool` checks whether any memory has been freed at the back or the front using +// `SharedState`. If, for example, a slice has been freed that corresponds to the head of +// `SharedState`, `BufferPool` will switch to front mode and start allocating incoming memory +// requests from there. However, if the entire `SharedState` is full, it will switch to alloc mode +// and begin allocating system memory to fulfill incoming requests at reduced performance.
For +// subsequent memory requests, it will continue to check whether the prefix or suffix of +// `SharedState` has been freed. If it has, `BufferPool` will switch modes and start consuming +// pre-allocated `BufferPool` memory; if not, it will remain in alloc mode. + use crate::buffer_pool::{InnerMemory, PoolFront, PoolMode, POOL_CAPACITY}; +// Manages the "back" section of the `BufferPool`. +// +// Handles the allocation of memory slices at the back of the buffer pool. It tracks the number of +// slices in use and attempts to free unused slices when necessary to maximize available memory. +// The back of the buffer pool is used first; if it fills up, the front of the buffer pool is used. #[derive(Debug, Clone)] pub struct PoolBack { + // Starting index of the back section of the buffer pool. back_start: usize, + + // Number of allocated slices in the back section of the buffer pool. len: usize, } impl PoolBack { + // Initializes a new `PoolBack` with no allocated slices. pub fn new() -> Self { Self { back_start: 0, @@ -14,11 +47,13 @@ impl PoolBack { } } + // Returns the number of allocated slices in the back section of the buffer pool. #[inline(always)] pub fn len(&self) -> usize { self.len } + // Updates the length of the back section based on the state of the inner memory. #[inline(always)] pub fn set_len_from_inner_memory(&mut self, len: usize) { let len = len - self.back_start; @@ -29,17 +64,27 @@ impl PoolBack { self.len = len; } + // Returns the starting index of the back section in the buffer pool. #[inline(always)] pub fn back_start(&self) -> usize { self.back_start } + // Resets the back section of the pool by clearing its start index and length. #[inline(always)] pub fn reset(&mut self) { self.back_start = 0; self.len = 0; } + // From the back section, checks if there are any unset bits and adjusts the length if there + // are. + // + // Assumes the caller has already ensured the safety of the operation (such as slice bounds and + // memory validity).
It skips internal safety checks, relying on the caller to manage the + // state, making it faster but potentially unsafe if misused. + // + // Returns `true` if the tail was cleared successfully, otherwise `false`. #[inline(always)] pub fn try_clear_tail_unchecked( &mut self, @@ -91,14 +136,24 @@ impl PoolBack { } } + // Checks if the tail of the back section can be cleared. + // + // Should always be called before attempting to clear the tail. Returns `true` if the tail can + // be cleared, otherwise `false`. #[inline(always)] - // Check if it is possible to clear the tail it must always be called before try_clear_tail pub fn tail_is_clearable(&self, shared_state: u8) -> bool { let element_in_back = POOL_CAPACITY - self.back_start; let element_to_drop = usize::min(element_in_back, shared_state.trailing_zeros() as usize); + element_to_drop <= self.len && self.back_start + self.len >= POOL_CAPACITY } + // Attempts to clear the head of the back section, transitioning to the buffer pool front + // section if not possible. + // + // Returns `Ok` if the head was cleared successfully. Otherwise an `Err(PoolMode::Front)` if + // the buffer pool should switch to front mode, or an `Err(PoolMode::Alloc)` if the buffer pool + // should switch to allocation mode. #[inline(always)] fn try_clear_head( &mut self, @@ -107,9 +162,8 @@ impl PoolBack { ) -> Result<(), PoolMode> { // 0b00111110 2 leading zeros // - // the first 2 elements have been dropped so back start at 2 and BufferPool can go in Front - // mode - // + // The first 2 elements have been dropped so back start at 2 and `BufferPool` can go in + // front mode self.back_start = shared_state.leading_zeros() as usize; if self.back_start >= 1 && memory.raw_len < memory.slots[self.back_start].0 { @@ -126,6 +180,16 @@ impl PoolBack { } } + // Clears the tail or head of the back section, transitioning pool modes if necessary. 
+ // + // Called when the state of the `BufferPool`, along with both `raw_offset` and `raw_len`, + // reaches its maximum limits. It is used to clear any available space remaining in the + // buffer's suffix or prefix. Based on that, the `BufferPool`'s state is updated by changing + // its pool mode. + // + // Returns `Ok` if the tail or head was cleared successfully. Otherwise an + // `Err(PoolMode::Front)` if the buffer pool should switch to front mode, or an + // `Err(PoolMode::Alloc)` if the buffer pool should switch to allocation mode. #[inline(always)] pub fn clear_unchecked( &mut self, @@ -142,6 +206,11 @@ impl PoolBack { self.try_clear_head(shared_state, memory) } + // Returns a writable slice of memory from the back section or transitions to a new pool mode + // if necessary. + // + // Returns `Ok(*mut u8)` if writable memory is available, otherwise an `Err(PoolMode)` if a + // mode change is required. #[inline(always)] pub fn get_writable( &mut self, diff --git a/utils/buffer/src/lib.rs b/utils/buffer/src/lib.rs index 11d1eb0d1..725c73906 100644 --- a/utils/buffer/src/lib.rs +++ b/utils/buffer/src/lib.rs @@ -1,3 +1,44 @@ +//! # `buffer_sv2` +//! +//! Handles memory management for Stratum V2 (Sv2) roles. +//! +//! Provides a memory-efficient buffer pool ([`BufferPool`]) that minimizes allocations and +//! deallocations for high-throughput message frame processing in Sv2 roles. [`Slice`] helps +//! minimize memory allocation overhead by reusing large buffers, improving performance and +//! reducing latency. The [`BufferPool`] tracks the usage of memory slices, using atomic operations +//! and shared state tracking to safely manage memory across multiple threads. +//! +//! ## Memory Structure +//! +//! The [`BufferPool`] manages a contiguous block of memory allocated on the heap, divided into +//! fixed-size slots. Memory allocation within this pool operates in three distinct modes: +//! +//! 1. 
**Back Mode**: By default, memory is allocated sequentially from the back (end) of the buffer +//! pool. This mode continues until the back slots are fully occupied. +//! 2. **Front Mode**: Once the back slots are exhausted, the [`BufferPool`] checks if any slots at +//! the front (beginning) have been freed. If available, it switches to front mode, allocating +//! memory from the front slots. +//! 3. **Alloc Mode**: If both back and front slots are occupied, the [`BufferPool`] enters alloc +//! mode, where it allocates additional memory directly from the system heap. This mode may +//! introduce performance overhead due to dynamic memory allocation. +//! +//! [`BufferPool`] dynamically transitions between these modes based on slot availability, +//! optimizing memory usage and performance. +//! +//! ## Usage +//! +//! When an incoming Sv2 message is received, it is buffered for processing. The [`BufferPool`] +//! attempts to allocate memory from its internal slots, starting in back mode. If the back slots +//! are full, it checks for available front slots to switch to front mode. If no internal slots are +//! free, it resorts to alloc mode, allocating memory from the system heap. +//! +//! For operations requiring dedicated buffers, the [`Slice`] type manages its own memory using +//! [`Vec`]. In high-performance scenarios, [`Slice`] can reference externally managed memory +//! from the [`BufferPool`], reducing dynamic memory allocations and increasing performance. +//! +//! ### Debug Mode +//! Provides additional tracking for debugging memory management issues. + #![cfg_attr(not(feature = "debug"), no_std)] //#![feature(backtrace)] @@ -15,23 +56,39 @@ pub use aes_gcm::aead::Buffer as AeadBuffer; pub use buffer_pool::BufferPool; pub use slice::Slice; +/// Represents errors that can occur while writing data into a buffer. pub enum WriteError { + /// No data could be written. WriteZero, } +/// Interface for writing data into a buffer. 
+/// +/// An abstraction over different buffer types ([`Vec`] or [`BufferPool`]), it provides methods +/// for writing data from a byte slice into the buffer, with the option to either write a portion +/// of the data or attempt to write the entire byte slice at once. pub trait Write { + /// Writes data from a byte slice (`buf`) into the buffer, returning the number of bytes that + /// were successfully written. fn write(&mut self, buf: &[u8]) -> Result; + /// Attempts to write the entire byte slice (`buf`) into the buffer. If the buffer cannot + /// accept the full length of the data, an error is returned. fn write_all(&mut self, buf: &[u8]) -> Result<(), WriteError>; } impl Write for Vec { + /// Writes data from a byte slice into a [`Vec`] buffer by extending the vector with the + /// contents of the provided slice. #[inline] fn write(&mut self, buf: &[u8]) -> Result { self.extend_from_slice(buf); Ok(buf.len()) } + /// Attempts to write all the data from a byte slice into a [`Vec`] buffer by extending the + /// vector. Since [`Vec`] can dynamically resize, this method will always succeed as long + /// as there is available memory. #[inline] fn write_all(&mut self, buf: &[u8]) -> Result<(), WriteError> { self.extend_from_slice(buf); @@ -40,6 +97,8 @@ impl Write for Vec { } impl Write for &mut [u8] { + /// Writes data from a byte slice into a mutable byte array (`&mut [u8]`), up to the length of + /// the provided buffer. #[inline] fn write(&mut self, data: &[u8]) -> Result { let amt = core::cmp::min(data.len(), self.len()); @@ -50,6 +109,8 @@ impl Write for &mut [u8] { Ok(amt) } + /// Attempts to write all the data from a byte slice into a mutable byte array (`&mut [u8]`). + /// If the buffer is not large enough to contain all the data, an error is returned. #[inline] fn write_all(&mut self, data: &[u8]) -> Result<(), WriteError> { if self.write(data)? 
== data.len() { @@ -60,30 +121,55 @@ impl Write for &mut [u8] { } } +/// Interface for working with memory buffers. +/// +/// An abstraction for buffer management, allowing implementors to handle owned memory +/// ([`Slice`] with [`Vec`]). Utilities are provided to borrow writable memory, retrieve data +/// from the buffer, and manage memory slices. +/// +/// This trait is used during the serialization and deserialization +/// of message types in the [`binary_sv2` crate](https://crates.io/crates/binary_sv2). pub trait Buffer { + /// The type of slice that the buffer uses. type Slice: AsMut<[u8]> + AsRef<[u8]> + Into; - // Caller need to borrow a buffer to write some date + /// Borrows a mutable slice of the buffer, allowing the caller to write data into it. The + /// caller specifies the length of the data they need to write. fn get_writable(&mut self, len: usize) -> &mut [u8]; - // Caller need to get the previously written buffer and should own it + /// Provides ownership of a slice in the buffer pool to the caller and updates the buffer + /// pool's state by modifying the position in `shared_state` that the slice occupies. The pool + /// now points to the next set of uninitialized space. fn get_data_owned(&mut self) -> Self::Slice; - // Caller need a view in the written part of the buffer + /// Provides a mutable reference to the written portion of the buffer, up to the specified + /// length, without transferring ownership of the buffer. This allows the caller to modify the + /// buffer’s contents directly without taking ownership. fn get_data_by_ref(&mut self, len: usize) -> &mut [u8]; - // Caller need a view in the written part of the buffer + /// Provides an immutable reference to the written portion of the buffer, up to the specified + /// length, without transferring ownership of the buffer. This allows the caller to inspect the + /// buffer’s contents without modifying or taking ownership.
fn get_data_by_ref_(&self, len: usize) -> &[u8]; - // Return the size of the written part of the buffer that is still owned by the Buffer + /// Returns the size of the written portion of the buffer. This is useful for tracking how much + /// of the buffer has been filled with data. The number of bytes currently written in the + /// buffer is returned. fn len(&self) -> usize; - // Set the first element of the buffer to the element at the given index (here only for - // perfomnce do not use unless you are really sure about what it do) + /// Modifies the starting point of the buffer, effectively discarding data up to the given + /// `index`. This can be useful for performance optimizations in situations where older data + /// is no longer needed, but its use can be unsafe unless you understand its implications. fn danger_set_start(&mut self, index: usize); + /// Returns `true` if the buffer is empty, `false` otherwise. fn is_empty(&self) -> bool { self.len() == 0 } + + /// Determines if the buffer is safe to drop. This typically checks if the buffer contains + /// essential data that still needs to be processed. + /// + /// Returns `true` if the buffer can be safely dropped, `false` otherwise. fn is_droppable(&self) -> bool; } diff --git a/utils/buffer/src/slice.rs b/utils/buffer/src/slice.rs index b36c6ee1d..c5da575d7 100644 --- a/utils/buffer/src/slice.rs +++ b/utils/buffer/src/slice.rs @@ -1,21 +1,99 @@ +// # Slice +// +// Provides efficient memory management for the Sv2 protocol by allowing memory reuse, either +// through owned memory (`Vec`) or externally managed memory in a buffer pool (`BufferPool`). +// +// `Slice` helps minimize memory allocation overhead by reusing large buffers, improving +// performance and reducing latency in high-throughput environments. Tracks the usage of memory +// slices, ensuring safe reuse across multiple threads via `SharedState`. 
+// +// ## Key Features +// - **Memory Reuse**: Divides large buffers into smaller slices, reducing the need for frequent +// allocations. +// - **Shared Access**: Allows safe concurrent access using atomic state tracking (`Arc`). +// - **Flexible Management**: Supports both owned memory and externally managed memory. +// +// ## Usage +// 1. **Owned Memory**: For isolated operations, `Slice` manages its own memory (`Vec`). +// 2. **Buffer Pool**: In high-performance systems, `Slice` references externally managed memory +// from a buffer pool (`BufferPool`), reducing dynamic memory allocation. +// +// ### Debug Mode +// Provides additional tracking for debugging memory management issues. + use alloc::{sync::Arc, vec::Vec}; use core::sync::atomic::{AtomicU8, Ordering}; #[cfg(feature = "debug")] use std::time::SystemTime; +// A special index value used to mark `Slice` as ignored in certain operations, such as memory pool +// tracking or state management. +// +// It can be used as a sentinel value for slices that should not be processed or tracked, helping +// differentiate valid slices from those that need to be skipped. When a `Slice`'s `index` is set +// to `INGORE_INDEX`, it is flagged to be ignored by any logic that processes or tracks slice +// indices. pub const INGORE_INDEX: u8 = 59; +/// Allows [`Slice`] to be safely transferred between threads. +/// +/// [`Slice`] contains a raw pointer (`*mut u8`), so Rust cannot automatically implement [`Send`]. +/// The `unsafe impl` asserts that memory access is thread-safe, relying on `SharedState` and +/// atomic operations to prevent data races. unsafe impl Send for Slice {} +/// A contiguous block of memory, either preallocated or dynamically allocated. +/// +/// It serves as a lightweight handle to a memory buffer, allowing for direct manipulation and +/// shared access. It can either hold a reference to a preallocated memory block or own a +/// dynamically allocated buffer (via [`Vec`]).
#[derive(Debug, Clone)] pub struct Slice { + // Raw pointer to the start of the memory block. + // + // Allows for efficient access to the underlying memory. Care should be taken when working with + // raw pointers to avoid memory safety issues. The pointer must be valid and must point to a + // properly allocated and initialized memory region. pub(crate) offset: *mut u8, + + // Length of the memory block in bytes. + // + // Represents how much memory is being used. This is critical for safe memory access, as it + // prevents reading or writing outside the bounds of the buffer. pub(crate) len: usize, + + /// Unique identifier (index) of the slice in the shared memory pool. + /// + /// When in back or front mode, tracks the slice within the pool and manages memory reuse. It + /// allows for quick identification of slices when freeing or reassigning memory. If in alloc + /// mode, it is set to `INGORE_INDEX`. pub index: u8, + + /// Shared state of the memory pool. + /// + /// When in back or front mode, tracks how many slices are currently in use and ensures proper + /// synchronization of memory access across multiple contexts. pub shared_state: SharedState, + + /// Optional dynamically allocated buffer. + /// + /// If present, the slice owns the memory and is responsible for managing its lifecycle. If + /// [`None`], the buffer pool is in back or front mode and the slice points to memory managed + /// by the memory pool. It is `Some(Vec)` when in alloc mode. pub owned: Option>, + + // Mode flag to track the state of the slice during development. + // + // Useful for identifying whether the slice is being used correctly in different modes (e.g., + // whether it is currently being written to or read from). Typically used for logging and + // debugging. #[cfg(feature = "debug")] pub mode: u8, + + /// Timestamp to track when the slice was created.
+ /// + /// Useful for diagnosing time-related issues and tracking the lifespan of memory slices during + /// development and debugging. #[cfg(feature = "debug")] pub time: SystemTime, } @@ -34,6 +112,10 @@ impl Serialize for Slice { } impl Slice { + /// Returns the length of the slice in bytes. + /// + /// If the slice owns its memory (`owned`), it returns the length of the owned buffer. If the + /// slice does not own the memory, it returns `0`. pub fn len(&self) -> usize { if let Some(owned) = &self.owned { owned.len() @@ -41,6 +123,10 @@ impl Slice { 0 } } + + /// Checks if the slice is empty. + /// + /// Returns `true` if the slice is empty, i.e., it has no data. Otherwise, returns `false`. pub fn is_empty(&self) -> bool { self.len() == 0 } @@ -49,48 +135,82 @@ impl Slice { impl core::ops::Index for Slice { type Output = u8; + /// Provides immutable indexing access to the [`Slice`] at the specified position. + /// + /// Uses `as_ref` to get a reference to the underlying buffer and returns the byte at the + /// `index`. fn index(&self, index: usize) -> &Self::Output { self.as_ref().index(index) } } + impl core::ops::IndexMut for Slice { + /// Provides mutable indexing access to the [`Slice`] at the specified position. + /// + /// Uses `as_mut` to get a mutable reference to the underlying buffer and returns the byte at + /// the `index`. fn index_mut(&mut self, index: usize) -> &mut Self::Output { self.as_mut().index_mut(index) } } + impl core::ops::Index> for Slice { type Output = [u8]; + /// Provides immutable slicing access to a range starting from the given `index`. + /// + /// Uses `as_ref` to get a reference to the underlying buffer and returns the range. fn index(&self, index: core::ops::RangeFrom) -> &Self::Output { self.as_ref().index(index) } } + impl core::ops::IndexMut> for Slice { + /// Provides mutable slicing access to a range starting from the given `index`. 
+ /// + /// Uses `as_mut` to get a mutable reference to the underlying buffer and returns the range. fn index_mut(&mut self, index: core::ops::RangeFrom) -> &mut Self::Output { self.as_mut().index_mut(index) } } + impl core::ops::Index> for Slice { type Output = [u8]; + /// Provides immutable slicing access to the specified range within the `Slice`. + /// + /// Uses `as_ref` to get a reference to the underlying buffer and returns the specified range. fn index(&self, index: core::ops::Range) -> &Self::Output { self.as_ref().index(index) } } + impl core::ops::IndexMut> for Slice { + /// Provides mutable slicing access to the specified range within the `Slice`. + /// + /// Uses `as_mut` to get a mutable reference to the underlying buffer and returns the specified + /// range. fn index_mut(&mut self, index: core::ops::Range) -> &mut Self::Output { self.as_mut().index_mut(index) } } + impl core::ops::Index for Slice { type Output = [u8]; + /// Provides immutable access to the entire range of the [`Slice`]. + /// + /// Uses `as_ref` to get a reference to the entire underlying buffer. fn index(&self, index: core::ops::RangeFull) -> &Self::Output { self.as_ref().index(index) } } impl AsMut<[u8]> for Slice { + /// Converts the [`Slice`] into a mutable slice of bytes (`&mut [u8]`). + /// + /// Returns the owned buffer if present, otherwise converts the raw pointer and length into a + /// mutable slice. #[inline(always)] fn as_mut(&mut self) -> &mut [u8] { match self.owned.as_mut() { @@ -99,7 +219,12 @@ impl AsMut<[u8]> for Slice { } } } + impl AsRef<[u8]> for Slice { + /// Converts the [`Slice`] into an immutable slice of bytes (`&[u8]`). + /// + /// Returns the owned buffer if present, otherwise converts the raw pointer and length into an + /// immutable slice. 
#[inline(always)] fn as_ref(&self) -> &[u8] { match self.owned.as_ref() { @@ -110,20 +235,29 @@ impl AsRef<[u8]> for Slice { } impl Drop for Slice { + /// Toggles the shared state when the slice is dropped, allowing the memory to be reused. + /// + /// In debug mode, it also tracks the `mode` of the slice when it is dropped. fn drop(&mut self) { #[cfg(feature = "debug")] self.shared_state.toogle(self.index, self.mode); + #[cfg(not(feature = "debug"))] self.shared_state.toogle(self.index); } } impl From> for Slice { + /// Creates a [`Slice`] from a [`Vec`], taking ownership of the vector. + /// + /// Initializes the [`Slice`] with the vector's pointer and sets the length to `0`. fn from(mut v: Vec) -> Self { let offset = v[0..].as_mut_ptr(); Slice { offset, len: 0, + // The slice's memory is owned by a `Vec`, so the slice does not need an `index` in + // the pool to manage the memory index: crate::slice::INGORE_INDEX, shared_state: SharedState::new(), owned: Some(v), @@ -135,25 +269,66 @@ impl From> for Slice { } } +// The shared state of the buffer pool. +// +// Encapsulates an atomic 8-bit value (`AtomicU8`) to track the shared state of memory slices in a +// thread-safe manner. It uses atomic operations to ensure that memory tracking can be done +// concurrently without locks. +// +// Each bit in the `AtomicU8` represents the state of a memory slot (e.g., whether it is allocated +// or free) in the buffer pool, allowing the system to manage and synchronize memory usage across +// multiple slices. +// +// `SharedState` acts like a reference counter, helping the buffer pool know when a buffer slice is +// safe to clear. Each time a memory slice is used or released, the corresponding bit in the shared +// state is toggled. When no slices are in use (all bits are zero), the buffer pool can safely +// reclaim or reuse the memory. +// +// This system ensures that no memory is prematurely cleared while it is still being referenced. 
+// The buffer pool checks whether any slice is still in use before clearing, and only when the +// shared state indicates that all references have been dropped (i.e., no unprocessed messages +// remain) can the buffer pool safely clear or reuse the memory. #[derive(Clone, Debug)] pub struct SharedState(Arc); impl Default for SharedState { + // Creates a new `SharedState` with an internal `AtomicU8` initialized to `0`, indicating no + // memory slots are in use. fn default() -> Self { Self::new() } } impl SharedState { + // Creates a new `SharedState` with an internal `AtomicU8` initialized to `0`, indicating no + // memory slots are in use. pub fn new() -> Self { Self(Arc::new(AtomicU8::new(0))) } + // Atomically loads and returns the current value of the `SharedState` using the specified + // memory ordering. + // + // Returns the current state of the memory slots as an 8-bit value. #[inline(always)] pub fn load(&self, ordering: Ordering) -> u8 { self.0.load(ordering) } + // Toggles the bit at the specified `position` in the `SharedState`, including logs regarding + // the shared state of the memory after toggling. The `mode` parameter is used to differentiate + // between different states or operations (e.g., reading or writing) for debugging purposes. + // + // After a message held by a buffer slice has been processed, the corresponding bit in the + // shared state is toggled (flipped). When the shared state for a given region reaches zero + // (i.e., all bits are cleared), the buffer pool knows it can safely reclaim or reuse that + // memory slice. + // + // Uses atomic bitwise operations to ensure thread-safe toggling without locks. It manipulates + // the shared state in-place using the `AtomicU8::fetch_update` method, which atomically + // applies a bitwise XOR (`^`) to toggle the bit at the specified `position`. + // + // Panics if the `position` is outside the range of 1-8, as this refers to an invalid bit. 
#[cfg(feature = "debug")] pub fn toogle(&self, position: u8, mode: u8) { let mask: u8 = match position { @@ -182,6 +357,18 @@ impl SharedState { .unwrap(); } + // Toggles the bit at the specified `position` in the `SharedState`. + // + // After a message held by a buffer slice has been processed, the corresponding bit in the + // shared state is toggled (flipped). When the shared state for a given region reaches zero + // (i.e., all bits are cleared), the buffer pool knows it can safely reclaim or reuse that + // memory slice. + // + // Uses atomic bitwise operations to ensure thread-safe toggling without locks. It manipulates + // the shared state in-place using the `AtomicU8::fetch_update` method, which atomically + // applies a bitwise XOR (`^`) to toggle the bit at the specified `position`. + // + // Panics if the `position` is outside the range of 1-8, as this refers to an invalid bit. #[cfg(not(feature = "debug"))] pub fn toogle(&self, position: u8) { let mask: u8 = match position {