From f27b5e1c91274b65925d167bc501f5887fa02045 Mon Sep 17 00:00:00 2001 From: Florian Hartung Date: Thu, 25 Jul 2024 14:22:59 +0200 Subject: [PATCH 1/2] refactor(core): split queuing ports logic into multiple module --- core/src/queuing.rs | 202 +--------------------------------- core/src/queuing/datagrams.rs | 141 ++++++++++++++++++++++++ core/src/queuing/message.rs | 69 ++++++++++++ core/src/queuing/queue.rs | 45 +------- 4 files changed, 218 insertions(+), 239 deletions(-) create mode 100644 core/src/queuing/datagrams.rs create mode 100644 core/src/queuing/message.rs diff --git a/core/src/queuing.rs b/core/src/queuing.rs index f9adae6..4dd0ad8 100644 --- a/core/src/queuing.rs +++ b/core/src/queuing.rs @@ -2,213 +2,21 @@ use std::fmt::Debug; use std::mem; use std::mem::size_of; use std::os::fd::{AsRawFd, OwnedFd, RawFd}; -use std::ptr::slice_from_raw_parts; use std::time::Instant; use a653rs::bindings::PortDirection; +use datagrams::{DestinationDatagram, SourceDatagram}; use memfd::{FileSeal, Memfd, MemfdOptions}; use memmap2::MmapMut; +use message::Message; use crate::channel::{PortConfig, QueuingChannelConfig}; use crate::error::{ResultExt, SystemError, TypedError, TypedResult}; use crate::partition::QueuingConstant; -pub mod queue; -use queue::ConcurrentQueue; - -#[derive(Debug)] -struct SourceDatagram<'a> { - num_messages_in_destination: &'a mut usize, - has_overflowed: &'a mut bool, - message_queue: &'a ConcurrentQueue, -} - -#[derive(Debug)] -struct DestinationDatagram<'a> { - num_messages_in_source: &'a mut usize, - clear_requested_timestamp: &'a mut Option, - has_overflowed: &'a mut bool, - message_queue: &'a ConcurrentQueue, -} - -impl<'a> SourceDatagram<'a> { - fn size(msg_size: usize, msg_capacity: usize) -> usize { - size_of::() // number of messages in destination - + size_of::() // flag if queue has overflowed - + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue - } - - fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self { - let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::() }; - let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; - - let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity); - - Self { - num_messages_in_destination, - has_overflowed, - message_queue, - } - } - - unsafe fn load_from(buffer: &'a mut [u8]) -> Self { - let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::() }; - let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; - - let message_queue = ConcurrentQueue::load_from(buffer); - - Self { - num_messages_in_destination, - has_overflowed, - message_queue, - } - } - - fn pop_then) -> T, T>(&'_ mut self, f: F) -> Option { - self.message_queue - .pop_then(|entry| f(Message::from_bytes(entry))) - } - - fn push<'b>(&'b mut self, data: &'_ [u8], message_timestamp: Instant) -> Option> { - // We need to check if there is enough space left in the queue. - // This is important, because we could theoretically store twice the number of - // our queue size, because we use a separate source and destination queueu. - // Thus we need to limit the number of messages in both queues at the same time. - let queue_is_full = *self.num_messages_in_destination + self.message_queue.len() - == self.message_queue.msg_capacity; - - if queue_is_full { - *self.has_overflowed = true; - return None; - } - let entry = self.message_queue - .push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination"); - - Some(Message::from_bytes(entry)) - } -} - -impl<'a> DestinationDatagram<'a> { - fn size(msg_size: usize, msg_capacity: usize) -> usize { - size_of::() // number of messages in source - + size_of::() // flag if queue is overflowed - + size_of::>() // flag for the timestamp when a clear was requested - + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue - } - fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self { - let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::() }; - let (clear_requested_timestamp, buffer) = - unsafe { buffer.strip_field_mut::>() }; - let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; - - *num_messages_in_source = 0; - unsafe { - std::ptr::write(clear_requested_timestamp, None); - std::ptr::write(has_overflowed, false); - } - - Self { - num_messages_in_source, - clear_requested_timestamp, - has_overflowed, - message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity), - } - } - unsafe fn load_from(buffer: &'a mut [u8]) -> Self { - let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::() }; - let (clear_requested_timestamp, buffer) = - unsafe { buffer.strip_field_mut::>() }; - let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::() }; - - Self { - num_messages_in_source, - clear_requested_timestamp, - has_overflowed: has_overflown, - message_queue: ConcurrentQueue::load_from(buffer), - } - } - - /// Takes a closure that maps the popped message to some type. - /// If there is a message in the queue, the resulting type and a flag - /// whether the queue has overflowed is returned. - fn pop_then) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> { - self.message_queue - .pop_then(|entry| msg_mapper(Message::from_bytes(entry))) - .map(|t| (t, *self.has_overflowed)) - } - - /// Pushes a data onto the destination queue - fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option> { - let entry = self.message_queue.push(data)?; - let msg = Message::from_bytes(entry); - - Some(msg) - } -} - -struct Message<'a> { - len: &'a usize, - timestamp: &'a Instant, - /// This data slice is always of the same size, controlled by the owning - /// ConcurrentQueue. That means, that only the first `self.len` bytes in - /// it contain actual data. Use [Message::get_data] to access just the - /// contained bytes. - data: &'a [u8], -} - -impl<'a> Message<'a> { - fn size(msg_size: usize) -> usize { - size_of::() // length of this message - + size_of::() // timestamp when this message was sent - + msg_size // actual message byte data - } - fn from_bytes(bytes: &'a [u8]) -> Self { - let (len, bytes) = unsafe { bytes.strip_field::() }; - let (timestamp, data) = unsafe { bytes.strip_field::() }; - - assert!( - *len <= data.len(), - "*len={} data.len()={}", - *len, - data.len() - ); - - Self { - len, - timestamp, - data, - } - } - - fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) { - let (len_field, uninitialized_bytes) = - unsafe { uninitialized_bytes.strip_field_mut::() }; - let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::() }; - assert!(data_field.len() >= data.len()); - - unsafe { - std::ptr::write(timestamp, initialization_timestamp); - } - - *len_field = data.len(); - data_field[0..data.len()].copy_from_slice(data); - } - - fn to_bytes(&self) -> &[u8] { - // # Safety - // len and data should be contiguous memory - unsafe { - &*slice_from_raw_parts( - self.len as *const usize as *const u8, - Self::size(self.data.len()), - ) - } - } - - fn get_data(&self) -> &[u8] { - &self.data[0..*self.len] - } -} +mod datagrams; +mod message; +mod queue; #[derive(Debug)] pub struct Queuing { diff --git a/core/src/queuing/datagrams.rs b/core/src/queuing/datagrams.rs new file mode 100644 index 0000000..f884cf2 --- /dev/null +++ b/core/src/queuing/datagrams.rs @@ -0,0 +1,141 @@ +use std::fmt::Debug; +use std::mem::size_of; +use std::time::Instant; + +use crate::queuing::message::Message; +use crate::queuing::queue::ConcurrentQueue; +use crate::queuing::StripFieldExt; + +#[derive(Debug)] +pub struct SourceDatagram<'a> { + pub num_messages_in_destination: &'a mut usize, + pub has_overflowed: &'a mut bool, + pub message_queue: &'a ConcurrentQueue, +} + +#[derive(Debug)] +pub struct DestinationDatagram<'a> { + pub num_messages_in_source: &'a mut usize, + pub clear_requested_timestamp: &'a mut Option, + pub has_overflowed: &'a mut bool, + pub message_queue: &'a ConcurrentQueue, +} + +impl<'a> SourceDatagram<'a> { + pub fn size(msg_size: usize, msg_capacity: usize) -> usize { + size_of::() // number of messages in destination + + size_of::() // flag if queue has overflowed + + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue + } + + pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self { + let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::() }; + let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; + + let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity); + + Self { + num_messages_in_destination, + has_overflowed, + message_queue, + } + } + + pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self { + let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::() }; + let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; + + let message_queue = ConcurrentQueue::load_from(buffer); + + Self { + num_messages_in_destination, + has_overflowed, + message_queue, + } + } + + pub fn pop_then) -> T, T>(&'_ mut self, f: F) -> Option { + self.message_queue + .pop_then(|entry| f(Message::from_bytes(entry))) + } + + pub fn push<'b>( + &'b mut self, + data: &'_ [u8], + message_timestamp: Instant, + ) -> Option> { + // We need to check if there is enough space left in the queue. + // This is important, because we could theoretically store twice the number of + // our queue size, because we use a separate source and destination queueu. + // Thus we need to limit the number of messages in both queues at the same time. + let queue_is_full = *self.num_messages_in_destination + self.message_queue.len() + == self.message_queue.msg_capacity; + + if queue_is_full { + *self.has_overflowed = true; + return None; + } + let entry = self.message_queue + .push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination"); + + Some(Message::from_bytes(entry)) + } +} + +impl<'a> DestinationDatagram<'a> { + pub fn size(msg_size: usize, msg_capacity: usize) -> usize { + size_of::() // number of messages in source + + size_of::() // flag if queue is overflowed + + size_of::>() // flag for the timestamp when a clear was requested + + ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue + } + pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self { + let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::() }; + let (clear_requested_timestamp, buffer) = + unsafe { buffer.strip_field_mut::>() }; + let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::() }; + + *num_messages_in_source = 0; + unsafe { + std::ptr::write(clear_requested_timestamp, None); + std::ptr::write(has_overflowed, false); + } + + Self { + num_messages_in_source, + clear_requested_timestamp, + has_overflowed, + message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity), + } + } + pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self { + let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::() }; + let (clear_requested_timestamp, buffer) = + unsafe { buffer.strip_field_mut::>() }; + let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::() }; + + Self { + num_messages_in_source, + clear_requested_timestamp, + has_overflowed: has_overflown, + message_queue: ConcurrentQueue::load_from(buffer), + } + } + + /// Takes a closure that maps the popped message to some type. + /// If there is a message in the queue, the resulting type and a flag + /// whether the queue has overflowed is returned. + pub fn pop_then) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> { + self.message_queue + .pop_then(|entry| msg_mapper(Message::from_bytes(entry))) + .map(|t| (t, *self.has_overflowed)) + } + + /// Pushes a data onto the destination queue + pub fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option> { + let entry = self.message_queue.push(data)?; + let msg = Message::from_bytes(entry); + + Some(msg) + } +} diff --git a/core/src/queuing/message.rs b/core/src/queuing/message.rs new file mode 100644 index 0000000..7298b9d --- /dev/null +++ b/core/src/queuing/message.rs @@ -0,0 +1,69 @@ +use std::mem::size_of; +use std::ptr::slice_from_raw_parts; +use std::time::Instant; + +use super::StripFieldExt; + +pub struct Message<'a> { + pub len: &'a usize, + pub timestamp: &'a Instant, + /// This data slice is always of the same size, controlled by the owning + /// ConcurrentQueue. That means, that only the first `self.len` bytes in + /// it contain actual data. Use [Message::get_data] to access just the + /// contained bytes. + pub data: &'a [u8], +} + +impl<'a> Message<'a> { + pub fn size(msg_size: usize) -> usize { + size_of::() // length of this message + + size_of::() // timestamp when this message was sent + + msg_size // actual message byte data + } + pub fn from_bytes(bytes: &'a [u8]) -> Self { + let (len, bytes) = unsafe { bytes.strip_field::() }; + let (timestamp, data) = unsafe { bytes.strip_field::() }; + + assert!( + *len <= data.len(), + "*len={} data.len()={}", + *len, + data.len() + ); + + Self { + len, + timestamp, + data, + } + } + + pub fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) { + let (len_field, uninitialized_bytes) = + unsafe { uninitialized_bytes.strip_field_mut::() }; + let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::() }; + assert!(data_field.len() >= data.len()); + + unsafe { + std::ptr::write(timestamp, initialization_timestamp); + } + + *len_field = data.len(); + data_field[0..data.len()].copy_from_slice(data); + } + + pub fn to_bytes(&self) -> &[u8] { + // # Safety + // len and data should be contiguous memory + unsafe { + &*slice_from_raw_parts( + self.len as *const usize as *const u8, + Self::size(self.data.len()), + ) + } + } + + pub fn get_data(&self) -> &[u8] { + &self.data[0..*self.len] + } +} diff --git a/core/src/queuing/queue.rs b/core/src/queuing/queue.rs index e7177ed..602b789 100644 --- a/core/src/queuing/queue.rs +++ b/core/src/queuing/queue.rs @@ -8,29 +8,9 @@ use std::{mem, ptr}; /// created inside a buffer of type `&[u8]` via [ConcurrentQueue::init_at]. /// The required buffer size can be requested in advance via /// [ConcurrentQueue::size] by providing the size and maximum number of -/// entries. # Example -/// ``` -/// # use a653rs_linux_core::queuing::queue::ConcurrentQueue; -/// // Create a ConcurrentQueue inside of a Vec buffer object -/// let required_size = ConcurrentQueue::size(1, 4); -/// let mut buffer = vec![0u8; required_size]; -/// ConcurrentQueue::init_at(&mut buffer, 1, 4); -/// let queue1 = unsafe { ConcurrentQueue::load_from(&buffer) }; -/// let queue2 = unsafe { ConcurrentQueue::load_from(&buffer) }; +/// entries. /// -/// // Let's push some values in the queue -/// assert!(queue1.push(&[1]).is_some()); -/// assert!(queue2.push(&[2]).is_some()); -/// -/// // Now pop them using the Fifo method -/// assert_eq!(queue2.pop().unwrap()[0], 1); -/// assert_eq!(queue1.pop().unwrap()[0], 2); -/// -/// // When the queue is empty, pop will return None -/// assert_eq!(queue1.pop(), None); -/// assert_eq!(queue2.pop(), None); -/// ``` -#[repr(C)] +/// See [tests] for examples pub struct ConcurrentQueue { pub msg_size: usize, pub msg_capacity: usize, @@ -158,21 +138,6 @@ impl ConcurrentQueue { (first + idx) % self.msg_capacity * self.msg_size } - /// Gets an element from the queue at a specific index - pub fn get(&self, idx: usize) -> Option<&[u8]> { - assert!(idx < self.msg_capacity); - - let current_len = self.len.load(Ordering::SeqCst); - if idx > current_len { - return None; - } - - let idx = self.to_physical_idx(self.first.load(Ordering::SeqCst), idx); - - let msg = &unsafe { self.data.get().as_mut().unwrap() }[idx..(idx + self.msg_size)]; - Some(msg) - } - /// Pushes an element to the back of the queue. If there was space, a /// mutable reference to the inserted element is returned. pub fn push(&self, data: &[u8]) -> Option<&mut [u8]> { @@ -202,6 +167,7 @@ impl ConcurrentQueue { } /// Tries to pop an element from the front of the queue. + #[allow(unused)] pub fn pop(&self) -> Option> { self.pop_then(|entry| Vec::from(entry).into_boxed_slice()) } @@ -247,11 +213,6 @@ impl ConcurrentQueue { self.len.load(Ordering::SeqCst) } - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn clear(&self) { self.len.store(0, Ordering::SeqCst); } From 48a4cd0a20037aa7240f94f5438a5807399d03a1 Mon Sep 17 00:00:00 2001 From: Florian Hartung Date: Wed, 31 Jul 2024 13:17:03 +0200 Subject: [PATCH 2/2] refactor(core): move queuing.rs into its own module --- core/src/{queuing.rs => queuing/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/src/{queuing.rs => queuing/mod.rs} (100%) diff --git a/core/src/queuing.rs b/core/src/queuing/mod.rs similarity index 100% rename from core/src/queuing.rs rename to core/src/queuing/mod.rs