Skip to content

Commit

Permalink
refactor(core): split queuing ports logic into multiple module
Browse files Browse the repository at this point in the history
  • Loading branch information
florianhartung committed Jul 31, 2024
1 parent 1b66204 commit 9fbe007
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 220 deletions.
204 changes: 6 additions & 198 deletions core/src/queuing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,213 +2,21 @@ use std::fmt::Debug;
use std::mem;
use std::mem::size_of;
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::ptr::slice_from_raw_parts;
use std::time::Instant;

use a653rs::bindings::PortDirection;
use datagrams::{DestinationDatagram, SourceDatagram};
use memfd::{FileSeal, Memfd, MemfdOptions};
use memmap2::MmapMut;
use message::Message;

use crate::channel::{PortConfig, QueuingChannelConfig};
use crate::error::{ResultExt, SystemError, TypedError, TypedResult};
use crate::partition::QueuingConstant;

pub mod queue;
use queue::ConcurrentQueue;

#[derive(Debug)]
struct SourceDatagram<'a> {
num_messages_in_destination: &'a mut usize,
has_overflowed: &'a mut bool,
message_queue: &'a ConcurrentQueue,
}

#[derive(Debug)]
struct DestinationDatagram<'a> {
num_messages_in_source: &'a mut usize,
clear_requested_timestamp: &'a mut Option<Instant>,
has_overflowed: &'a mut bool,
message_queue: &'a ConcurrentQueue,
}

impl<'a> SourceDatagram<'a> {
fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in destination
+ size_of::<bool>() // flag if queue has overflowed
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}

fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::load_from(buffer);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
self.message_queue
.pop_then(|entry| f(Message::from_bytes(entry)))
}

fn push<'b>(&'b mut self, data: &'_ [u8], message_timestamp: Instant) -> Option<Message<'b>> {
// We need to check if there is enough space left in the queue.
// This is important, because we could theoretically store twice the number of
// our queue size, because we use a separate source and destination queueu.
// Thus we need to limit the number of messages in both queues at the same time.
let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
== self.message_queue.msg_capacity;

if queue_is_full {
*self.has_overflowed = true;
return None;
}
let entry = self.message_queue
.push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");

Some(Message::from_bytes(entry))
}
}

impl<'a> DestinationDatagram<'a> {
fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in source
+ size_of::<bool>() // flag if queue is overflowed
+ size_of::<Option<Instant>>() // flag for the timestamp when a clear was requested
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}
fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

*num_messages_in_source = 0;
unsafe {
std::ptr::write(clear_requested_timestamp, None);
std::ptr::write(has_overflowed, false);
}

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed,
message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
}
}
unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed: has_overflown,
message_queue: ConcurrentQueue::load_from(buffer),
}
}

/// Takes a closure that maps the popped message to some type.
/// If there is a message in the queue, the resulting type and a flag
/// whether the queue has overflowed is returned.
fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
self.message_queue
.pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
.map(|t| (t, *self.has_overflowed))
}

/// Pushes a data onto the destination queue
fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
let entry = self.message_queue.push(data)?;
let msg = Message::from_bytes(entry);

Some(msg)
}
}

struct Message<'a> {
len: &'a usize,
timestamp: &'a Instant,
/// This data slice is always of the same size, controlled by the owning
/// ConcurrentQueue. That means, that only the first `self.len` bytes in
/// it contain actual data. Use [Message::get_data] to access just the
/// contained bytes.
data: &'a [u8],
}

impl<'a> Message<'a> {
fn size(msg_size: usize) -> usize {
size_of::<usize>() // length of this message
+ size_of::<Instant>() // timestamp when this message was sent
+ msg_size // actual message byte data
}
fn from_bytes(bytes: &'a [u8]) -> Self {
let (len, bytes) = unsafe { bytes.strip_field::<usize>() };
let (timestamp, data) = unsafe { bytes.strip_field::<Instant>() };

assert!(
*len <= data.len(),
"*len={} data.len()={}",
*len,
data.len()
);

Self {
len,
timestamp,
data,
}
}

fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) {
let (len_field, uninitialized_bytes) =
unsafe { uninitialized_bytes.strip_field_mut::<usize>() };
let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::<Instant>() };
assert!(data_field.len() >= data.len());

unsafe {
std::ptr::write(timestamp, initialization_timestamp);
}

*len_field = data.len();
data_field[0..data.len()].copy_from_slice(data);
}

fn to_bytes(&self) -> &[u8] {
// # Safety
// len and data should be contiguous memory
unsafe {
&*slice_from_raw_parts(
self.len as *const usize as *const u8,
Self::size(self.data.len()),
)
}
}

fn get_data(&self) -> &[u8] {
&self.data[0..*self.len]
}
}
pub(self) mod datagrams;
pub(self) mod message;
pub(self) mod queue;

#[derive(Debug)]
pub struct Queuing {
Expand Down Expand Up @@ -462,7 +270,7 @@ impl TryFrom<RawFd> for QueuingDestination {
}

/// An extension trait for stripping generic types off of byte arrays.
trait StripFieldExt {
pub(self) trait StripFieldExt {
unsafe fn strip_field<T>(&self) -> (&T, &Self);
unsafe fn strip_field_mut<T>(&mut self) -> (&mut T, &mut Self);
}
Expand Down
141 changes: 141 additions & 0 deletions core/src/queuing/datagrams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::fmt::Debug;
use std::mem::size_of;
use std::time::Instant;

use crate::queuing::message::Message;
use crate::queuing::queue::ConcurrentQueue;
use crate::queuing::StripFieldExt;

#[derive(Debug)]
pub struct SourceDatagram<'a> {
pub num_messages_in_destination: &'a mut usize,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

#[derive(Debug)]
pub struct DestinationDatagram<'a> {
pub num_messages_in_source: &'a mut usize,
pub clear_requested_timestamp: &'a mut Option<Instant>,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

impl<'a> SourceDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in destination
+ size_of::<bool>() // flag if queue has overflowed
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}

pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::load_from(buffer);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
self.message_queue
.pop_then(|entry| f(Message::from_bytes(entry)))
}

pub fn push<'b>(
&'b mut self,
data: &'_ [u8],
message_timestamp: Instant,
) -> Option<Message<'b>> {
// We need to check if there is enough space left in the queue.
// This is important, because we could theoretically store twice the number of
// our queue size, because we use a separate source and destination queueu.
// Thus we need to limit the number of messages in both queues at the same time.
let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
== self.message_queue.msg_capacity;

if queue_is_full {
*self.has_overflowed = true;
return None;
}
let entry = self.message_queue
.push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");

Some(Message::from_bytes(entry))
}
}

impl<'a> DestinationDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in source
+ size_of::<bool>() // flag if queue is overflowed
+ size_of::<Option<Instant>>() // flag for the timestamp when a clear was requested
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}
pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

*num_messages_in_source = 0;
unsafe {
std::ptr::write(clear_requested_timestamp, None);
std::ptr::write(has_overflowed, false);
}

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed,
message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
}
}
pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed: has_overflown,
message_queue: ConcurrentQueue::load_from(buffer),
}
}

/// Takes a closure that maps the popped message to some type.
/// If there is a message in the queue, the resulting type and a flag
/// whether the queue has overflowed is returned.
pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
self.message_queue
.pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
.map(|t| (t, *self.has_overflowed))
}

/// Pushes a data onto the destination queue
pub fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
let entry = self.message_queue.push(data)?;
let msg = Message::from_bytes(entry);

Some(msg)
}
}
Loading

0 comments on commit 9fbe007

Please sign in to comment.