feat: implement clone for LockFreeFrozenVec #57

Merged
10 commits merged on Oct 24, 2023
105 changes: 93 additions & 12 deletions src/sync.rs
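For context, a minimal usage sketch (not part of the diff below) of what this PR enables, assuming the crate's existing `push`/`get` API on `elsa::sync::LockFreeFrozenVec`:

use elsa::sync::LockFreeFrozenVec;

fn main() {
    let vec = LockFreeFrozenVec::new();
    vec.push(1);
    vec.push(2);

    // With this PR, the vector can be cloned; the clone owns its own copies of the values.
    let cloned = vec.clone();
    assert_eq!(cloned.get(0), Some(1));
    assert_eq!(cloned.get(1), Some(2));
}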
@@ -17,12 +17,13 @@ use std::hash::Hash;
use std::iter::{FromIterator, IntoIterator};
use std::ops::Index;

use std::sync::TryLockError;
use std::ptr::NonNull;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::RwLock;
use std::sync::TryLockError;

/// Append-only threadsafe version of `std::collections::HashMap` where
/// insertion does not require mutable access
@@ -33,9 +34,7 @@ pub struct FrozenMap<K, V> {
impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for FrozenMap<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.map.try_read() {
Ok(guard) => {
guard.fmt(f)
},
Ok(guard) => guard.fmt(f),
Err(TryLockError::Poisoned(err)) => {
f.debug_tuple("FrozenMap").field(&&**err.get_ref()).finish()
}
@@ -46,8 +45,10 @@ impl<K: fmt::Debug, V: fmt::Debug> fmt::Debug for FrozenMap<K, V> {
f.write_str("<locked>")
}
}
f.debug_tuple("FrozenMap").field(&LockedPlaceholder).finish()
},
f.debug_tuple("FrozenMap")
.field(&LockedPlaceholder)
.finish()
}
}
}
}
@@ -74,7 +75,6 @@ impl<T> From<Vec<T>> for FrozenVec<T> {
}
}


impl<K: Eq + Hash, V: StableDeref> FrozenMap<K, V> {
// these should never return &K or &V
// these should never delete any entries
@@ -440,9 +440,7 @@ pub struct FrozenVec<T> {
impl<T: fmt::Debug> fmt::Debug for FrozenVec<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.vec.try_read() {
Ok(guard) => {
guard.fmt(f)
},
Ok(guard) => guard.fmt(f),
Err(TryLockError::Poisoned(err)) => {
f.debug_tuple("FrozenMap").field(&&**err.get_ref()).finish()
}
@@ -453,8 +451,10 @@ impl<T: fmt::Debug> fmt::Debug for FrozenVec<T> {
f.write_str("<locked>")
}
}
f.debug_tuple("FrozenMap").field(&LockedPlaceholder).finish()
},
f.debug_tuple("FrozenMap")
.field(&LockedPlaceholder)
.finish()
}
}
}
}
@@ -812,6 +812,33 @@ impl<T: Copy> LockFreeFrozenVec<T> {
let local_index = index - prior_total_buffer_size(buffer_idx);
unsafe { *buffer_ptr.add(local_index) }
}

/// Run a function on each buffer in the vector.
///
/// ## Arguments
/// - `func`: a function that takes a slice over the buffer's elements and the buffer's index
///
fn for_each_buffer(&self, mut func: impl FnMut(&[T], usize)) {
// for each buffer, run the function
for buffer_index in 0..NUM_BUFFERS {
// get the buffer pointer
if let Some(buffer_ptr) = NonNull::new(self.data[buffer_index].load(Ordering::Acquire))
{
// get the buffer size and index
let buffer_size = buffer_size(buffer_index);

// create a slice from the buffer pointer and size
let buffer_slice =
unsafe { std::slice::from_raw_parts(buffer_ptr.as_ptr(), buffer_size) };

// run the function
func(buffer_slice, buffer_index);
} else {
// no data in this buffer; buffers are filled in order, so we're done
break;
}
}
}
}
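As an illustration (not part of this PR), `for_each_buffer` could also back other whole-vector operations. A hypothetical `to_vec` helper inside the same `impl<T: Copy>` block might look like the sketch below; the truncation to `len` is needed because the last allocated buffer can have more capacity than elements written so far:

// Hypothetical helper, not part of this PR: collect the stored elements
// into a Vec by visiting each allocated buffer in order.
fn to_vec(&self) -> Vec<T> {
    let len = self.len.load(Ordering::Acquire);
    let mut out = Vec::with_capacity(len);
    self.for_each_buffer(|buffer_slice, _buffer_index| {
        // Only take as many elements as are still needed; the last
        // allocated buffer may be larger than the remaining length.
        let take = (len - out.len()).min(buffer_slice.len());
        out.extend_from_slice(&buffer_slice[..take]);
    });
    out
}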

#[test]
@@ -848,6 +875,35 @@ fn test_non_lockfree_unchecked() {
LockFreeFrozenVec::<()>::new();
}

impl<T: Copy + Clone> Clone for LockFreeFrozenVec<T> {
fn clone(&self) -> Self {
let mut copied_data = [Self::NULL; NUM_BUFFERS];
// for each buffer, copy the data
self.for_each_buffer(|buffer_slice, buffer_index| {
// allocate a new buffer
let layout = Self::layout(buffer_slice.len());
let new_buffer_ptr = unsafe { std::alloc::alloc(layout).cast::<T>() };
assert!(!new_buffer_ptr.is_null());
// copy the data to the new buffer
unsafe {
std::ptr::copy_nonoverlapping(
buffer_slice.as_ptr(),
new_buffer_ptr,
buffer_slice.len(),
);
};
// store the new buffer pointer
*copied_data[buffer_index].get_mut() = new_buffer_ptr;
});

Self {
data: copied_data,
len: AtomicUsize::new(self.len.load(Ordering::Relaxed)),
locked: AtomicBool::new(false),
}
}
}
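Illustrative follow-up (not part of the diff), again assuming the crate's `push`/`get` API: because `clone` allocates fresh buffers and copies the values into them, the original and the clone evolve independently afterwards:

use elsa::sync::LockFreeFrozenVec;

fn main() {
    let original = LockFreeFrozenVec::new();
    original.push(10);

    let copy = original.clone();
    copy.push(20);

    // The push into the clone is not visible in the original.
    assert_eq!(original.get(1), None);
    assert_eq!(copy.get(1), Some(20));
}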

#[test]
fn test_non_lockfree() {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -879,6 +935,31 @@ fn test_non_lockfree() {
}
});

// Test cloning
{
let vec2 = vec.clone();
assert_eq!(vec2.get(0), Some(Moo(1)));
assert_eq!(vec2.get(1), Some(Moo(2)));
assert_eq!(vec2.get(2), Some(Moo(3)));
}
// Test cloning a large vector
{
let large_vec = LockFreeFrozenVec::new();
for i in 0..1000 {
large_vec.push(Moo(i));
}
let large_vec_2 = large_vec.clone();
for i in 0..1000 {
assert_eq!(large_vec_2.get(i), Some(Moo(i as i32)));
}
}
// Test cloning an empty vector
{
let empty_vec = LockFreeFrozenVec::<()>::new();
let empty_vec_2 = empty_vec.clone();
assert_eq!(empty_vec_2.get(0), None);
}

// Test dropping empty vecs
LockFreeFrozenVec::<()>::new();
}