diff --git a/core/src/queuing/queue.rs b/core/src/queuing/queue.rs index 1d230ae..04b502e 100644 --- a/core/src/queuing/queue.rs +++ b/core/src/queuing/queue.rs @@ -267,3 +267,162 @@ impl Debug for ConcurrentQueue { .finish_non_exhaustive() } } + +#[cfg(test)] +mod tests { + use super::ConcurrentQueue; + + #[test] + fn single_queue() { + const ELEMENT_SIZE: usize = 2; + const CAPACITY: usize = 3; + + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(ELEMENT_SIZE, CAPACITY)]; + + let _ = ConcurrentQueue::init_at(&mut buffer, ELEMENT_SIZE, CAPACITY); + + // let (queue1, queue2) = unsafe { + // ( + // ConcurrentQueue::load_from(&buffer), + // ConcurrentQueue::load_from(&buffer), + // ) + // }; + // + let queue = unsafe { ConcurrentQueue::load_from(&buffer) }; + + assert_eq!(queue.pop(), None); + assert_eq!(queue.pop(), None); + + assert_eq!(queue.push(&[0x4, 0x9]).unwrap(), &mut [0x4, 0x9]); + assert_eq!(queue.push(&[0x2, 0xe]).unwrap(), &mut [0x2, 0xe]); + assert_eq!(queue.push(&[0x1, 0x2]).unwrap(), &mut [0x1, 0x2]); + + // No more empty space + assert_eq!(queue.push(&[0x3, 0x4]), None); + + assert_eq!(queue.pop(), Some(vec![0x4, 0x9].into_boxed_slice())); + assert_eq!(queue.pop(), Some(vec![0x2, 0xe].into_boxed_slice())); + assert_eq!(queue.pop(), Some(vec![0x1, 0x2].into_boxed_slice())); + assert_eq!(queue.pop(), None); + } + + #[test] + fn two_queues() { + const ELEMENT_SIZE: usize = 2; + const CAPACITY: usize = 2; + + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(ELEMENT_SIZE, CAPACITY)]; + + let _ = ConcurrentQueue::init_at(&mut buffer, ELEMENT_SIZE, CAPACITY); + + let (queue1, queue2) = unsafe { + ( + ConcurrentQueue::load_from(&buffer), + ConcurrentQueue::load_from(&buffer), + ) + }; + assert_eq!(queue1.pop(), None); + assert_eq!(queue2.pop(), None); + + assert_eq!(queue1.push(&[0x4, 0x9]).unwrap(), &mut [0x4, 0x9]); + assert_eq!(queue2.push(&[0x2, 0xe]).unwrap(), &mut [0x2, 0xe]); + + // No more empty space + assert_eq!(queue2.push(&[0x0, 0x0]), None); + + assert_eq!(queue2.pop(), Some(vec![0x4, 0x9].into_boxed_slice())); + assert_eq!(queue2.pop(), Some(vec![0x2, 0xe].into_boxed_slice())); + assert_eq!(queue2.pop(), None); + assert_eq!(queue1.pop(), None); + } + + #[test] + /// Spawn one thousand threads with access to a single queue. Each thread + /// pushes four elements on the queue. In the end validate that there is the + /// correct amount of elements in the queue and that none of the elements + /// are corrupted. + fn one_thousand_queues() { + use std::sync::Arc; + use std::thread; + + const NUM_QUEUES: usize = 1000; + + const ELEMENT_SIZE: usize = 2; + const CAPACITY: usize = 4 * NUM_QUEUES; + + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(ELEMENT_SIZE, CAPACITY)]; + let _ = ConcurrentQueue::init_at(&mut buffer, ELEMENT_SIZE, CAPACITY); + + // Put the buffer into an Arc to be able to access it from other threads + let buffer: Arc> = Arc::new(buffer); + + // Spawn threads and create one queue per thread. Call push on the queue 4 + // times. + let threads = (0..NUM_QUEUES).map(|_| { + let cloned_buffer = buffer.clone(); + thread::spawn(move || { + let queue = unsafe { ConcurrentQueue::load_from(&*cloned_buffer) }; + queue.push(&[0x1, 0x2]).unwrap(); + queue.push(&[0x3, 0x4]).unwrap(); + queue.push(&[0x5, 0x6]).unwrap(); + queue.push(&[0x7, 0x8]).unwrap(); + }) + }); + + threads.for_each(|handle| handle.join().expect("that the thread has not panicked")); + + let queue = unsafe { ConcurrentQueue::load_from(&*buffer) }; + + // There should be 4 elements per thread in the queue + assert_eq!(queue.len(), 4 * NUM_QUEUES); + + // Check if there are any corrupted bytes + while let Some(element) = queue.pop() { + match &*element { + &[0x1, 0x2] | &[0x3, 0x4] | &[0x5, 0x6] | &[0x7, 0x8] => {} + invalid => panic!("Invalid bytes {invalid:?}"), + } + } + } + + #[test] + #[should_panic] + fn buffer_too_small() { + let mut buffer: Vec = vec![0u8; 2]; + let _ = ConcurrentQueue::init_at(&mut buffer, 1, 1); + } + + #[test] + #[should_panic] + fn buffer_too_big() { + let mut buffer: Vec = vec![0u8; 100]; + let _ = ConcurrentQueue::init_at(&mut buffer, 1, 1); + } + + #[test] + #[should_panic] + fn element_too_big() { + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(1, 1)]; + let queue = ConcurrentQueue::init_at(&mut buffer, 1, 1); + + queue.push(&[0x1, 0x1]); + } + + #[test] + #[should_panic] + fn element_too_small() { + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(2, 1)]; + let queue = ConcurrentQueue::init_at(&mut buffer, 2, 1); + + queue.push(&[0x1]); + } + + #[test] + #[should_panic] + fn empty_element() { + let mut buffer: Vec = vec![0u8; ConcurrentQueue::size(1, 1)]; + let queue = ConcurrentQueue::init_at(&mut buffer, 1, 1); + + queue.push(&[]); + } +}