diff --git a/examples/fuel_tank_controller/src/main.rs b/examples/fuel_tank_controller/src/main.rs index 0d16792..15bccdd 100644 --- a/examples/fuel_tank_controller/src/main.rs +++ b/examples/fuel_tank_controller/src/main.rs @@ -2,9 +2,10 @@ use a653rs::partition; use a653rs::prelude::PartitionExt; -use a653rs_linux::partition::ApexLogger; use log::LevelFilter; +use a653rs_linux::partition::ApexLogger; + fn main() { ApexLogger::install_panic_hook(); ApexLogger::install_logger(LevelFilter::Trace).unwrap(); @@ -14,7 +15,6 @@ fn main() { #[partition(a653rs_linux::partition::ApexLinuxPartition)] mod hello { - use a653rs::prelude::SystemTime; #[sampling_in(name = "fuel_sensors", msg_size = "10KB", refresh_period = "10s")] @@ -22,12 +22,27 @@ mod hello { #[sampling_out(name = "fuel_actuators", msg_size = "10KB")] struct FuelActuators; - #[start(cold)] - fn cold_start(_ctx: start::Context) {} + fn cold_start(mut ctx: start::Context) { + ctx.create_periodic().unwrap().start().unwrap(); + } #[start(warm)] fn warm_start(ctx: start::Context) { cold_start(ctx) } + + #[periodic( + period = "0ms", + time_capacity = "Infinite", + stack_size = "100KB", + base_priority = 1, + deadline = "Soft" + )] + fn periodic(ctx: periodic::Context) { + loop { + // Empty for now + ctx.periodic_wait().unwrap(); + } + } } diff --git a/hypervisor/src/hypervisor/config.rs b/hypervisor/src/hypervisor/config.rs index 0d8e745..d4423e5 100644 --- a/hypervisor/src/hypervisor/config.rs +++ b/hypervisor/src/hypervisor/config.rs @@ -53,12 +53,14 @@ use std::net::{SocketAddr, ToSocketAddrs}; use std::path::PathBuf; use std::time::Duration; +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; + use a653rs_linux_core::channel::{QueuingChannelConfig, SamplingChannelConfig}; use a653rs_linux_core::error::{ResultExt, SystemError, TypedResult}; use a653rs_linux_core::health::{ModuleInitHMTable, ModuleRunHMTable, PartitionHMTable}; -use anyhow::anyhow; -use itertools::Itertools; -use serde::{Deserialize, Serialize}; + +use crate::hypervisor::scheduler::{PartitionSchedule, ScheduledTimeframe}; /// Main configuration of the hypervisor #[derive(Debug, Serialize, Deserialize, Clone)] @@ -190,7 +192,7 @@ pub enum ModuleStates { } impl Config { - pub(crate) fn generate_schedule(&self) -> TypedResult> { + pub(crate) fn generate_schedule(&self) -> TypedResult { // Verify Periods and Major Frame let lcm_periods = self .partitions @@ -206,27 +208,22 @@ impl Config { } // Generate Schedule - let mut s = self + let timeframes = self .partitions .iter() .flat_map(|p| { let pimf = (self.major_frame.as_nanos() / p.period.as_nanos()) as u32; (0..pimf).map(|i| { let start = p.offset + (p.period * i); - (start, start + p.duration, p.name.clone()) + ScheduledTimeframe { + start, + end: start + p.duration, + partition_name: p.name.clone(), + } }) }) .collect::>(); - s.sort_by(|(d1, ..), (d2, ..)| d1.cmp(d2)); - - // Verify no overlaps - for ((pstart, pend, pname), (nstart, nend, nname)) in s.iter().tuple_windows() { - if pend > nstart { - return Err(anyhow!("Overlapping Partition Windows: {pname} (start: {pstart:?}, end: {pend:?}). 
{nname} (start: {nstart:?}, end: {nend:?})")) - .typ(SystemError::PartitionConfig); - } - } - Ok(s) + PartitionSchedule::from_timeframes(timeframes).typ(SystemError::PartitionConfig) } } diff --git a/hypervisor/src/hypervisor/mod.rs b/hypervisor/src/hypervisor/mod.rs index ff61475..9c8884e 100644 --- a/hypervisor/src/hypervisor/mod.rs +++ b/hypervisor/src/hypervisor/mod.rs @@ -13,7 +13,7 @@ use a653rs_linux_core::sampling::Sampling; use crate::hypervisor::config::{Channel, Config}; use crate::hypervisor::partition::Partition; -use crate::hypervisor::scheduler::Timeout; +use crate::hypervisor::scheduler::{Scheduler, Timeout}; pub mod config; pub mod partition; @@ -28,7 +28,7 @@ pub static SYSTEM_START_TIME: OnceCell> = OnceCell::new(); pub struct Hypervisor { cg: CGroup, major_frame: Duration, - schedule: Vec<(Duration, Duration, String)>, + scheduler: Scheduler, partitions: HashMap, sampling_channel: HashMap, prev_cg: PathBuf, @@ -55,7 +55,7 @@ impl Hypervisor { let mut hv = Self { cg, - schedule, + scheduler: Scheduler::new(schedule), major_frame: config.major_frame, partitions: Default::default(), prev_cg, @@ -122,9 +122,11 @@ impl Hypervisor { let mut frame_start = Instant::now(); // retain the first frame start as our sytems t0 - if self.t0.is_none() { - self.t0 = Some(frame_start); - } + let t0 = self.t0.unwrap_or(frame_start); + + let terminate_after_timeout = self + .terminate_after + .map(|duration| Timeout::new(t0, duration)); let sys_time = SYSTEM_START_TIME .get() @@ -133,26 +135,23 @@ impl Hypervisor { sys_time.write(&frame_start).lev(ErrorLevel::ModuleInit)?; sys_time.seal_read_only().lev(ErrorLevel::ModuleInit)?; loop { - // if we are not ment to execute any longer, terminate here - match self.terminate_after { - Some(terminate_after) if frame_start - self.t0.unwrap() > terminate_after => { + // terminate hypervisor now if timeout is over + if let Some(timeout) = &terminate_after_timeout { + if !timeout.has_time_left() { info!( "quitting, as a run-time of {} was reached", - humantime::Duration::from(terminate_after) + humantime::Duration::from(timeout.total_duration()) ); quit::with_code(0) } - _ => {} } - for (target_start, target_stop, partition_name) in &self.schedule { - sleep(target_start.saturating_sub(frame_start.elapsed())); + self.scheduler.run_major_frame( + frame_start, + &mut self.partitions, + &mut self.sampling_channel, + )?; - self.partitions.get_mut(partition_name).unwrap().run( - &mut self.sampling_channel, - Timeout::new(frame_start, *target_stop), - )?; - } sleep(self.major_frame.saturating_sub(frame_start.elapsed())); frame_start += self.major_frame; diff --git a/hypervisor/src/hypervisor/partition.rs b/hypervisor/src/hypervisor/partition.rs index db69b3b..80b3f8e 100644 --- a/hypervisor/src/hypervisor/partition.rs +++ b/hypervisor/src/hypervisor/partition.rs @@ -4,7 +4,8 @@ use std::net::{TcpStream, UdpSocket}; use std::os::unix::prelude::{AsRawFd, FromRawFd, OwnedFd, PermissionsExt, RawFd}; use std::path::{self, Path, PathBuf}; use std::process::{Command, Stdio}; -use std::time::Duration; +use std::thread::sleep; +use std::time::{Duration, Instant}; use a653rs::bindings::PortDirection; use a653rs::prelude::{OperatingMode, StartCondition}; @@ -14,15 +15,16 @@ use itertools::Itertools; use nix::mount::{mount, umount2, MntFlags, MsFlags}; use nix::sys::socket::{self, bind, AddressFamily, SockFlag, SockType, UnixAddr}; use nix::unistd::{chdir, close, pivot_root, setgid, setuid, Gid, Pid, Uid}; +use polling::{Event, Poller}; use procfs::process::Process; use 
tempfile::{tempdir, TempDir}; use a653rs_linux_core::cgroup::CGroup; use a653rs_linux_core::error::{ - ErrorLevel, LeveledResult, ResultExt, SystemError, TypedResult, TypedResultExt, + ErrorLevel, LeveledResult, ResultExt, SystemError, TypedError, TypedResult, TypedResultExt, }; use a653rs_linux_core::file::TempFile; -use a653rs_linux_core::health::PartitionHMTable; +use a653rs_linux_core::health::{ModuleRecoveryAction, PartitionHMTable, RecoveryAction}; use a653rs_linux_core::health_event::PartitionCall; use a653rs_linux_core::ipc::{channel_pair, io_pair, IoReceiver, IoSender, IpcReceiver}; use a653rs_linux_core::partition::{PartitionConstants, SamplingConstant}; @@ -34,7 +36,7 @@ use crate::hypervisor::SYSTEM_START_TIME; use crate::problem; use super::config::PosixSocket; -use super::scheduler::{PartitionTimeWindow, Timeout}; +use super::scheduler::Timeout; #[derive(Debug, Clone, Copy)] pub enum TransitionAction { @@ -674,25 +676,8 @@ impl Partition { } } - pub fn run( - &mut self, - sampling: &mut HashMap, - timeout: Timeout, - ) -> LeveledResult<()> { - PartitionTimeWindow::new(&self.base, &mut self.run, timeout).run()?; - // TODO Error handling and freeze if err - self.base.freeze().lev(ErrorLevel::Partition)?; - - for (name, _) in self - .base - .sampling_channel - .iter() - .filter(|(_, s)| s.dir == PortDirection::Source) - { - sampling.get_mut(name).unwrap().swap(); - } - - Ok(()) + pub fn get_base_run(&mut self) -> (&Base, &mut Run) { + (&self.base, &mut self.run) } //fn idle_transition(mut self) -> Result<()> { @@ -716,6 +701,224 @@ impl Partition { pub(crate) fn rm(self) -> TypedResult<()> { self.base.cgroup.rm().typ(SystemError::CGroup) } + + pub fn run_post_timeframe(&mut self, sampling_channels: &mut HashMap) { + // TODO remove because a base freeze is not necessary here, as all run_* methods + // should freeze base themself after execution. Before removal of this, check + // all run_* methods. + let _ = self.base.freeze(); + + for (name, _) in self + .base + .sampling_channel + .iter() + .filter(|(_, s)| s.dir == PortDirection::Source) + { + sampling_channels.get_mut(name).unwrap().swap(); + } + } + + /// Executes the periodic process for a maximum duration specified through + /// the `timeout` parameter. Returns whether the periodic process exists + /// and was run. 
+ pub fn run_periodic_process(&mut self, timeout: Timeout) -> TypedResult { + match self.run.unfreeze_periodic() { + Ok(true) => {} + other => return other, + } + + let mut poller = PeriodicPoller::new(&self.run)?; + + self.base.unfreeze()?; + + while timeout.has_time_left() { + let event = poller.wait_timeout(&mut self.run, timeout)?; + match &event { + PeriodicEvent::Timeout => {} + PeriodicEvent::Frozen => { + self.base.freeze()?; + + return Ok(true); + } + // TODO Error Handling with HM + PeriodicEvent::Call(e @ PartitionCall::Error(se)) => { + e.print_partition_log(self.base.name()); + match self.base.part_hm().try_action(*se) { + Some(RecoveryAction::Module(ModuleRecoveryAction::Ignore)) => {} + Some(_) => { + return Err(TypedError::new(*se, anyhow!("Received Partition Error"))) + } + None => { + return Err(TypedError::new( + SystemError::Panic, + anyhow!( + "Could not get recovery action for requested partition error: {se}" + ), + )) + } + }; + } + PeriodicEvent::Call(c @ PartitionCall::Message(_)) => { + c.print_partition_log(self.base.name()) + } + PeriodicEvent::Call(PartitionCall::Transition(mode)) => { + // Only exit run_periodic, if we changed our mode + if self.run.handle_transition(&self.base, *mode)?.is_some() { + return Ok(true); + } + } + } + } + + // TODO being here means that we exceeded the timeout + // So we should return a SystemError stating that the time was exceeded + Ok(true) + } + + pub fn run_aperiodic_process(&mut self, timeout: Timeout) -> TypedResult { + match self.run.unfreeze_aperiodic() { + Ok(true) => {} + other => return other, + } + + // Did we even need to unfreeze aperiodic? + self.base.unfreeze()?; + + while timeout.has_time_left() { + match &self + .run + .receiver() + .try_recv_timeout(timeout.remaining_time())? + { + Some(m @ PartitionCall::Message(_)) => m.print_partition_log(self.base.name()), + Some(e @ PartitionCall::Error(se)) => { + e.print_partition_log(self.base.name()); + match self.base.part_hm().try_action(*se) { + Some(RecoveryAction::Module(ModuleRecoveryAction::Ignore)) => {} + Some(_) => { + return Err(TypedError::new(*se, anyhow!("Received Partition Error"))) + } + None => { + return Err(TypedError::new( + SystemError::Panic, + anyhow!( + "Could not get recovery action for requested partition error: {se}" + ), + )) + } + }; + } + Some(t @ PartitionCall::Transition(mode)) => { + // In case of a transition to idle, just sleep. Do not care for the rest + t.print_partition_log(self.base.name()); + if let Some(OperatingMode::Idle) = + self.run.handle_transition(&self.base, *mode)? + { + sleep(timeout.remaining_time()); + return Ok(true); + } + } + None => {} + } + } + + self.run.freeze_aperiodic()?; + + Ok(true) + } + + /// Currently the same as run_aperiodic + pub fn run_start(&mut self, timeout: Timeout, _warm_start: bool) -> TypedResult<()> { + self.base.unfreeze()?; + + while timeout.has_time_left() { + match &self + .run + .receiver() + .try_recv_timeout(timeout.remaining_time())? 
+ { + Some(m @ PartitionCall::Message(_)) => m.print_partition_log(self.base.name()), + Some(e @ PartitionCall::Error(se)) => { + e.print_partition_log(self.base.name()); + match self.base.part_hm().try_action(*se) { + Some(RecoveryAction::Module(ModuleRecoveryAction::Ignore)) => {} + Some(_) => { + return Err(TypedError::new(*se, anyhow!("Received Partition Error"))) + } + None => { + return Err(TypedError::new( + SystemError::Panic, + anyhow!( + "Could not get recovery action for requested partition error: {se}" + ), + )) + } + }; + } + Some(t @ PartitionCall::Transition(mode)) => { + // In case of a transition to idle, just sleep. Do not care for the rest + t.print_partition_log(self.base.name()); + if let Some(OperatingMode::Idle) = + self.run.handle_transition(&self.base, *mode)? + { + sleep(timeout.remaining_time()); + return Ok(()); + } + } + None => {} + } + } + + self.base.freeze() + } + + /// Handles an error that occurred during self.run_* methods. + pub fn handle_error(&mut self, err: TypedError) -> LeveledResult<()> { + debug!("Partition \"{}\" received err: {err:?}", self.base.name()); + + let now = Instant::now(); + + let action = match self.base.part_hm().try_action(err.err()) { + None => { + warn!("Could not map \"{err:?}\" to action. Using Panic action instead"); + match self.base.part_hm().panic { + // We do not Handle Module Recovery actions here + RecoveryAction::Module(_) => { + return TypedResult::Err(err).lev(ErrorLevel::Partition) + } + RecoveryAction::Partition(action) => action, + } + } + // We do not Handle Module Recovery actions here + Some(RecoveryAction::Module(_)) => { + return TypedResult::Err(err).lev(ErrorLevel::Partition) + } + Some(RecoveryAction::Partition(action)) => action, + }; + + debug!("Handling: {err:?}"); + debug!("Apply Partition Recovery Action: {action:?}"); + + // TODO do not unwrap/expect these errors. Maybe raise Module Level + // PartitionInit Error? + match action { + a653rs_linux_core::health::PartitionRecoveryAction::Idle => self + .run + .idle_transition(&self.base) + .expect("Idle Transition Failed"), + a653rs_linux_core::health::PartitionRecoveryAction::ColdStart => self + .run + .start_transition(&self.base, false, StartCondition::HmPartitionRestart) + .expect("Start(Cold) Transition Failed"), + a653rs_linux_core::health::PartitionRecoveryAction::WarmStart => self + .run + .start_transition(&self.base, false, StartCondition::HmPartitionRestart) + .expect("Start(Warm) Transition Failed"), + } + + trace!("Partition Error Handling took: {:?}", now.elapsed()); + Ok(()) + } } impl PartitionConfig { @@ -775,3 +978,89 @@ impl PartitionConfig { Ok(bin) } } + +pub(crate) struct PeriodicPoller { + poll: Poller, + events: OwnedFd, +} + +pub enum PeriodicEvent { + Timeout, + Frozen, + Call(PartitionCall), +} + +impl PeriodicPoller { + const EVENTS_ID: usize = 1; + const RECEIVER_ID: usize = 2; + + pub fn new(run: &Run) -> TypedResult { + let events = run.periodic_events()?; + + let poll = Poller::new().typ(SystemError::Panic)?; + poll.add(events.as_raw_fd(), Event::readable(Self::EVENTS_ID)) + .typ(SystemError::Panic)?; + poll.add( + run.receiver().as_raw_fd(), + Event::readable(Self::RECEIVER_ID), + ) + .typ(SystemError::Panic)?; + + Ok(PeriodicPoller { poll, events }) + } + + pub fn wait_timeout(&mut self, run: &mut Run, timeout: Timeout) -> TypedResult { + if run.is_periodic_frozen()? 
{ + return Ok(PeriodicEvent::Frozen); + } + + while timeout.has_time_left() { + let mut events = vec![]; + self.poll + .wait(events.as_mut(), Some(timeout.remaining_time())) + .typ(SystemError::Panic)?; + + for e in events { + match e.key { + // Got a Frozen event + Self::EVENTS_ID => { + // Re-sub the readable event + self.poll + .modify(self.events.as_raw_fd(), Event::readable(Self::EVENTS_ID)) + .typ(SystemError::Panic)?; + + // Then check if the cg is actually frozen + if run.is_periodic_frozen()? { + return Ok(PeriodicEvent::Frozen); + } + } + // got a call events + Self::RECEIVER_ID => { + // Re-sub the readable event + // This will result in the event instantly being ready again should we have + // something to read, but that is better than + // accidentally missing an event (at the expense of one extra loop per + // receive) + self.poll + .modify( + run.receiver().as_raw_fd(), + Event::readable(Self::RECEIVER_ID), + ) + .typ(SystemError::Panic)?; + + // Now receive anything + if let Some(call) = run.receiver().try_recv()? { + return Ok(PeriodicEvent::Call(call)); + } + } + _ => { + return Err(anyhow!("Unexpected Event Received: {e:?}")) + .typ(SystemError::Panic) + } + } + } + } + + Ok(PeriodicEvent::Timeout) + } +} diff --git a/hypervisor/src/hypervisor/scheduler.rs b/hypervisor/src/hypervisor/scheduler.rs index 2bafd2e..bbaf532 100644 --- a/hypervisor/src/hypervisor/scheduler.rs +++ b/hypervisor/src/hypervisor/scheduler.rs @@ -1,311 +1,124 @@ -use std::os::unix::prelude::{AsRawFd, OwnedFd}; +use std::collections::HashMap; use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Instant; -use a653rs::prelude::{OperatingMode, StartCondition}; -use a653rs_linux_core::error::{ - ErrorLevel, LeveledResult, ResultExt, SystemError, TypedError, TypedResult, TypedResultExt, -}; -use a653rs_linux_core::health::{ModuleRecoveryAction, RecoveryAction}; -use a653rs_linux_core::health_event::PartitionCall; +use a653rs::prelude::OperatingMode; use anyhow::anyhow; -use polling::{Event, Poller}; -use super::partition::{Base, Run}; +use a653rs_linux_core::error::{ErrorLevel, LeveledError, LeveledResult, SystemError, TypedResult}; +use a653rs_linux_core::sampling::Sampling; +pub(crate) use schedule::{PartitionSchedule, ScheduledTimeframe}; +pub(crate) use timeout::Timeout; -pub(crate) struct Timeout { - start: Instant, - stop: Duration, -} +use crate::hypervisor::partition::Partition; -impl Timeout { - pub fn new(start: Instant, stop: Duration) -> Self { - Self { start, stop } - } +mod schedule; +mod timeout; + +/// A scheduler that schedules the execution timeframes of partition according +/// to a given [PartitionSchedule]. By calling [Scheduler::run_major_frame] a +/// single major frame can be run. 
+pub(crate) struct Scheduler { + schedule: PartitionSchedule, +} - fn remaining_time(&self) -> Duration { - self.stop.saturating_sub(self.start.elapsed()) +impl Scheduler { + pub fn new(schedule: PartitionSchedule) -> Self { + Self { schedule } } + /// Takes &mut self for now because P4 limits scheduling to a single core + pub fn run_major_frame( + &mut self, + current_frame_start: Instant, + partitions_by_name: &mut HashMap, + sampling_channels_by_name: &mut HashMap, + ) -> LeveledResult<()> { + for timeframe in self.schedule.iter() { + sleep( + timeframe + .start + .saturating_sub(current_frame_start.elapsed()), + ); + + let timeframe_timeout = Timeout::new(current_frame_start, timeframe.end); + let partition = partitions_by_name + .get_mut(&timeframe.partition_name) + .expect("partition to exist because its name comes from `timeframe`"); + PartitionTimeframeScheduler::new(partition, timeframe_timeout).run()?; + + partition.run_post_timeframe(sampling_channels_by_name); + } - fn time_left(&self) -> bool { - self.remaining_time() > Duration::ZERO + Ok(()) } } -pub(crate) struct PartitionTimeWindow<'a> { - base: &'a Base, - run: &'a mut Run, +/// A scheduler for a single partition timeframe +struct PartitionTimeframeScheduler<'a> { + partition: &'a mut Partition, timeout: Timeout, } -impl<'a> PartitionTimeWindow<'a> { - pub fn new(base: &'a Base, run: &'a mut Run, timeout: Timeout) -> PartitionTimeWindow<'a> { - PartitionTimeWindow { base, run, timeout } +impl<'a> PartitionTimeframeScheduler<'a> { + fn new(partition: &'a mut Partition, timeout: Timeout) -> Self { + Self { partition, timeout } } - fn handle_part_err(&mut self, res: TypedResult<()>) -> LeveledResult<()> { - debug!("Partition \"{}\" received err: {res:?}", self.base.name()); - if let Err(err) = res.as_ref() { - let now = Instant::now(); - - let action = match self.base.part_hm().try_action(err.err()) { - None => { - warn!("Could not map \"{res:?}\" to action. Using Panic action instead"); - match self.base.part_hm().panic { - // We do not Handle Module Recovery actions here - RecoveryAction::Module(_) => return res.lev(ErrorLevel::Partition), - RecoveryAction::Partition(action) => action, - } - } - // We do not Handle Module Recovery actions here - Some(RecoveryAction::Module(_)) => return res.lev(ErrorLevel::Partition), - Some(RecoveryAction::Partition(action)) => action, - }; - - debug!("Handling: {err:?}"); - debug!("Apply Partition Recovery Action: {action:?}"); - - // TODO do not unwrap/expect these errors. Maybe raise Module Level - // PartitionInit Error? 
- match action { - a653rs_linux_core::health::PartitionRecoveryAction::Idle => self - .run - .idle_transition(self.base) - .expect("Idle Transition Failed"), - a653rs_linux_core::health::PartitionRecoveryAction::ColdStart => self - .run - .start_transition(self.base, false, StartCondition::HmPartitionRestart) - .expect("Start(Cold) Transition Failed"), - a653rs_linux_core::health::PartitionRecoveryAction::WarmStart => self - .run - .start_transition(self.base, false, StartCondition::HmPartitionRestart) - .expect("Start(Warm) Transition Failed"), - } - - trace!("Partition Error Handling took: {:?}", now.elapsed()) - } - - Ok(()) - } - - pub fn run(&mut self) -> LeveledResult<()> { + fn run(&mut self) -> LeveledResult<()> { // Stop if the time is already over - if !self.timeout.time_left() { + if !self.timeout.has_time_left() { return Ok(()); } // If we are in the normal mode at the beginning of the time frame, // only then we may schedule the periodic process inside a partition - if let OperatingMode::Normal = self.run.mode() { - let res = self.run.unfreeze_periodic(); - let res = match res { - Ok(true) => self.run_periodic(), - // Check if there is no periodic process - Ok(false) => { - self.run.unfreeze_aperiodic().lev(ErrorLevel::Partition)?; - Ok(()) + if let OperatingMode::Normal = self.partition.get_base_run().1.mode() { + let res = self.partition.run_periodic_process(self.timeout); + if self.handle_partition_result(res)? == Some(false) { + // Periodic process was not run -> run aperiodic process + let res = self.partition.run_aperiodic_process(self.timeout); + if self.handle_partition_result(res)? == Some(false) { + // Aperiodic process was also not run + return Err(LeveledError::new( + SystemError::Panic, + ErrorLevel::Partition, + anyhow!("At least one periodic or aperiodic process is expected to exist"), + )); } - Err(e) => Err(e), - }; - self.handle_part_err(res)?; + } } // Only continue if we have time left - if self.timeout.time_left() { + if self.timeout.has_time_left() { let res = self.run_post_periodic(); - self.handle_part_err(res)?; + self.handle_partition_result(res)?; } Ok(()) } fn run_post_periodic(&mut self) -> TypedResult<()> { // if we are in the idle mode, just sleep until the end of the frame - if let OperatingMode::Idle = self.run.mode() { - sleep(self.timeout.remaining_time()); - } else { - // Else we are in either a start mode or normal mode (post periodic/mid - // aperiodic time frame) Either-way we are supposed to unfreeze the - // partition - self.base.unfreeze()?; - - let mut leftover = self.timeout.remaining_time(); - while leftover > Duration::ZERO { - match &self - .run - .receiver() - .try_recv_timeout(self.timeout.remaining_time())? - { - Some(m @ PartitionCall::Message(_)) => m.print_partition_log(self.base.name()), - Some(e @ PartitionCall::Error(se)) => { - e.print_partition_log(self.base.name()); - match self.base.part_hm().try_action(*se){ - Some(RecoveryAction::Module(ModuleRecoveryAction::Ignore)) => {}, - Some(_) => return Err(TypedError::new(*se, anyhow!("Received Partition Error"))) , - None => return Err(TypedError::new(SystemError::Panic, anyhow!("Could not get recovery action for requested partition error: {se}"))), - }; - } - Some(t @ PartitionCall::Transition(mode)) => { - // In case of a transition to idle, just sleep. Do not care for the rest - t.print_partition_log(self.base.name()); - if let Some(OperatingMode::Idle) = - self.run.handle_transition(self.base, *mode)? 
- { - sleep(self.timeout.remaining_time()); - return Ok(()); - } - } - None => {} - } - - leftover = self.timeout.remaining_time(); - } - } - - self.run.freeze_aperiodic()?; - - Ok(()) - } - - fn run_periodic(&mut self) -> TypedResult<()> { - let mut poller = PeriodicPoller::new(self.run)?; - - self.base.unfreeze()?; - - let mut leftover = self.timeout.remaining_time(); - while leftover > Duration::ZERO { - let event = poller.wait_timeout(self.run, self.timeout.remaining_time())?; - match &event { - PeriodicEvent::Timeout => {} - PeriodicEvent::Frozen => { - // In case of a frozen periodic cgroup, we may start the aperiodic process - self.run.unfreeze_aperiodic()?; - return Ok(()); - } - // TODO Error Handling with HM - PeriodicEvent::Call(e @ PartitionCall::Error(se)) => { - e.print_partition_log(self.base.name()); - match self.base.part_hm().try_action(*se) { - Some(RecoveryAction::Module(ModuleRecoveryAction::Ignore)) => {} - Some(_) => { - return Err(TypedError::new(*se, anyhow!("Received Partition Error"))) - } - None => { - return Err(TypedError::new( - SystemError::Panic, - anyhow!( - "Could not get recovery action for requested partition error: {se}" - ), - )) - } - }; - } - PeriodicEvent::Call(c @ PartitionCall::Message(_)) => { - c.print_partition_log(self.base.name()) - } - PeriodicEvent::Call(PartitionCall::Transition(mode)) => { - // Only exit run_periodic, if we changed our mode - if self.run.handle_transition(self.base, *mode)?.is_some() { - return Ok(()); - } - } + match self.partition.get_base_run().1.mode() { + OperatingMode::Idle => { + sleep(self.timeout.remaining_time()); + Ok(()) } - - leftover = self.timeout.remaining_time(); + mode @ OperatingMode::ColdStart | mode @ OperatingMode::WarmStart => self + .partition + .run_start(self.timeout, mode == OperatingMode::WarmStart), + OperatingMode::Normal => self + .partition + .run_aperiodic_process(self.timeout) + .map(|_| ()), } - - // TODO being here means that we exceeded the timeout - // So we should return a SystemError stating that the time was exceeded - Ok(()) } -} - -pub(crate) struct PeriodicPoller { - poll: Poller, - events: OwnedFd, -} - -pub enum PeriodicEvent { - Timeout, - Frozen, - Call(PartitionCall), -} - -impl PeriodicPoller { - const EVENTS_ID: usize = 1; - const RECEIVER_ID: usize = 2; - - pub fn new(run: &Run) -> TypedResult { - let events = run.periodic_events()?; - - let poll = Poller::new().typ(SystemError::Panic)?; - poll.add(events.as_raw_fd(), Event::readable(Self::EVENTS_ID)) - .typ(SystemError::Panic)?; - poll.add( - run.receiver().as_raw_fd(), - Event::readable(Self::RECEIVER_ID), - ) - .typ(SystemError::Panic)?; - - Ok(PeriodicPoller { poll, events }) - } - - pub fn wait_timeout(&mut self, run: &mut Run, timeout: Duration) -> TypedResult { - let start = Instant::now(); - - if run.is_periodic_frozen()? { - return Ok(PeriodicEvent::Frozen); - } - - let mut leftover = timeout.saturating_sub(start.elapsed()); - while leftover > Duration::ZERO { - let mut events = vec![]; - self.poll - .wait(events.as_mut(), Some(leftover)) - .typ(SystemError::Panic)?; - - for e in events { - match e.key { - // Got a Frozen event - Self::EVENTS_ID => { - // Re-sub the readable event - self.poll - .modify(self.events.as_raw_fd(), Event::readable(Self::EVENTS_ID)) - .typ(SystemError::Panic)?; - - // Then check if the cg is actually frozen - if run.is_periodic_frozen()? 
{
-                        return Ok(PeriodicEvent::Frozen);
-                    }
-                }
-                // got a call event
-                Self::RECEIVER_ID => {
-                    // Re-sub the readable event
-                    // This will result in the event instantly being ready again should we have
-                    // something to read, but that is better than
-                    // accidentally missing an event (at the expense of one extra loop per
-                    // receive)
-                    self.poll
-                        .modify(
-                            run.receiver().as_raw_fd(),
-                            Event::readable(Self::RECEIVER_ID),
-                        )
-                        .typ(SystemError::Panic)?;
-
-                    // Now receive anything
-                    if let Some(call) = run.receiver().try_recv()? {
-                        return Ok(PeriodicEvent::Call(call));
-                    }
-                }
-                _ => {
-                    return Err(anyhow!("Unexpected Event Received: {e:?}"))
-                        .typ(SystemError::Panic)
-                }
-            }
-        }
-
-        leftover = timeout.saturating_sub(start.elapsed());
-    }
-
-    Ok(PeriodicEvent::Timeout)
+    /// Takes in a [TypedResult] and makes the partition handle the `Err`
+    /// variant. A successful handling of the error will then return
+    /// `Ok(None)`. In case of `Ok(_)` the contained value is returned as
+    /// `Ok(Some(_))`.
+    fn handle_partition_result<T>(&mut self, res: TypedResult<T>) -> LeveledResult<Option<T>> {
+        res.map(|t| Some(t))
+            .or_else(|err| self.partition.handle_error(err).map(|_| None))
+    }
 }
diff --git a/hypervisor/src/hypervisor/scheduler/schedule.rs b/hypervisor/src/hypervisor/scheduler/schedule.rs
new file mode 100644
index 0000000..8158a98
--- /dev/null
+++ b/hypervisor/src/hypervisor/scheduler/schedule.rs
@@ -0,0 +1,67 @@
+use std::cmp::Ordering;
+use std::time::Duration;
+
+use anyhow::bail;
+use itertools::Itertools;
+
+/// The schedule for the execution of partitions in each major frame.
+/// It consists of a [Vec] of timeframes sorted by their start time, which are
+/// guaranteed to not overlap.
+pub(crate) struct PartitionSchedule {
+    pub timeframes: Vec<ScheduledTimeframe>,
+}
+
+impl PartitionSchedule {
+    /// Creates a new partition schedule from given timeframes.
+    /// Returns `Err` if there are overlaps.
+    pub fn from_timeframes(mut timeframes: Vec<ScheduledTimeframe>) -> anyhow::Result<Self> {
+        timeframes.sort();
+
+        // Verify no overlaps
+        for (prev, next) in timeframes.iter().tuple_windows() {
+            if prev.end > next.start {
+                bail!("Overlapping partition timeframes: {prev:?}, {next:?}");
+            }
+        }
+
+        Ok(Self { timeframes })
+    }
+
+    /// Returns an iterator through all timeframes sorted by start time
+    pub fn iter(&self) -> impl Iterator<Item = &ScheduledTimeframe> {
+        self.timeframes.iter()
+    }
+}
+
+/// A timeframe inside of a major frame.
+/// Both `start` and `end` are [Duration]s as they describe the time passed
+/// since the major frame's start.
+#[derive(Clone, Debug)]
+pub(crate) struct ScheduledTimeframe {
+    pub partition_name: String,
+    pub start: Duration,
+    pub end: Duration,
+}
+
+impl PartialEq for ScheduledTimeframe {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == Ordering::Equal
+    }
+}
+
+impl Eq for ScheduledTimeframe {}
+
+impl Ord for ScheduledTimeframe {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match self.start.cmp(&other.start) {
+            Ordering::Equal => self.end.cmp(&other.end),
+            other => other,
+        }
+    }
+}
+
+impl PartialOrd for ScheduledTimeframe {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
diff --git a/hypervisor/src/hypervisor/scheduler/timeout.rs b/hypervisor/src/hypervisor/scheduler/timeout.rs
new file mode 100644
index 0000000..823c4ce
--- /dev/null
+++ b/hypervisor/src/hypervisor/scheduler/timeout.rs
@@ -0,0 +1,28 @@
+use std::time::{Duration, Instant};
+
+/// A simple object for keeping track of a timeout that starts at some instant
+/// and has a fixed duration. This object also exposes some basic functionality
+/// like querying the remaining time.
+#[derive(Copy, Clone)]
+pub(crate) struct Timeout {
+    start: Instant,
+    stop: Duration,
+}
+
+impl Timeout {
+    pub fn new(start: Instant, stop: Duration) -> Self {
+        Self { start, stop }
+    }
+
+    pub fn remaining_time(&self) -> Duration {
+        self.stop.saturating_sub(self.start.elapsed())
+    }
+
+    pub fn has_time_left(&self) -> bool {
+        self.remaining_time() > Duration::ZERO
+    }
+
+    pub fn total_duration(&self) -> Duration {
+        self.stop
+    }
+}
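
A few illustrative sketches follow; none of them are part of the patch itself. First, the timeframe derivation in `Config::generate_schedule`: each partition contributes one window per period repetition that fits into the major frame, starting at `offset + period * i`. A minimal stand-alone version, where `PartitionCfg` and the tuple result are simplified stand-ins for the crate's real types:

```rust
use std::time::Duration;

struct PartitionCfg {
    name: String,
    period: Duration,
    offset: Duration,
    duration: Duration,
}

fn timeframes(
    major_frame: Duration,
    parts: &[PartitionCfg],
) -> Vec<(Duration, Duration, String)> {
    let mut out = Vec::new();
    for p in parts {
        // Number of whole periods of this partition inside one major frame.
        let repetitions = (major_frame.as_nanos() / p.period.as_nanos()) as u32;
        for i in 0..repetitions {
            let start = p.offset + p.period * i;
            out.push((start, start + p.duration, p.name.clone()));
        }
    }
    out.sort(); // sorted by start time, as PartitionSchedule expects
    out
}

fn main() {
    let parts = [PartitionCfg {
        name: "fuel_tank".into(),
        period: Duration::from_millis(50),
        offset: Duration::ZERO,
        duration: Duration::from_millis(20),
    }];
    // A 100ms major frame holds two 50ms periods, so two windows result.
    let frames = timeframes(Duration::from_millis(100), &parts);
    assert_eq!(frames.len(), 2);
    assert_eq!(frames[1].0, Duration::from_millis(50));
}
```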
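
Second, the overlap check in `PartitionSchedule::from_timeframes`: because the timeframes are sorted first, only neighbouring pairs need to be compared, and windows that merely touch stay legal. The same logic in self-contained form:

```rust
use std::time::Duration;

// Field order matters: deriving Ord here yields "start, then end" ordering,
// which matches the manual Ord impl on ScheduledTimeframe.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Timeframe {
    start: Duration,
    end: Duration,
}

fn validate(mut frames: Vec<Timeframe>) -> Result<Vec<Timeframe>, String> {
    frames.sort();
    // std's windows(2) stands in for itertools' tuple_windows.
    for pair in frames.windows(2) {
        if pair[0].end > pair[1].start {
            return Err(format!("overlap: {:?} and {:?}", pair[0], pair[1]));
        }
    }
    Ok(frames)
}

fn main() {
    let ms = Duration::from_millis;
    // Back-to-back windows are fine: 0..10 ends exactly where 10..20 starts.
    assert!(validate(vec![
        Timeframe { start: ms(0), end: ms(10) },
        Timeframe { start: ms(10), end: ms(20) },
    ])
    .is_ok());
    // 0..15 spills into 10..20, which from_timeframes must reject.
    assert!(validate(vec![
        Timeframe { start: ms(10), end: ms(20) },
        Timeframe { start: ms(0), end: ms(15) },
    ])
    .is_err());
}
```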
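
Third, the shape of `Scheduler::run_major_frame`: sleep until each window's start offset, then give the partition the remainder of its window. `run_partition` below is a hypothetical stand-in for `PartitionTimeframeScheduler::run`, and the partition names are made up:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

struct Timeframe {
    start: Duration,
    end: Duration,
    name: &'static str,
}

fn run_major_frame(frame_start: Instant, schedule: &[Timeframe]) {
    for tf in schedule {
        // Sleep through the gap before this window; saturating_sub keeps the
        // sleep at zero if we are already past the window's start offset.
        sleep(tf.start.saturating_sub(frame_start.elapsed()));
        run_partition(tf, frame_start);
    }
}

fn run_partition(tf: &Timeframe, frame_start: Instant) {
    // Placeholder: the real code hands the partition a Timeout ending at
    // tf.end and drives it via PartitionTimeframeScheduler::run.
    let budget = tf.end.saturating_sub(frame_start.elapsed());
    println!("running {} for up to {budget:?}", tf.name);
}

fn main() {
    let schedule = [
        Timeframe { start: Duration::ZERO, end: Duration::from_millis(20), name: "fuel_tank" },
        Timeframe { start: Duration::from_millis(50), end: Duration::from_millis(70), name: "gps" },
    ];
    run_major_frame(Instant::now(), &schedule);
}
```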
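
Fourth, the new `Timeout` type is easy to exercise in isolation. Its core is copied here from scheduler/timeout.rs, together with a polling loop of the shape the schedulers use:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

// Copied for illustration; the real type lives in
// hypervisor/src/hypervisor/scheduler/timeout.rs.
#[derive(Copy, Clone)]
struct Timeout {
    start: Instant,
    stop: Duration,
}

impl Timeout {
    fn new(start: Instant, stop: Duration) -> Self {
        Self { start, stop }
    }
    fn remaining_time(&self) -> Duration {
        self.stop.saturating_sub(self.start.elapsed())
    }
    fn has_time_left(&self) -> bool {
        self.remaining_time() > Duration::ZERO
    }
}

fn main() {
    // A 50ms budget starting now, as the scheduler sets up per timeframe.
    let timeout = Timeout::new(Instant::now(), Duration::from_millis(50));
    while timeout.has_time_left() {
        // Do a bounded chunk of work, then re-check the budget.
        sleep(Duration::from_millis(10));
    }
    assert_eq!(timeout.remaining_time(), Duration::ZERO);
}
```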
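
Finally, the repeated `modify` calls in `PeriodicPoller::wait_timeout` exist because the `polling` crate arms a descriptor once per `add`/`modify`; without re-arming after an event fires, later wakeups would be lost. A minimal sketch of that oneshot re-arm pattern against a plain UDP socket (this assumes a pre-3.0 `polling` release, where `Poller::add` is safe; the key value 1 is arbitrary):

```rust
use std::net::UdpSocket;
use std::time::Duration;

use polling::{Event, Poller};

fn main() -> std::io::Result<()> {
    let rx = UdpSocket::bind("127.0.0.1:0")?;
    rx.set_nonblocking(true)?;
    let tx = UdpSocket::bind("127.0.0.1:0")?;

    let poller = Poller::new()?;
    poller.add(&rx, Event::readable(1))?;

    tx.send_to(b"ping", rx.local_addr()?)?;

    let mut events = Vec::new();
    poller.wait(&mut events, Some(Duration::from_secs(1)))?;

    for ev in &events {
        if ev.key == 1 {
            // Oneshot semantics: re-arm before the next wait, exactly like
            // PeriodicPoller::wait_timeout does with `modify`.
            poller.modify(&rx, Event::readable(1))?;

            let mut buf = [0u8; 16];
            let (n, _) = rx.recv_from(&mut buf)?;
            println!("got {n} bytes");
        }
    }
    Ok(())
}
```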