Skip to content

Commit

Permalink
Streamline job manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Braun committed Oct 25, 2023
1 parent 3faa90b commit ebc021d
Showing 1 changed file with 124 additions and 4 deletions.
128 changes: 124 additions & 4 deletions src/job_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use parking_lot::RwLock;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::ops::{Add, Sub};
use std::{
collections::{HashMap, HashSet, VecDeque},
hash::{Hash, Hasher},
Expand Down Expand Up @@ -54,15 +55,44 @@ pub type EnqueuedMessage<A, B, C> = HashMap<A, HashMap<B, VecDeque<C>>>;

pub trait WorkManagerInterface: Send + Sync + 'static + Sized {
type SSID: Copy + Hash + Eq + PartialEq + Send + Sync + 'static;
type Clock: Copy + Debug + Eq + PartialEq + Send + Sync + 'static;
type Clock: Copy
+ Debug
+ Default
+ Eq
+ Ord
+ PartialOrd
+ PartialEq
+ Send
+ Sync
+ Sub<Output = Self::Clock>
+ Add<Output = Self::Clock>
+ 'static;
type ProtocolMessage: ProtocolMessageMetadata<Self> + Send + Sync + 'static;
type Error: Debug + Send + Sync + 'static;
type SessionID: Copy + Hash + Eq + PartialEq + Display + Debug + Send + Sync + 'static;
fn debug(&self, input: String);
fn error(&self, input: String);
fn warn(&self, input: String);
fn clock(&self) -> Self::Clock;
fn associated_block_id_acceptable(now: Self::Clock, compare: Self::Clock) -> bool;
fn acceptable_block_tolerance() -> Self::Clock;
fn associated_block_id_acceptable(expected: Self::Clock, received: Self::Clock) -> bool {
// Favor explicit logic for readability
let tolerance = Self::acceptable_block_tolerance();
let is_acceptable_above = received >= expected && received <= expected + tolerance;
let is_acceptable_below =
received < expected && received >= saturating_sub(expected, tolerance);
let is_equal = expected == received;

is_acceptable_above || is_acceptable_below || is_equal
}
}

fn saturating_sub<T: Sub<Output = T> + Ord + Default>(a: T, b: T) -> T {
if a < b {
T::default()
} else {
a - b
}
}

pub trait ProtocolMessageMetadata<WM: WorkManagerInterface> {
Expand Down Expand Up @@ -568,8 +598,9 @@ mod tests {
fn clock(&self) -> Self::Clock {
0
}
fn associated_block_id_acceptable(now: Self::Clock, compare: Self::Clock) -> bool {
now == compare

fn acceptable_block_tolerance() -> Self::Clock {
0
}
}

Expand Down Expand Up @@ -1126,4 +1157,93 @@ mod tests {
let delivery_type = work_manager.deliver_message(msg, [2; 32]).unwrap(); // incorrect task hash
assert_eq!(delivery_type, DeliveryType::EnqueuedMessage); // message should be enqueued because the task hash is incorrect
}

struct DummyRangeChecker<const N: u64>;
struct DummyProtocolMessage;

impl<const N: u64> ProtocolMessageMetadata<DummyRangeChecker<N>> for DummyProtocolMessage {
fn associated_block_id(&self) -> u64 {
0u64
}

fn associated_session_id(&self) -> u64 {
0u64
}

fn associated_ssid(&self) -> u64 {
0u64
}
}

impl<const N: u64> WorkManagerInterface for DummyRangeChecker<N> {
type SSID = u64;
type Clock = u64;
type ProtocolMessage = DummyProtocolMessage;
type Error = ();
type SessionID = u64;

fn debug(&self, _input: String) {
todo!()
}

fn error(&self, _input: String) {
todo!()
}

fn warn(&self, _input: String) {
todo!()
}

fn clock(&self) -> Self::Clock {
todo!()
}

fn acceptable_block_tolerance() -> Self::Clock {
N
}
}

#[test]
fn test_range_above() {
let current_block: u64 = 10;
const TOL: u64 = 5;
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block
));
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block + 1
));
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block + TOL
));
assert!(!DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block + TOL + 1
));
}

#[test]
fn test_range_below() {
let current_block: u64 = 10;
const TOL: u64 = 5;
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block
));
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block - 1
));
assert!(DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block - TOL
));
assert!(!DummyRangeChecker::<TOL>::associated_block_id_acceptable(
current_block,
current_block - TOL - 1
));
}
}

0 comments on commit ebc021d

Please sign in to comment.