Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add additional metadata to HDF5 outputs. #157

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pywr-core/src/recorders/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ impl Recorder for CsvWideFmtOutput {

// This is a vec of vec for each scenario group
let mut header_scenario_groups = Vec::new();
for group_name in domain.scenarios().group_names() {
header_scenario_groups.push(vec![format!("scenario-group: {}", group_name)]);
for group in domain.scenarios().groups() {
header_scenario_groups.push(vec![format!("scenario-group: {}", group.name())]);
}

for scenario_index in domain.scenarios().indices().iter() {
Expand Down
97 changes: 93 additions & 4 deletions pywr-core/src/recorders/hdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ use super::{MetricSetState, PywrError, Recorder, RecorderMeta, Timestep};
use crate::models::ModelDomain;
use crate::network::Network;
use crate::recorders::MetricSetIndex;
use crate::scenario::ScenarioIndex;
use crate::scenario::{ScenarioDomain, ScenarioIndex};
use crate::state::State;
use chrono::{Datelike, Timelike};
use hdf5::{Extents, Group};
use ndarray::{s, Array1};
use std::any::Any;
use std::ops::Deref;
use std::path::PathBuf;
use std::str::FromStr;

/// A recorder that saves model outputs to an HDF5 file.
///
/// This recorder saves the model outputs to an HDF5 file. The file will contain a number of groups
/// and datasets that correspond to the metrics in the metric set. Additionally, the file will
/// contain metadata about the time steps and scenarios that were used in the model simulation.
///
#[derive(Clone, Debug)]
pub struct HDF5Recorder {
meta: RecorderMeta,
Expand Down Expand Up @@ -69,7 +76,9 @@ impl Recorder for HDF5Recorder {
Ok(f) => f,
Err(e) => return Err(PywrError::HDF5Error(e.to_string())),
};
let mut datasets = Vec::new();

write_pywr_metadata(&file)?;
write_scenarios_metadata(&file, domain.scenarios())?;

// Create the time table
let dates: Array1<_> = domain.time().timesteps().iter().map(DateTime::from_timestamp).collect();
Expand All @@ -83,12 +92,14 @@ impl Recorder for HDF5Recorder {

let metric_set = network.get_metric_set(self.metric_set_idx)?;

let mut datasets = Vec::new();

for metric in metric_set.iter_metrics() {
let name = metric.name(network)?;
let sub_name = metric.sub_name(network)?;
let attribute = metric.attribute();

let ds = require_node_dataset(root_grp, shape, name, sub_name, attribute)?;
let ds = require_metric_dataset(root_grp, shape, name, sub_name, attribute)?;

datasets.push(ds);
}
Expand Down Expand Up @@ -162,7 +173,7 @@ fn require_dataset<S: Into<Extents>>(parent: &Group, shape: S, name: &str) -> Re
}

/// Create a node dataset in /parent/name/sub_name/attribute
fn require_node_dataset<S: Into<Extents>>(
fn require_metric_dataset<S: Into<Extents>>(
parent: &Group,
shape: S,
name: &str,
Expand Down Expand Up @@ -193,3 +204,81 @@ fn require_group(parent: &Group, name: &str) -> Result<Group, PywrError> {
}
}
}

fn write_pywr_metadata(file: &hdf5::File) -> Result<(), PywrError> {
let root = file.deref();

let grp = require_group(root, "pywr")?;

// Write the Pywr version as an attribute
const VERSION: &str = env!("CARGO_PKG_VERSION");
let version = hdf5::types::VarLenUnicode::from_str(VERSION).map_err(|e| PywrError::HDF5Error(e.to_string()))?;

let attr = grp
.new_attr::<hdf5::types::VarLenUnicode>()
.shape(())
.create("pywr-version")
.map_err(|e| PywrError::HDF5Error(e.to_string()))?;
attr.as_writer()
.write_scalar(&version)
.map_err(|e| PywrError::HDF5Error(e.to_string()))?;

Ok(())
}

#[derive(hdf5::H5Type, Clone, PartialEq, Debug)]
#[repr(C)]
pub struct ScenarioGroupEntry {
pub name: hdf5::types::VarLenUnicode,
pub size: usize,
}

#[derive(hdf5::H5Type, Clone, PartialEq, Debug)]
#[repr(C)]
pub struct H5ScenarioIndex {
index: usize,
indices: hdf5::types::VarLenArray<usize>,
}

/// Write scenario metadata to the HDF5 file.
///
/// This function will create the `/scenarios` group in the HDF5 file and write the scenario
/// groups and indices into `/scenarios/groups` and `/scenarios/indices` respectively.
fn write_scenarios_metadata(file: &hdf5::File, domain: &ScenarioDomain) -> Result<(), PywrError> {
// Create the scenario group and associated datasets
let grp = require_group(file.deref(), "scenarios")?;

let scenario_groups: Array1<ScenarioGroupEntry> = domain
.groups()
.iter()
.map(|s| {
let name =
hdf5::types::VarLenUnicode::from_str(s.name()).map_err(|e| PywrError::HDF5Error(e.to_string()))?;

Ok(ScenarioGroupEntry { name, size: s.size() })
})
.collect::<Result<_, PywrError>>()?;

if let Err(e) = grp.new_dataset_builder().with_data(&scenario_groups).create("groups") {
return Err(PywrError::HDF5Error(e.to_string()));
}

let scenarios: Array1<H5ScenarioIndex> = domain
.indices()
.iter()
.map(|s| {
let indices = hdf5::types::VarLenArray::from_slice(&s.indices);

Ok(H5ScenarioIndex {
index: s.index,
indices,
})
})
.collect::<Result<_, PywrError>>()?;

if let Err(e) = grp.new_dataset_builder().with_data(&scenarios).create("indices") {
return Err(PywrError::HDF5Error(e.to_string()));
}

Ok(())
}
15 changes: 6 additions & 9 deletions pywr-core/src/scenario.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl ScenarioIndex {
#[derive(Debug)]
pub struct ScenarioDomain {
scenario_indices: Vec<ScenarioIndex>,
scenario_group_names: Vec<String>,
scenario_groups: Vec<ScenarioGroup>,
}

impl ScenarioDomain {
Expand All @@ -131,29 +131,26 @@ impl ScenarioDomain {

/// Return the index of a scenario group by name
pub fn group_index(&self, name: &str) -> Option<usize> {
self.scenario_group_names.iter().position(|n| n == name)
self.scenario_groups.iter().position(|g| g.name == name)
}

/// Return the name of each scenario group
pub fn group_names(&self) -> &[String] {
&self.scenario_group_names
pub fn groups(&self) -> &[ScenarioGroup] {
&self.scenario_groups
}
}

impl From<ScenarioGroupCollection> for ScenarioDomain {
fn from(value: ScenarioGroupCollection) -> Self {
// Handle creating at-least one scenario if the collection is empty.
if !value.is_empty() {
let scenario_group_names = value.groups.iter().map(|g| g.name.clone()).collect();

Self {
scenario_indices: value.scenario_indices(),
scenario_group_names,
scenario_groups: value.groups,
}
} else {
Self {
scenario_indices: vec![ScenarioIndex::new(0, vec![0])],
scenario_group_names: vec!["default".to_string()],
scenario_groups: vec![ScenarioGroup::new("default", 1)],
}
}
}
Expand Down