Skip to content

Commit

Permalink
RT Worker: More detailed state observability
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Oct 5, 2022
1 parent fd45eed commit bad4098
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 40 deletions.
4 changes: 2 additions & 2 deletions crates/msr-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ base64 = "0.13.0"
log = "0.4.17"
num-derive = "0.3.3"
num-traits = "0.2.15"
thiserror = "1.0.36"
time = { version = "0.3.14", features = ["local-offset", "macros", "formatting", "parsing"] }
thiserror = "1.0.37"
time = { version = "0.3.15", features = ["local-offset", "macros", "formatting", "parsing"] }

bs58 = { version = "0.4.0", optional = true, default-features = false }
csv = { version = "1.1.6", optional = true, default-features = false }
Expand Down
43 changes: 30 additions & 13 deletions crates/msr-core/src/realtime/worker/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ pub enum State {
#[default]
Unknown,
Starting,
Started,
Running,
Suspending,
Suspended,
Resumed,
Finishing,
Stopping,
Finished,
Terminating,
}

impl State {
Expand Down Expand Up @@ -278,14 +281,18 @@ where

worker.start_working(environment)?;

let rt_sched_scope = match thread_scheduling {
log::debug!("Started");
emit_event.emit_event(Event::StateChanged(State::Started));

let scheduling_scope = match thread_scheduling {
ThreadScheduling::Default => None,
ThreadScheduling::Realtime => Some(ThreadSchedulingScope::enter()?),
ThreadScheduling::RealtimeOrDefault => ThreadSchedulingScope::enter().ok(),
};
loop {
log::debug!("Running");
emit_event.emit_event(Event::StateChanged(State::Running));

match worker.perform_work(environment, progress_hint_rx)? {
CompletionStatus::Suspending => {
// The worker may have decided to suspend itself independent
Expand All @@ -295,9 +302,14 @@ where
log::debug!("Suspending rejected");
continue;
}
log::debug!("Suspending");
emit_event.emit_event(Event::StateChanged(State::Suspending));

log::debug!("Suspended");
emit_event.emit_event(Event::StateChanged(State::Suspended));

progress_hint_rx.wait_while_suspending();

log::debug!("Resumed");
emit_event.emit_event(Event::StateChanged(State::Resumed));
}
CompletionStatus::Finishing => {
// The worker may have decided to finish itself independent
Expand All @@ -307,19 +319,24 @@ where
log::debug!("Finishing rejected");
continue;
}
// Leave real-time scheduling scope
drop(rt_sched_scope);
log::debug!("Finishing");
emit_event.emit_event(Event::StateChanged(State::Finishing));
worker.finish_working(environment)?;
// Exit loop
// Leave custom scheduling scope before finishing
drop(scheduling_scope);
// Exit running loop
break;
}
}
}

log::debug!("Stopping");
emit_event.emit_event(Event::StateChanged(State::Stopping));
log::debug!("Finishing");
emit_event.emit_event(Event::StateChanged(State::Finishing));

worker.finish_working(environment)?;

log::debug!("Finished");
emit_event.emit_event(Event::StateChanged(State::Finished));

log::debug!("Terminating");
emit_event.emit_event(Event::StateChanged(State::Terminating));

Ok(())
}
Expand Down
58 changes: 45 additions & 13 deletions crates/msr-core/src/realtime/worker/thread/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ impl Worker for SmokeTestWorker {
#[derive(Default)]
struct StateChangedCount {
starting: AtomicUsize,
started: AtomicUsize,
running: AtomicUsize,
suspending: AtomicUsize,
suspended: AtomicUsize,
resumed: AtomicUsize,
finishing: AtomicUsize,
stopping: AtomicUsize,
finished: AtomicUsize,
terminating: AtomicUsize,
}

struct SmokeTestEvents {
Expand All @@ -88,14 +91,19 @@ impl SmokeTestEvents {
.starting
.fetch_add(1, Ordering::SeqCst);
}
State::Started => {
self.state_changed_count
.started
.fetch_add(1, Ordering::SeqCst);
}
State::Running => {
self.state_changed_count
.running
.fetch_add(1, Ordering::SeqCst);
}
State::Suspending => {
State::Suspended => {
self.state_changed_count
.suspending
.suspended
.fetch_add(1, Ordering::SeqCst);
assert_eq!(
SwitchProgressHintOk::Accepted {
Expand All @@ -104,14 +112,24 @@ impl SmokeTestEvents {
self.progress_hint_tx.resume().expect("resuming")
);
}
State::Resumed => {
self.state_changed_count
.resumed
.fetch_add(1, Ordering::SeqCst);
}
State::Finishing => {
self.state_changed_count
.finishing
.fetch_add(1, Ordering::SeqCst);
}
State::Stopping => {
State::Finished => {
self.state_changed_count
.stopping
.finished
.fetch_add(1, Ordering::SeqCst);
}
State::Terminating => {
self.state_changed_count
.terminating
.fetch_add(1, Ordering::SeqCst);
}
},
Expand Down Expand Up @@ -169,7 +187,7 @@ fn smoke_test() -> anyhow::Result<()> {
1,
event_handler
.state_changed_count
.stopping
.started
.load(Ordering::SeqCst)
);
assert_eq!(
Expand All @@ -179,6 +197,20 @@ fn smoke_test() -> anyhow::Result<()> {
.running
.load(Ordering::SeqCst)
);
assert_eq!(
expected_perform_work_invocations - 1,
event_handler
.state_changed_count
.suspended
.load(Ordering::SeqCst)
);
assert_eq!(
expected_perform_work_invocations - 1,
event_handler
.state_changed_count
.resumed
.load(Ordering::SeqCst)
);
assert_eq!(
1,
event_handler
Expand All @@ -187,17 +219,17 @@ fn smoke_test() -> anyhow::Result<()> {
.load(Ordering::SeqCst)
);
assert_eq!(
1,
event_handler
.state_changed_count
.running
.finished
.load(Ordering::SeqCst)
- event_handler
.state_changed_count
.finishing
.load(Ordering::SeqCst),
);
assert_eq!(
1,
event_handler
.state_changed_count
.suspending
.terminating
.load(Ordering::SeqCst)
);
}
Expand Down
20 changes: 15 additions & 5 deletions crates/msr-core/tests/cyclic_realtime_worker_timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl CyclicWorkerEvents {
}
}

pub fn state(&self) -> State {
pub fn last_state(&self) -> State {
State::from_u8(self.state.load(Ordering::Acquire)).expect("valid value")
}

Expand Down Expand Up @@ -277,16 +277,26 @@ fn run_cyclic_worker(params: CyclicWorkerParams) -> anyhow::Result<CyclicWorkerM
let mut finished = false;
let mut exit_loop = false;
while !exit_loop {
match event_handler.state() {
State::Unknown | State::Starting | State::Running => (),
State::Suspending => {
match event_handler.last_state() {
State::Unknown
| State::Starting
| State::Started
| State::Running
| State::Resumed
| State::Finishing
| State::Finished => {
// These (intermediate) states might not be visible when reading
// the last state at arbitrary times from an atomic and cannot
// be used for controlling the control flow of the test!
}
State::Suspended => {
assert!(resumed_count <= suspended_count);
if resumed_count < suspended_count {
progress_hint_tx.resume().expect("resumed");
resumed_count += 1;
}
}
State::Stopping | State::Finishing => {
State::Terminating => {
exit_loop = true;
// Drain the channel one last time after the worker thread has
// exited its process_work() function. This is required to not
Expand Down
4 changes: 2 additions & 2 deletions crates/msr-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ version.workspace = true

[dependencies]
log = "0.4.17"
thiserror = "1.0.36"
tokio = { version = "1.21.1", default-features = false, features = ["sync"] }
thiserror = "1.0.37"
tokio = { version = "1.21.2", default-features = false, features = ["sync"] }

# Workspace dependencies
msr-core = "=0.3.4"
2 changes: 1 addition & 1 deletion crates/msr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ plugin = ["msr-plugin"]
anyhow = "1.0.65"
env_logger = "0.9.1"
log = "0.4.17"
tokio = { version = "1.21.1", features = ["full"] }
tokio = { version = "1.21.2", features = ["full"] }

# Workspace dev-dependencies
msr-plugin = "=0.3.4"
4 changes: 2 additions & 2 deletions plugins/csv-event-journal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ version.workspace = true
[dependencies]
anyhow = "1.0.65"
log = "0.4.17"
thiserror = "1.0.36"
tokio = { version = "1.21.1", default-features = false, features = ["rt-multi-thread", "sync"] }
thiserror = "1.0.37"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "sync"] }

# Workspace dependencies
msr-core = { version = "=0.3.4", features = ["csv-event-journal"] }
Expand Down
4 changes: 2 additions & 2 deletions plugins/csv-register-recorder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ version.workspace = true
anyhow = "1.0.65"
bs58 = { version = "0.4.0", default-features = false }
log = "0.4.17"
thiserror = "1.0.36"
tokio = { version = "1.21.1", default-features = false, features = ["rt-multi-thread"] }
thiserror = "1.0.37"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread"] }

# Workspace dependencies
msr-core = { version = "=0.3.4", features = ["csv-register-recorder"] }
Expand Down

0 comments on commit bad4098

Please sign in to comment.