Skip to content

Commit

Permalink
Merge pull request #126 from DLR-FT/dev/queuing_ports_cleanup
Browse files Browse the repository at this point in the history
Clean up queuing port logic
  • Loading branch information
sevenautumns authored Jul 31, 2024
2 parents 1b66204 + 48a4cd0 commit cecbedd
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 239 deletions.
141 changes: 141 additions & 0 deletions core/src/queuing/datagrams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::fmt::Debug;
use std::mem::size_of;
use std::time::Instant;

use crate::queuing::message::Message;
use crate::queuing::queue::ConcurrentQueue;
use crate::queuing::StripFieldExt;

#[derive(Debug)]
pub struct SourceDatagram<'a> {
pub num_messages_in_destination: &'a mut usize,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

#[derive(Debug)]
pub struct DestinationDatagram<'a> {
pub num_messages_in_source: &'a mut usize,
pub clear_requested_timestamp: &'a mut Option<Instant>,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

impl<'a> SourceDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in destination
+ size_of::<bool>() // flag if queue has overflowed
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}

pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::load_from(buffer);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
self.message_queue
.pop_then(|entry| f(Message::from_bytes(entry)))
}

pub fn push<'b>(
&'b mut self,
data: &'_ [u8],
message_timestamp: Instant,
) -> Option<Message<'b>> {
// We need to check if there is enough space left in the queue.
// This is important, because we could theoretically store twice the number of
// our queue size, because we use a separate source and destination queueu.
// Thus we need to limit the number of messages in both queues at the same time.
let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
== self.message_queue.msg_capacity;

if queue_is_full {
*self.has_overflowed = true;
return None;
}
let entry = self.message_queue
.push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");

Some(Message::from_bytes(entry))
}
}

impl<'a> DestinationDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in source
+ size_of::<bool>() // flag if queue is overflowed
+ size_of::<Option<Instant>>() // flag for the timestamp when a clear was requested
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}
pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

*num_messages_in_source = 0;
unsafe {
std::ptr::write(clear_requested_timestamp, None);
std::ptr::write(has_overflowed, false);
}

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed,
message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
}
}
pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed: has_overflown,
message_queue: ConcurrentQueue::load_from(buffer),
}
}

/// Takes a closure that maps the popped message to some type.
/// If there is a message in the queue, the resulting type and a flag
/// whether the queue has overflowed is returned.
pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
self.message_queue
.pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
.map(|t| (t, *self.has_overflowed))
}

/// Pushes a data onto the destination queue
pub fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
let entry = self.message_queue.push(data)?;
let msg = Message::from_bytes(entry);

Some(msg)
}
}
69 changes: 69 additions & 0 deletions core/src/queuing/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::mem::size_of;
use std::ptr::slice_from_raw_parts;
use std::time::Instant;

use super::StripFieldExt;

pub struct Message<'a> {
pub len: &'a usize,
pub timestamp: &'a Instant,
/// This data slice is always of the same size, controlled by the owning
/// ConcurrentQueue. That means, that only the first `self.len` bytes in
/// it contain actual data. Use [Message::get_data] to access just the
/// contained bytes.
pub data: &'a [u8],
}

impl<'a> Message<'a> {
pub fn size(msg_size: usize) -> usize {
size_of::<usize>() // length of this message
+ size_of::<Instant>() // timestamp when this message was sent
+ msg_size // actual message byte data
}
pub fn from_bytes(bytes: &'a [u8]) -> Self {
let (len, bytes) = unsafe { bytes.strip_field::<usize>() };
let (timestamp, data) = unsafe { bytes.strip_field::<Instant>() };

assert!(
*len <= data.len(),
"*len={} data.len()={}",
*len,
data.len()
);

Self {
len,
timestamp,
data,
}
}

pub fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) {
let (len_field, uninitialized_bytes) =
unsafe { uninitialized_bytes.strip_field_mut::<usize>() };
let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::<Instant>() };
assert!(data_field.len() >= data.len());

unsafe {
std::ptr::write(timestamp, initialization_timestamp);
}

*len_field = data.len();
data_field[0..data.len()].copy_from_slice(data);
}

pub fn to_bytes(&self) -> &[u8] {
// # Safety
// len and data should be contiguous memory
unsafe {
&*slice_from_raw_parts(
self.len as *const usize as *const u8,
Self::size(self.data.len()),
)
}
}

pub fn get_data(&self) -> &[u8] {
&self.data[0..*self.len]
}
}
Loading

0 comments on commit cecbedd

Please sign in to comment.