Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New scheduler structure & other minor refactorings #102

Merged
merged 9 commits into from
Feb 19, 2024
23 changes: 19 additions & 4 deletions examples/fuel_tank_controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

use a653rs::partition;
use a653rs::prelude::PartitionExt;
use a653rs_linux::partition::ApexLogger;
use log::LevelFilter;

use a653rs_linux::partition::ApexLogger;

fn main() {
ApexLogger::install_panic_hook();
ApexLogger::install_logger(LevelFilter::Trace).unwrap();
Expand All @@ -14,20 +15,34 @@ fn main() {

#[partition(a653rs_linux::partition::ApexLinuxPartition)]
mod hello {

use a653rs::prelude::SystemTime;

#[sampling_in(name = "fuel_sensors", msg_size = "10KB", refresh_period = "10s")]
struct FuelSensors;

#[sampling_out(name = "fuel_actuators", msg_size = "10KB")]
struct FuelActuators;

#[start(cold)]
fn cold_start(_ctx: start::Context) {}
fn cold_start(mut ctx: start::Context) {
ctx.create_periodic().unwrap().start().unwrap();
}

#[start(warm)]
fn warm_start(ctx: start::Context) {
cold_start(ctx)
}

#[periodic(
period = "0ms",
time_capacity = "Infinite",
stack_size = "100KB",
base_priority = 1,
deadline = "Soft"
)]
fn periodic(ctx: periodic::Context) {
loop {
// Empty for now
ctx.periodic_wait().unwrap();
}
}
}
29 changes: 13 additions & 16 deletions hypervisor/src/hypervisor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;
use std::time::Duration;

use anyhow::anyhow;
use serde::{Deserialize, Serialize};

use a653rs_linux_core::channel::{QueuingChannelConfig, SamplingChannelConfig};
use a653rs_linux_core::error::{ResultExt, SystemError, TypedResult};
use a653rs_linux_core::health::{ModuleInitHMTable, ModuleRunHMTable, PartitionHMTable};
use anyhow::anyhow;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::hypervisor::scheduler::{PartitionSchedule, ScheduledTimeframe};

/// Main configuration of the hypervisor
#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down Expand Up @@ -190,7 +192,7 @@ pub enum ModuleStates {
}

impl Config {
pub(crate) fn generate_schedule(&self) -> TypedResult<Vec<(Duration, Duration, String)>> {
pub(crate) fn generate_schedule(&self) -> TypedResult<PartitionSchedule> {
// Verify Periods and Major Frame
let lcm_periods = self
.partitions
Expand All @@ -206,27 +208,22 @@ impl Config {
}

// Generate Schedule
let mut s = self
let timeframes = self
.partitions
.iter()
.flat_map(|p| {
let pimf = (self.major_frame.as_nanos() / p.period.as_nanos()) as u32;
(0..pimf).map(|i| {
let start = p.offset + (p.period * i);
(start, start + p.duration, p.name.clone())
ScheduledTimeframe {
start,
end: start + p.duration,
partition_name: p.name.clone(),
}
})
})
.collect::<Vec<_>>();
s.sort_by(|(d1, ..), (d2, ..)| d1.cmp(d2));

// Verify no overlaps
for ((pstart, pend, pname), (nstart, nend, nname)) in s.iter().tuple_windows() {
if pend > nstart {
return Err(anyhow!("Overlapping Partition Windows: {pname} (start: {pstart:?}, end: {pend:?}). {nname} (start: {nstart:?}, end: {nend:?})"))
.typ(SystemError::PartitionConfig);
}
}

Ok(s)
PartitionSchedule::from_timeframes(timeframes).typ(SystemError::PartitionConfig)
}
}
35 changes: 17 additions & 18 deletions hypervisor/src/hypervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use a653rs_linux_core::sampling::Sampling;

use crate::hypervisor::config::{Channel, Config};
use crate::hypervisor::partition::Partition;
use crate::hypervisor::scheduler::Timeout;
use crate::hypervisor::scheduler::{Scheduler, Timeout};

pub mod config;
pub mod partition;
Expand All @@ -28,7 +28,7 @@ pub static SYSTEM_START_TIME: OnceCell<TempFile<Instant>> = OnceCell::new();
pub struct Hypervisor {
cg: CGroup,
major_frame: Duration,
schedule: Vec<(Duration, Duration, String)>,
scheduler: Scheduler,
partitions: HashMap<String, Partition>,
sampling_channel: HashMap<String, Sampling>,
prev_cg: PathBuf,
Expand All @@ -55,7 +55,7 @@ impl Hypervisor {

let mut hv = Self {
cg,
schedule,
scheduler: Scheduler::new(schedule),
major_frame: config.major_frame,
partitions: Default::default(),
prev_cg,
Expand Down Expand Up @@ -122,9 +122,11 @@ impl Hypervisor {
let mut frame_start = Instant::now();

// retain the first frame start as our system's t0
if self.t0.is_none() {
self.t0 = Some(frame_start);
}
let t0 = self.t0.unwrap_or(frame_start);

let terminate_after_timeout = self
.terminate_after
.map(|duration| Timeout::new(t0, duration));

let sys_time = SYSTEM_START_TIME
.get()
Expand All @@ -133,26 +135,23 @@ impl Hypervisor {
sys_time.write(&frame_start).lev(ErrorLevel::ModuleInit)?;
sys_time.seal_read_only().lev(ErrorLevel::ModuleInit)?;
loop {
// if we are not meant to execute any longer, terminate here
match self.terminate_after {
Some(terminate_after) if frame_start - self.t0.unwrap() > terminate_after => {
// terminate hypervisor now if timeout is over
if let Some(timeout) = &terminate_after_timeout {
if !timeout.has_time_left() {
info!(
"quitting, as a run-time of {} was reached",
humantime::Duration::from(terminate_after)
humantime::Duration::from(timeout.total_duration())
);
quit::with_code(0)
}
_ => {}
}

for (target_start, target_stop, partition_name) in &self.schedule {
sleep(target_start.saturating_sub(frame_start.elapsed()));
self.scheduler.run_major_frame(
frame_start,
&mut self.partitions,
&mut self.sampling_channel,
)?;

self.partitions.get_mut(partition_name).unwrap().run(
&mut self.sampling_channel,
Timeout::new(frame_start, *target_stop),
)?;
}
sleep(self.major_frame.saturating_sub(frame_start.elapsed()));

frame_start += self.major_frame;
Expand Down
Loading
Loading