Skip to content

Commit

Permalink
feat: Add "long" formatted CSV output. (#150)
Browse files Browse the repository at this point in the history
Add an output to pywr-core for writing long formatted CSV files. This is supported through the existing CSV output type in the schema.
  • Loading branch information
jetuk authored Mar 26, 2024
1 parent e201dad commit ef16934
Show file tree
Hide file tree
Showing 18 changed files with 1,219 additions and 53 deletions.
4 changes: 4 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
* text=auto
*.csv text eol=lf
*.sh text eol=lf
*.bat text eol=crlf
1 change: 1 addition & 0 deletions pywr-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ rand = "0.8.5"
rand_distr = "0.4.3"
rand_chacha = "0.3.1"
dyn-clone = "1.0.16"
serde = { version = "1.0.197", features = ["derive"] }

[dev-dependencies]
criterion = "0.5"
Expand Down
2 changes: 1 addition & 1 deletion pywr-core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl Network {

// Setup recorders
for (recorder, internal_state) in self.recorders.iter().zip(recorder_internal_states) {
recorder.finalise(metric_set_states, internal_state)?;
recorder.finalise(self, metric_set_states, internal_state)?;
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions pywr-core/src/recorders/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ impl<T> PeriodValue<T> {
pub fn new(start: NaiveDateTime, duration: PywrDuration, value: T) -> Self {
Self { start, duration, value }
}

/// The end of the period.
pub fn end(&self) -> NaiveDateTime {
self.duration + self.start
}
}

impl<T> PeriodValue<Vec<T>> {
Expand Down
208 changes: 182 additions & 26 deletions pywr-core/src/recorders/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use crate::network::Network;
use crate::recorders::metric_set::MetricSetIndex;
use crate::scenario::ScenarioIndex;
use crate::state::State;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::fs::File;
use std::ops::Deref;
use std::path::PathBuf;

/// Output the values from a [`MetricSet`] to a CSV file.
#[derive(Clone, Debug)]
pub struct CSVRecorder {
pub struct CsvWideFmtOutput {
meta: RecorderMeta,
filename: PathBuf,
metric_set_idx: MetricSetIndex,
Expand All @@ -21,17 +23,56 @@ struct Internal {
writer: csv::Writer<File>,
}

impl CSVRecorder {
impl CsvWideFmtOutput {
pub fn new<P: Into<PathBuf>>(name: &str, filename: P, metric_set_idx: MetricSetIndex) -> Self {
Self {
meta: RecorderMeta::new(name),
filename: filename.into(),
metric_set_idx,
}
}

fn write_values(
&self,
metric_set_states: &[Vec<MetricSetState>],
internal: &mut Internal,
) -> Result<(), PywrError> {
let mut row = Vec::new();

// Iterate through all scenario's state
for ms_scenario_states in metric_set_states.iter() {
let metric_set_state = ms_scenario_states
.get(*self.metric_set_idx.deref())
.ok_or(PywrError::MetricSetIndexNotFound(self.metric_set_idx))?;

if let Some(current_values) = metric_set_state.current_values() {
let values = current_values
.iter()
.map(|v| format!("{:.2}", v.value))
.collect::<Vec<_>>();

// If the row is empty, add the start time
if row.is_empty() {
row.push(current_values.first().unwrap().start.to_string())
}

row.extend(values);
}
}

// Only write
if row.len() > 1 {
internal
.writer
.write_record(row)
.map_err(|e| PywrError::CSVError(e.to_string()))?;
}

Ok(())
}
}

impl Recorder for CSVRecorder {
impl Recorder for CsvWideFmtOutput {
fn meta(&self) -> &RecorderMeta {
&self.meta
}
Expand Down Expand Up @@ -110,7 +151,7 @@ impl Recorder for CSVRecorder {

fn save(
&self,
timestep: &Timestep,
_timestep: &Timestep,
_scenario_indices: &[ScenarioIndex],
_network: &Network,
_state: &[State],
Expand All @@ -125,44 +166,159 @@ impl Recorder for CSVRecorder {
None => panic!("No internal state defined when one was expected! :("),
};

let mut row = vec![timestep.date.to_string()];

// Iterate through all of the scenario's state
for ms_scenario_states in metric_set_states.iter() {
let metric_set_state = ms_scenario_states
.get(*self.metric_set_idx.deref())
.ok_or(PywrError::MetricSetIndexNotFound(self.metric_set_idx))?;
self.write_values(metric_set_states, internal)?;

if let Some(current_values) = metric_set_state.current_values() {
let values = current_values
.iter()
.map(|v| format!("{:.2}", v.value))
.collect::<Vec<_>>();
Ok(())
}

row.extend(values);
fn finalise(
&self,
_network: &Network,
metric_set_states: &[Vec<MetricSetState>],
internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
// This will leave the internal state with a `None` because we need to take
// ownership of the file handle in order to close it.
match internal_state.take() {
Some(mut internal) => {
if let Some(internal) = internal.downcast_mut::<Internal>() {
self.write_values(metric_set_states, internal)?;
Ok(())
} else {
panic!("Internal state did not downcast to the correct type! :(");
}
}
None => panic!("No internal state defined when one was expected! :("),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CsvLongFmtRecord {
time_start: NaiveDateTime,
time_end: NaiveDateTime,
scenario_index: usize,
metric_set: String,
name: String,
sub_name: String,
attribute: String,
value: f64,
}

/// Output the values from a several [`MetricSet`]s to a CSV file in long format.
///
/// The long format contains a row for each value produced by the metric set. This is useful
/// for analysis in tools like R or Python which can easily read long format data.
///
#[derive(Clone, Debug)]
pub struct CsvLongFmtOutput {
meta: RecorderMeta,
filename: PathBuf,
metric_set_indices: Vec<MetricSetIndex>,
}

impl CsvLongFmtOutput {
pub fn new<P: Into<PathBuf>>(name: &str, filename: P, metric_set_indices: &[MetricSetIndex]) -> Self {
Self {
meta: RecorderMeta::new(name),
filename: filename.into(),
metric_set_indices: metric_set_indices.to_vec(),
}
}

// Only write
if row.len() > 1 {
internal
.writer
.write_record(row)
.map_err(|e| PywrError::CSVError(e.to_string()))?;
fn write_values(
&self,
network: &Network,
metric_set_states: &[Vec<MetricSetState>],
internal: &mut Internal,
) -> Result<(), PywrError> {
// Iterate through all the scenario's state
for (scenario_idx, ms_scenario_states) in metric_set_states.iter().enumerate() {
for metric_set_idx in self.metric_set_indices.iter() {
let metric_set_state = ms_scenario_states
.get(*metric_set_idx.deref())
.ok_or(PywrError::MetricSetIndexNotFound(*metric_set_idx))?;

if let Some(current_values) = metric_set_state.current_values() {
let metric_set = network.get_metric_set(*metric_set_idx)?;

for (metric, value) in metric_set.iter_metrics().zip(current_values.iter()) {
let name = metric.name(network)?.to_string();
let sub_name = metric
.sub_name(network)?
.map_or_else(|| "".to_string(), |s| s.to_string());
let attribute = metric.attribute().to_string();

let record = CsvLongFmtRecord {
time_start: value.start,
time_end: value.end(),
scenario_index: scenario_idx,
metric_set: metric_set.name().to_string(),
name,
sub_name,
attribute,
value: value.value,
};

internal
.writer
.serialize(record)
.map_err(|e| PywrError::CSVError(e.to_string()))?;
}
}
}
}

Ok(())
}
}

impl Recorder for CsvLongFmtOutput {
fn meta(&self) -> &RecorderMeta {
&self.meta
}
fn setup(&self, _domain: &ModelDomain, _network: &Network) -> Result<Option<Box<(dyn Any)>>, PywrError> {
let writer = csv::Writer::from_path(&self.filename).map_err(|e| PywrError::CSVError(e.to_string()))?;

let internal = Internal { writer };

Ok(Some(Box::new(internal)))
}

fn save(
&self,
_timestep: &Timestep,
_scenario_indices: &[ScenarioIndex],
network: &Network,
_state: &[State],
metric_set_states: &[Vec<MetricSetState>],
internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
let internal = match internal_state {
Some(internal) => match internal.downcast_mut::<Internal>() {
Some(pa) => pa,
None => panic!("Internal state did not downcast to the correct type! :("),
},
None => panic!("No internal state defined when one was expected! :("),
};

self.write_values(network, metric_set_states, internal)?;

Ok(())
}

fn finalise(
&self,
_metric_set_states: &[Vec<MetricSetState>],
network: &Network,
metric_set_states: &[Vec<MetricSetState>],
internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
// This will leave the internal state with a `None` because we need to take
// ownership of the file handle in order to close it.
match internal_state.take() {
Some(internal) => {
if let Ok(_internal) = internal.downcast::<Internal>() {
Some(mut internal) => {
if let Some(internal) = internal.downcast_mut::<Internal>() {
self.write_values(network, metric_set_states, internal)?;
Ok(())
} else {
panic!("Internal state did not downcast to the correct type! :(");
Expand Down
1 change: 1 addition & 0 deletions pywr-core/src/recorders/hdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl Recorder for HDF5Recorder {

fn finalise(
&self,
_network: &Network,
_metric_set_states: &[Vec<MetricSetState>],
internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
Expand Down
1 change: 1 addition & 0 deletions pywr-core/src/recorders/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ impl Recorder for MemoryRecorder {

fn finalise(
&self,
_network: &Network,
metric_set_states: &[Vec<MetricSetState>],
internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
Expand Down
3 changes: 2 additions & 1 deletion pywr-core/src/recorders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ mod memory;
mod metric_set;
mod py;

pub use self::csv::CSVRecorder;
use crate::metric::{MetricF64, MetricUsize};
use crate::models::ModelDomain;
use crate::network::Network;
Expand All @@ -14,6 +13,7 @@ use crate::state::State;
use crate::timestep::Timestep;
use crate::PywrError;
pub use aggregator::{AggregationFrequency, AggregationFunction, Aggregator};
pub use csv::{CsvLongFmtOutput, CsvLongFmtRecord, CsvWideFmtOutput};
use float_cmp::{approx_eq, ApproxEq, F64Margin};
pub use hdf::HDF5Recorder;
pub use memory::{Aggregation, AggregationError, MemoryRecorder};
Expand Down Expand Up @@ -87,6 +87,7 @@ pub trait Recorder: Send + Sync {
}
fn finalise(
&self,
_network: &Network,
_metric_set_states: &[Vec<MetricSetState>],
_internal_state: &mut Option<Box<dyn Any>>,
) -> Result<(), PywrError> {
Expand Down
4 changes: 2 additions & 2 deletions pywr-schema/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ pub enum SchemaError {
PythonError(String),
#[error("hdf5 error: {0}")]
HDF5Error(String),
#[error("csv error: {0}")]
CSVError(String),
#[error("Missing metric set: {0}")]
MissingMetricSet(String),
#[error("unexpected parameter type: {0}")]
UnexpectedParameterType(String),
#[error("mismatch in the length of data provided. expected: {expected}, found: {found}")]
Expand Down
Loading

0 comments on commit ef16934

Please sign in to comment.