Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up queuing port logic #126

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions core/src/queuing/datagrams.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::fmt::Debug;
use std::mem::size_of;
use std::time::Instant;

use crate::queuing::message::Message;
use crate::queuing::queue::ConcurrentQueue;
use crate::queuing::StripFieldExt;

#[derive(Debug)]
pub struct SourceDatagram<'a> {
pub num_messages_in_destination: &'a mut usize,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

#[derive(Debug)]
pub struct DestinationDatagram<'a> {
pub num_messages_in_source: &'a mut usize,
pub clear_requested_timestamp: &'a mut Option<Instant>,
pub has_overflowed: &'a mut bool,
pub message_queue: &'a ConcurrentQueue,
}

impl<'a> SourceDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in destination
+ size_of::<bool>() // flag if queue has overflowed
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}

pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_destination, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

let message_queue = ConcurrentQueue::load_from(buffer);

Self {
num_messages_in_destination,
has_overflowed,
message_queue,
}
}

pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&'_ mut self, f: F) -> Option<T> {
self.message_queue
.pop_then(|entry| f(Message::from_bytes(entry)))
}

pub fn push<'b>(
&'b mut self,
data: &'_ [u8],
message_timestamp: Instant,
) -> Option<Message<'b>> {
// We need to check if there is enough space left in the queue.
// This is important, because we could theoretically store twice the number of
// our queue size, because we use a separate source and destination queueu.
// Thus we need to limit the number of messages in both queues at the same time.
let queue_is_full = *self.num_messages_in_destination + self.message_queue.len()
== self.message_queue.msg_capacity;

if queue_is_full {
*self.has_overflowed = true;
return None;
}
let entry = self.message_queue
.push_then(|entry| Message::init_at(entry, data, message_timestamp)).expect("push to be successful because we just checked if there is space in both the source and destination");

Some(Message::from_bytes(entry))
}
}

impl<'a> DestinationDatagram<'a> {
pub fn size(msg_size: usize, msg_capacity: usize) -> usize {
size_of::<usize>() // number of messages in source
+ size_of::<bool>() // flag if queue is overflowed
+ size_of::<Option<Instant>>() // flag for the timestamp when a clear was requested
+ ConcurrentQueue::size(Message::size(msg_size), msg_capacity) // the message queue
}
pub fn init_at(msg_size: usize, msg_capacity: usize, buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflowed, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

*num_messages_in_source = 0;
unsafe {
std::ptr::write(clear_requested_timestamp, None);
std::ptr::write(has_overflowed, false);
}

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed,
message_queue: ConcurrentQueue::init_at(buffer, Message::size(msg_size), msg_capacity),
}
}
pub unsafe fn load_from(buffer: &'a mut [u8]) -> Self {
let (num_messages_in_source, buffer) = unsafe { buffer.strip_field_mut::<usize>() };
let (clear_requested_timestamp, buffer) =
unsafe { buffer.strip_field_mut::<Option<Instant>>() };
let (has_overflown, buffer) = unsafe { buffer.strip_field_mut::<bool>() };

Self {
num_messages_in_source,
clear_requested_timestamp,
has_overflowed: has_overflown,
message_queue: ConcurrentQueue::load_from(buffer),
}
}

/// Takes a closure that maps the popped message to some type.
/// If there is a message in the queue, the resulting type and a flag
/// whether the queue has overflowed is returned.
pub fn pop_then<F: FnOnce(Message<'_>) -> T, T>(&mut self, msg_mapper: F) -> Option<(T, bool)> {
self.message_queue
.pop_then(|entry| msg_mapper(Message::from_bytes(entry)))
.map(|t| (t, *self.has_overflowed))
}

/// Pushes a data onto the destination queue
pub fn push<'b>(&'b mut self, data: &'_ [u8]) -> Option<Message<'b>> {
let entry = self.message_queue.push(data)?;
let msg = Message::from_bytes(entry);

Some(msg)
}
}
69 changes: 69 additions & 0 deletions core/src/queuing/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::mem::size_of;
use std::ptr::slice_from_raw_parts;
use std::time::Instant;

use super::StripFieldExt;

pub struct Message<'a> {
pub len: &'a usize,
pub timestamp: &'a Instant,
/// This data slice is always of the same size, controlled by the owning
/// ConcurrentQueue. That means, that only the first `self.len` bytes in
/// it contain actual data. Use [Message::get_data] to access just the
/// contained bytes.
pub data: &'a [u8],
}

impl<'a> Message<'a> {
pub fn size(msg_size: usize) -> usize {
size_of::<usize>() // length of this message
+ size_of::<Instant>() // timestamp when this message was sent
+ msg_size // actual message byte data
}
pub fn from_bytes(bytes: &'a [u8]) -> Self {
let (len, bytes) = unsafe { bytes.strip_field::<usize>() };
let (timestamp, data) = unsafe { bytes.strip_field::<Instant>() };

assert!(
*len <= data.len(),
"*len={} data.len()={}",
*len,
data.len()
);

Self {
len,
timestamp,
data,
}
}

pub fn init_at(uninitialized_bytes: &mut [u8], data: &[u8], initialization_timestamp: Instant) {
let (len_field, uninitialized_bytes) =
unsafe { uninitialized_bytes.strip_field_mut::<usize>() };
let (timestamp, data_field) = unsafe { uninitialized_bytes.strip_field_mut::<Instant>() };
assert!(data_field.len() >= data.len());

unsafe {
std::ptr::write(timestamp, initialization_timestamp);
}

*len_field = data.len();
data_field[0..data.len()].copy_from_slice(data);
}

pub fn to_bytes(&self) -> &[u8] {
// # Safety
// len and data should be contiguous memory
unsafe {
&*slice_from_raw_parts(
self.len as *const usize as *const u8,
Self::size(self.data.len()),
)
}
}

pub fn get_data(&self) -> &[u8] {
&self.data[0..*self.len]
}
}
Loading
Loading