feat: Add additional metadata to HDF5 outputs. (#157)
jetuk authored Apr 12, 2024
1 parent d9d6361 commit d6c7d8a
Showing 3 changed files with 101 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pywr-core/src/recorders/csv.rs
@@ -106,8 +106,8 @@ impl Recorder for CsvWideFmtOutput
 
         // This is a vec of vec for each scenario group
         let mut header_scenario_groups = Vec::new();
-        for group_name in domain.scenarios().group_names() {
-            header_scenario_groups.push(vec![format!("scenario-group: {}", group_name)]);
+        for group in domain.scenarios().groups() {
+            header_scenario_groups.push(vec![format!("scenario-group: {}", group.name())]);
         }
 
         for scenario_index in domain.scenarios().indices().iter() {
97 changes: 93 additions & 4 deletions pywr-core/src/recorders/hdf.rs
@@ -2,15 +2,22 @@ use super::{MetricSetState, PywrError, Recorder, RecorderMeta, Timestep};
 use crate::models::ModelDomain;
 use crate::network::Network;
 use crate::recorders::MetricSetIndex;
-use crate::scenario::ScenarioIndex;
+use crate::scenario::{ScenarioDomain, ScenarioIndex};
 use crate::state::State;
 use chrono::{Datelike, Timelike};
 use hdf5::{Extents, Group};
 use ndarray::{s, Array1};
 use std::any::Any;
+use std::ops::Deref;
 use std::path::PathBuf;
+use std::str::FromStr;
 
+/// A recorder that saves model outputs to an HDF5 file.
+///
+/// This recorder saves the model outputs to an HDF5 file. The file will contain a number of groups
+/// and datasets that correspond to the metrics in the metric set. Additionally, the file will
+/// contain metadata about the time steps and scenarios that were used in the model simulation.
+///
 #[derive(Clone, Debug)]
 pub struct HDF5Recorder {
     meta: RecorderMeta,
@@ -69,7 +76,9 @@ impl Recorder for HDF5Recorder {
             Ok(f) => f,
             Err(e) => return Err(PywrError::HDF5Error(e.to_string())),
         };
-        let mut datasets = Vec::new();
 
+        write_pywr_metadata(&file)?;
+        write_scenarios_metadata(&file, domain.scenarios())?;
+
         // Create the time table
         let dates: Array1<_> = domain.time().timesteps().iter().map(DateTime::from_timestamp).collect();
Expand All @@ -83,12 +92,14 @@ impl Recorder for HDF5Recorder {

let metric_set = network.get_metric_set(self.metric_set_idx)?;

let mut datasets = Vec::new();

for metric in metric_set.iter_metrics() {
let name = metric.name(network)?;
let sub_name = metric.sub_name(network)?;
let attribute = metric.attribute();

let ds = require_node_dataset(root_grp, shape, name, sub_name, attribute)?;
let ds = require_metric_dataset(root_grp, shape, name, sub_name, attribute)?;

datasets.push(ds);
}
@@ -162,7 +173,7 @@ fn require_dataset<S: Into<Extents>>(parent: &Group, shape: S, name: &str) -> Re
 }
 
 /// Create a node dataset in /parent/name/sub_name/attribute
-fn require_node_dataset<S: Into<Extents>>(
+fn require_metric_dataset<S: Into<Extents>>(
     parent: &Group,
     shape: S,
     name: &str,
@@ -193,3 +204,81 @@ fn require_group(parent: &Group, name: &str) -> Result<Group, PywrError> {
         }
     }
 }
+
+fn write_pywr_metadata(file: &hdf5::File) -> Result<(), PywrError> {
+    let root = file.deref();
+
+    let grp = require_group(root, "pywr")?;
+
+    // Write the Pywr version as an attribute
+    const VERSION: &str = env!("CARGO_PKG_VERSION");
+    let version = hdf5::types::VarLenUnicode::from_str(VERSION).map_err(|e| PywrError::HDF5Error(e.to_string()))?;
+
+    let attr = grp
+        .new_attr::<hdf5::types::VarLenUnicode>()
+        .shape(())
+        .create("pywr-version")
+        .map_err(|e| PywrError::HDF5Error(e.to_string()))?;
+    attr.as_writer()
+        .write_scalar(&version)
+        .map_err(|e| PywrError::HDF5Error(e.to_string()))?;
+
+    Ok(())
+}
+
+#[derive(hdf5::H5Type, Clone, PartialEq, Debug)]
+#[repr(C)]
+pub struct ScenarioGroupEntry {
+    pub name: hdf5::types::VarLenUnicode,
+    pub size: usize,
+}
+
+#[derive(hdf5::H5Type, Clone, PartialEq, Debug)]
+#[repr(C)]
+pub struct H5ScenarioIndex {
+    index: usize,
+    indices: hdf5::types::VarLenArray<usize>,
+}
+
+/// Write scenario metadata to the HDF5 file.
+///
+/// This function will create the `/scenarios` group in the HDF5 file and write the scenario
+/// groups and indices into `/scenarios/groups` and `/scenarios/indices` respectively.
+fn write_scenarios_metadata(file: &hdf5::File, domain: &ScenarioDomain) -> Result<(), PywrError> {
+    // Create the scenario group and associated datasets
+    let grp = require_group(file.deref(), "scenarios")?;
+
+    let scenario_groups: Array1<ScenarioGroupEntry> = domain
+        .groups()
+        .iter()
+        .map(|s| {
+            let name =
+                hdf5::types::VarLenUnicode::from_str(s.name()).map_err(|e| PywrError::HDF5Error(e.to_string()))?;
+
+            Ok(ScenarioGroupEntry { name, size: s.size() })
+        })
+        .collect::<Result<_, PywrError>>()?;
+
+    if let Err(e) = grp.new_dataset_builder().with_data(&scenario_groups).create("groups") {
+        return Err(PywrError::HDF5Error(e.to_string()));
+    }
+
+    let scenarios: Array1<H5ScenarioIndex> = domain
+        .indices()
+        .iter()
+        .map(|s| {
+            let indices = hdf5::types::VarLenArray::from_slice(&s.indices);
+
+            Ok(H5ScenarioIndex {
+                index: s.index,
+                indices,
+            })
+        })
+        .collect::<Result<_, PywrError>>()?;
+
+    if let Err(e) = grp.new_dataset_builder().with_data(&scenarios).create("indices") {
+        return Err(PywrError::HDF5Error(e.to_string()));
+    }
+
+    Ok(())
+}
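
As a quick sanity check of the new metadata, the sketch below reads the `pywr-version` attribute and the `/scenarios/groups` dataset back with the same `hdf5` crate. This is illustrative only and not part of the commit: the file name `output.h5` is hypothetical, and the local `ScenarioGroupEntry` mirror simply repeats the compound layout written by `write_scenarios_metadata` above.

```rust
// Illustrative reader for the metadata written by this commit (not part of the diff).
use hdf5::types::VarLenUnicode;

// Local mirror of the `ScenarioGroupEntry` compound type defined in hdf.rs above.
#[derive(hdf5::H5Type, Clone, Debug)]
#[repr(C)]
struct ScenarioGroupEntry {
    name: VarLenUnicode,
    size: usize,
}

fn main() -> hdf5::Result<()> {
    // Hypothetical output file produced by a model run with the HDF5 recorder.
    let file = hdf5::File::open("output.h5")?;

    // Version string written by `write_pywr_metadata` as the `pywr-version` attribute.
    let version: VarLenUnicode = file.group("pywr")?.attr("pywr-version")?.read_scalar()?;
    println!("written by pywr {}", version.as_str());

    // Scenario groups written by `write_scenarios_metadata` into `/scenarios/groups`.
    let groups = file.group("scenarios")?.dataset("groups")?.read_1d::<ScenarioGroupEntry>()?;
    for g in groups.iter() {
        println!("scenario group '{}' of size {}", g.name.as_str(), g.size);
    }

    Ok(())
}
```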
15 changes: 6 additions & 9 deletions pywr-core/src/scenario.rs
@@ -112,7 +112,7 @@ impl ScenarioIndex {
 #[derive(Debug)]
 pub struct ScenarioDomain {
     scenario_indices: Vec<ScenarioIndex>,
-    scenario_group_names: Vec<String>,
+    scenario_groups: Vec<ScenarioGroup>,
 }
 
 impl ScenarioDomain {
@@ -131,29 +131,26 @@ impl ScenarioDomain {
 
     /// Return the index of a scenario group by name
     pub fn group_index(&self, name: &str) -> Option<usize> {
-        self.scenario_group_names.iter().position(|n| n == name)
+        self.scenario_groups.iter().position(|g| g.name == name)
     }
 
     /// Return the name of each scenario group
-    pub fn group_names(&self) -> &[String] {
-        &self.scenario_group_names
+    pub fn groups(&self) -> &[ScenarioGroup] {
+        &self.scenario_groups
     }
 }
 
 impl From<ScenarioGroupCollection> for ScenarioDomain {
     fn from(value: ScenarioGroupCollection) -> Self {
         // Handle creating at-least one scenario if the collection is empty.
         if !value.is_empty() {
-            let scenario_group_names = value.groups.iter().map(|g| g.name.clone()).collect();
-
             Self {
                 scenario_indices: value.scenario_indices(),
-                scenario_group_names,
+                scenario_groups: value.groups,
             }
         } else {
             Self {
                 scenario_indices: vec![ScenarioIndex::new(0, vec![0])],
-                scenario_group_names: vec!["default".to_string()],
+                scenario_groups: vec![ScenarioGroup::new("default", 1)],
             }
         }
     }
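
The `else` branch above guarantees that an empty `ScenarioGroupCollection` still yields a single "default" group of size 1, so downstream consumers such as `write_scenarios_metadata` always have at least one group to write. A minimal test sketch of that behaviour follows; it assumes `ScenarioGroupCollection` implements `Default`, while `groups()`, `group_index()`, `indices()` and the `From` impl are taken from the diff.

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_collection_yields_default_group() {
        // Assumes `ScenarioGroupCollection: Default`; everything else is shown in the diff.
        let collection = ScenarioGroupCollection::default();
        let domain: ScenarioDomain = collection.into();

        // One implicit "default" group and a single scenario index.
        assert_eq!(domain.groups().len(), 1);
        assert_eq!(domain.groups()[0].name(), "default");
        assert_eq!(domain.group_index("default"), Some(0));
        assert_eq!(domain.indices().len(), 1);
    }
}
```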
