From d81a522e49d0f874f853809209239015f95df817 Mon Sep 17 00:00:00 2001 From: James Tomlinson Date: Thu, 29 Feb 2024 10:17:25 +0000 Subject: [PATCH] feat: Several output additions and improvements. (#103) * feat: Several output additions and improvements. - Fixed an issue where metric sets were not finalised at the end of a model run. - Added a new Memory output and associated recorder. This recorder stores data in memory. - Added new functions to the Recorder API to allow accessing an aggregated metric. - Additional work is needed to give access to different levels of aggregation. - Currently only the new Memory recorder supports this API. - The order of the aggregation operations probably needs review. - Changed serialised format of OutputMetric. This will allow it to eventually contain additional information. - Changed serialised format of MetricAggFunc. Again, this will allow definition of functions with additional arguments. - Added CountNonZero aggregation function. Examples of the new serialised format and the aggregated-value API are given in the notes after the patch. --- Cargo.toml | 1 + pywr-core/Cargo.toml | 2 +- pywr-core/src/lib.rs | 6 +- pywr-core/src/models/multi.rs | 3 +- pywr-core/src/models/simple.rs | 18 +- pywr-core/src/network.rs | 34 +- .../{aggregator.rs => aggregator/mod.rs} | 128 +++++- pywr-core/src/recorders/csv.rs | 6 +- pywr-core/src/recorders/hdf.rs | 6 +- pywr-core/src/recorders/memory.rs | 364 ++++++++++++++++++ pywr-core/src/recorders/metric_set.rs | 27 +- pywr-core/src/recorders/mod.rs | 12 +- .../tests/models/aggregated-node1/model.json | 25 +- .../tests/models/piecewise-link1/model.json | 25 +- .../models/simple-custom-parameter/model.json | 15 +- .../simple-storage-timeseries/model.json | 15 +- .../tests/models/simple-timeseries/model.json | 15 +- pywr-schema/Cargo.toml | 1 + pywr-schema/src/data_tables/mod.rs | 2 +- pywr-schema/src/data_tables/scalar.rs | 4 +- pywr-schema/src/metric_sets/mod.rs | 40 +- pywr-schema/src/nodes/core.rs | 7 +- pywr-schema/src/nodes/mod.rs | 1 + pywr-schema/src/outputs/memory.rs | 80 ++++ pywr-schema/src/outputs/mod.rs | 4 + pywr-schema/src/test_models/csv1.json | 5 +- pywr-schema/src/test_models/csv2.json | 17 +- pywr-schema/src/test_models/hdf1.json | 5 +- pywr-schema/src/test_models/memory1.json | 97 +++++ 29 files changed, 892 insertions(+), 73 deletions(-) rename pywr-core/src/recorders/{aggregator.rs => aggregator/mod.rs} (79%) create mode 100644 pywr-core/src/recorders/memory.rs create mode 100644 pywr-schema/src/outputs/memory.rs create mode 100644 pywr-schema/src/test_models/memory1.json
diff --git a/Cargo.toml b/Cargo.toml index 67357778..3b42716f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1.0" thiserror = "1.0.25" num = "0.4.0" +float-cmp = "0.9.0" ndarray = "0.15.3" polars = { version = "0.37.0", features = ["lazy", "rows", "ndarray"] } pyo3-polars = "0.11.1"
diff --git a/pywr-core/Cargo.toml b/pywr-core/Cargo.toml index 169adc18..5bd95f1c 100644 --- a/pywr-core/Cargo.toml +++ b/pywr-core/Cargo.toml @@ -18,7 +18,7 @@ libc = "0.2.97" thiserror = { workspace = true } ndarray = { workspace = true } num = { workspace = true } -float-cmp = "0.9.0" +float-cmp = { workspace = true } hdf5 = { workspace = true } csv = { workspace = true } clp-sys = { path = "../clp-sys", version = "0.1.0" }
diff --git a/pywr-core/src/lib.rs b/pywr-core/src/lib.rs index d51f398f..2b3ed952 100644 --- a/pywr-core/src/lib.rs +++ b/pywr-core/src/lib.rs @@ -6,7 +6,7 @@ use crate::derived_metric::DerivedMetricIndex; use crate::models::MultiNetworkTransferIndex; use
crate::node::NodeIndex; use crate::parameters::{IndexParameterIndex, InterpolationError, MultiValueParameterIndex, ParameterIndex}; -use crate::recorders::{MetricSetIndex, RecorderIndex}; +use crate::recorders::{AggregationError, MetricSetIndex, RecorderIndex}; use crate::virtual_storage::VirtualStorageIndex; use pyo3::exceptions::{PyException, PyRuntimeError}; use pyo3::{create_exception, PyErr}; @@ -123,6 +123,8 @@ pub enum PywrError { InvalidMetricValue(String), #[error("recorder not initialised")] RecorderNotInitialised, + #[error("recorder does not support aggregation")] + RecorderDoesNotSupportAggregation, #[error("hdf5 error: {0}")] HDF5Error(String), #[error("csv error: {0}")] @@ -161,6 +163,8 @@ pub enum PywrError { TimestepRangeGenerationError(String), #[error("Could not create timesteps for frequency '{0}'")] TimestepGenerationError(String), + #[error("aggregation error: {0}")] + Aggregation(#[from] AggregationError), } // Python errors
diff --git a/pywr-core/src/models/multi.rs b/pywr-core/src/models/multi.rs index f3531df3..0b9f083a 100644 --- a/pywr-core/src/models/multi.rs +++ b/pywr-core/src/models/multi.rs @@ -298,8 +298,9 @@ impl MultiNetworkModel { } for (idx, entry) in self.networks.iter().enumerate() { + let sub_model_ms_states = state.states.get_mut(idx).unwrap().all_metric_set_internal_states_mut(); let sub_model_recorder_states = state.recorder_states.get_mut(idx).unwrap(); - entry.network.finalise(sub_model_recorder_states)?; + entry.network.finalise(sub_model_ms_states, sub_model_recorder_states)?; } // End the global timer and print the run statistics timings.finish(count);
diff --git a/pywr-core/src/models/simple.rs b/pywr-core/src/models/simple.rs index 71c04c45..5017f9f1 100644 --- a/pywr-core/src/models/simple.rs +++ b/pywr-core/src/models/simple.rs @@ -22,6 +22,10 @@ impl ModelState { pub fn network_state_mut(&mut self) -> &mut NetworkState { &mut self.state } + + pub fn recorder_state(&self) -> &Vec<Option<Box<dyn Any>>> { + &self.recorder_state + } } /// A standard Pywr model containing a single network. @@ -196,7 +200,7 @@ impl Model { /// Run a model through the given time-steps. /// /// This method will setup state and solvers, and then run the model through the time-steps. - pub fn run<S>(&self, settings: &S::Settings) -> Result<(), PywrError> + pub fn run<S>(&self, settings: &S::Settings) -> Result<Vec<Option<Box<dyn Any>>>, PywrError> where S: Solver, <S as Solver>::Settings: SolverSettings, @@ -205,7 +209,7 @@ self.run_with_state::<S>(&mut state, settings)?; - Ok(()) + Ok(state.recorder_state) } /// Run the model with the provided states and solvers. @@ -243,7 +247,10 @@ impl Model { count += self.domain.scenarios.indices().len(); } - self.network.finalise(&mut state.recorder_state)?; + self.network.finalise( + state.state.all_metric_set_internal_states_mut(), + &mut state.recorder_state, + )?; // End the global timer and print the run statistics timings.finish(count); timings.print_table(); @@ -296,7 +303,10 @@ impl Model { count += self.domain.scenarios.indices().len(); } - self.network.finalise(&mut state.recorder_state)?; + self.network.finalise( + state.state.all_metric_set_internal_states_mut(), + &mut state.recorder_state, + )?; // End the global timer and print the run statistics timings.finish(count);
diff --git a/pywr-core/src/network.rs b/pywr-core/src/network.rs index 74cdead0..13a44a2b 100644 --- a/pywr-core/src/network.rs +++ b/pywr-core/src/network.rs @@ -182,6 +182,10 @@ impl NetworkState { pub fn iter_parameter_states_mut(&mut self) -> IterMut<'_, ParameterStates> { self.parameter_internal_states.iter_mut() } + + pub fn all_metric_set_internal_states_mut(&mut self) -> &mut [Vec<MetricSetState>] { + &mut self.metric_set_internal_states + } } /// A Pywr network containing nodes, edges, parameters, metric sets, etc. @@ -358,10 +362,22 @@ impl Network { S::setup(self, scenario_indices.len(), settings) } - pub fn finalise(&self, recorder_internal_states: &mut [Option<Box<dyn Any>>]) -> Result<(), PywrError> { + pub fn finalise( + &self, + metric_set_states: &mut [Vec<MetricSetState>], + recorder_internal_states: &mut [Option<Box<dyn Any>>], + ) -> Result<(), PywrError> { + // Finalise the metric sets + + for ms_states in metric_set_states.iter_mut() { + for (metric_set, ms_state) in self.metric_sets.iter().zip(ms_states.iter_mut()) { + metric_set.finalise(ms_state); + } + } + // Finalise the recorders for (recorder, internal_state) in self.recorders.iter().zip(recorder_internal_states) { - recorder.finalise(internal_state)?; + recorder.finalise(metric_set_states, internal_state)?; } Ok(()) @@ -1136,6 +1152,20 @@ impl Network { } } + pub fn get_recorder_index_by_name(&self, name: &str) -> Result<RecorderIndex, PywrError> { + match self.recorders.iter().position(|r| r.name() == name) { + Some(idx) => Ok(RecorderIndex::new(idx)), + None => Err(PywrError::RecorderNotFound), + } + } + + pub fn get_aggregated_value(&self, name: &str, recorder_states: &[Option<Box<dyn Any>>]) -> Result<f64, PywrError> { + match self.recorders.iter().enumerate().find(|(_, r)| r.name() == name) { + Some((idx, recorder)) => recorder.aggregated_value(&recorder_states[idx]), + None => Err(PywrError::RecorderNotFound), + } + } + /// Add a new Node::Input to the network. pub fn add_input_node(&mut self, name: &str, sub_name: Option<&str>) -> Result<NodeIndex, PywrError> { // Check for name.
diff --git a/pywr-core/src/recorders/aggregator.rs b/pywr-core/src/recorders/aggregator/mod.rs similarity index 79% rename from pywr-core/src/recorders/aggregator.rs rename to pywr-core/src/recorders/aggregator/mod.rs index 7f5af08f..7f9b9bba 100644 --- a/pywr-core/src/recorders/aggregator.rs +++ b/pywr-core/src/recorders/aggregator/mod.rs @@ -1,10 +1,12 @@ use crate::timestep::PywrDuration; -use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime}; +use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, NaiveTime}; +use std::num::NonZeroUsize; #[derive(Clone, Debug)] pub enum AggregationFrequency { Monthly, Annual, + Days(NonZeroUsize), } impl AggregationFrequency { @@ -12,6 +14,10 @@ match self { Self::Monthly => (period_start.year() == date.year()) && (period_start.month() == date.month()), Self::Annual => period_start.year() == date.year(), + Self::Days(days) => { + let period_end = *period_start + Duration::days(days.get() as i64); + (period_start <= date) && (date < &period_end) + } } } @@ -39,12 +45,13 @@ let date = NaiveDate::from_ymd_opt(current_date.year() + 1, 1, 1).unwrap(); NaiveDateTime::new(date, NaiveTime::default()) } + Self::Days(days) => *current_date + Duration::days(days.get() as i64), } } /// Split the value representing a period into multiple ['PeriodValue'] that do not cross the /// boundary of the given period. - fn split_value_into_periods(&self, value: PeriodValue) -> Vec<PeriodValue> { + fn split_value_into_periods(&self, value: PeriodValue<f64>) -> Vec<PeriodValue<f64>> { let mut sub_values = Vec::new(); let mut current_date = value.start; @@ -78,10 +85,13 @@ pub enum AggregationFunction { Sum, Mean, Min, Max, + CountNonZero, + CountFunc { func: fn(f64) -> bool }, } impl AggregationFunction { - fn calc(&self, values: &[PeriodValue]) -> Option<f64> { + /// Calculate the aggregation of the given values. + pub fn calc_period_values(&self, values: &[PeriodValue<f64>]) -> Option<f64> { match self { AggregationFunction::Sum => Some(values.iter().map(|v| v.value * v.duration.fractional_days()).sum()), AggregationFunction::Mean => { @@ -102,22 +112,67 @@ a.partial_cmp(b) .expect("Failed to calculate maximum of values containing a NaN.") }), + AggregationFunction::CountNonZero => { + let count = values.iter().filter(|v| v.value != 0.0).count(); + Some(count as f64) + } + AggregationFunction::CountFunc { func } => { + let count = values.iter().filter(|v| func(v.value)).count(); + Some(count as f64) + } + } + } + + pub fn calc_f64(&self, values: &[f64]) -> Option<f64> { + match self { + AggregationFunction::Sum => Some(values.iter().sum()), + AggregationFunction::Mean => { + let ndays: i64 = values.len() as i64; + if ndays == 0 { + None + } else { + let sum: f64 = values.iter().sum(); + Some(sum / ndays as f64) + } + } + AggregationFunction::Min => values + .iter() + .min_by(|a, b| { + a.partial_cmp(b) + .expect("Failed to calculate minimum of values containing a NaN.") + }) + .copied(), + AggregationFunction::Max => values + .iter() + .max_by(|a, b| { + a.partial_cmp(b) + .expect("Failed to calculate maximum of values containing a NaN.") + }) + .copied(), + AggregationFunction::CountNonZero => { + let count = values.iter().filter(|v| **v != 0.0).count(); + Some(count as f64) + } + AggregationFunction::CountFunc { func } => { + let count = values.iter().filter(|v| func(**v)).count(); + Some(count as f64) + } } } } #[derive(Default, Debug, Clone)] struct PeriodicAggregatorState { - current_values: Option<Vec<PeriodValue>>, + current_values: Option<Vec<PeriodValue<f64>>>, } impl PeriodicAggregatorState { fn process_value( &mut self, - value: PeriodValue, + value: PeriodValue<f64>, agg_freq: &AggregationFrequency, agg_func: &AggregationFunction, - ) -> Option<PeriodValue> { + ) -> Option<PeriodValue<f64>> { if let Some(current_values) = self.current_values.as_mut() { // SAFETY: The current_values vector is guaranteed to contain at least one value. let current_period_start = current_values .first() @@ -135,7 +190,7 @@ // New value is part of a different period (assume the next one). // Calculate the aggregated value of the previous period. - let agg_period = if let Some(agg_value) = agg_func.calc(current_values) { + let agg_period = if let Some(agg_value) = agg_func.calc_period_values(current_values) { let agg_duration = value.start - current_period_start; Some(PeriodValue::new(current_period_start, agg_duration.into(), agg_value)) } else { @@ -157,7 +212,7 @@ } } - fn process_value_no_period(&mut self, value: PeriodValue) { + fn process_value_no_period(&mut self, value: PeriodValue<f64>) { if let Some(current_values) = self.current_values.as_mut() { current_values.push(value); } else { @@ -165,9 +220,9 @@ } - fn calc_aggregation(&self, agg_func: &AggregationFunction) -> Option<PeriodValue> { + fn calc_aggregation(&self, agg_func: &AggregationFunction) -> Option<PeriodValue<f64>> { if let Some(current_values) = &self.current_values { - if let Some(agg_value) = agg_func.calc(current_values) { + if let Some(agg_value) = agg_func.calc_period_values(current_values) { // SAFETY: The current_values vector is guaranteed to contain at least one value.
let current_period_start = current_values .first() @@ -200,14 +255,43 @@ struct PeriodicAggregator { } #[derive(Debug, Copy, Clone)] -pub struct PeriodValue { +pub struct PeriodValue<T> { pub start: NaiveDateTime, pub duration: PywrDuration, - pub value: f64, + pub value: T, +} + +impl<T> PeriodValue<T> { + pub fn new(start: NaiveDateTime, duration: PywrDuration, value: T) -> Self { + Self { start, duration, value } + } +} + +impl<T> PeriodValue<Vec<T>> { + pub fn index(&self, index: usize) -> PeriodValue<T> + where + T: Copy, + { + PeriodValue { + start: self.start, + duration: self.duration, + value: self.value[index], + } + } + pub fn len(&self) -> usize { + self.value.len() + } } -impl PeriodValue { - pub fn new(start: NaiveDateTime, duration: PywrDuration, value: f64) -> Self { +impl<T> From<&[PeriodValue<T>]> for PeriodValue<Vec<T>> +where + T: Copy, +{ + fn from(values: &[PeriodValue<T>]) -> Self { + let start = values.first().expect("Empty vector of period values.").start; + let duration = values.last().expect("Empty vector of period values.").duration; + + let value = values.into_iter().map(|v| v.value).collect(); Self { start, duration, value } } } @@ -222,7 +306,11 @@ impl PeriodicAggregator { /// The new value should sequentially follow from the previously processed values. If the /// value completes a new aggregation period then a value representing that aggregation is /// returned. - fn process_value(&self, current_state: &mut PeriodicAggregatorState, value: PeriodValue) -> Option<PeriodValue> { + fn process_value( + &self, + current_state: &mut PeriodicAggregatorState, + value: PeriodValue<f64>, + ) -> Option<PeriodValue<f64>> { // Split the given period into separate periods that align with the aggregation period. let mut agg_value = None; @@ -242,7 +330,7 @@ impl PeriodicAggregator { agg_value } - fn calc_aggregation(&self, state: &PeriodicAggregatorState) -> Option<PeriodValue> { + fn calc_aggregation(&self, state: &PeriodicAggregatorState) -> Option<PeriodValue<f64>> { state.calc_aggregation(&self.function) } } @@ -278,7 +366,7 @@ impl Aggregator { } /// Append a new value to the aggregator. - pub fn append_value(&self, state: &mut AggregatorState, value: PeriodValue) -> Option<PeriodValue> { + pub fn append_value(&self, state: &mut AggregatorState, value: PeriodValue<f64>) -> Option<PeriodValue<f64>> { let agg_value = match (&self.child, state.child.as_mut()) { (Some(child), Some(child_state)) => child.append_value(child_state, value), (None, None) => Some(value), @@ -297,7 +385,7 @@ /// /// This will also compute the final aggregation value from the child aggregators if any exists. /// This includes aggregation calculations over partial or unfinished periods. - pub fn finalise(&self, state: &mut AggregatorState) -> Option<PeriodValue> { + pub fn finalise(&self, state: &mut AggregatorState) -> Option<PeriodValue<f64>> { let final_child_value = match (&self.child, state.child.as_mut()) { (Some(child), Some(child_state)) => child.finalise(child_state), (None, None) => None, @@ -438,10 +526,10 @@ mod tests { ), ]; - let agg_value = AggregationFunction::Mean.calc(values.as_slice()).unwrap(); + let agg_value = AggregationFunction::Mean.calc_period_values(values.as_slice()).unwrap(); assert_approx_eq!(f64, agg_value, 7.0 / 4.0); - let agg_value = AggregationFunction::Sum.calc(values.as_slice()).unwrap(); + let agg_value = AggregationFunction::Sum.calc_period_values(values.as_slice()).unwrap(); let expected = 2.0 * (1.0 / 24.0) + 1.0 * (2.0 / 24.0) + 3.0 * (1.0 / 24.0); assert_approx_eq!(f64, agg_value, expected); }
diff --git a/pywr-core/src/recorders/csv.rs b/pywr-core/src/recorders/csv.rs index 82c24645..37ed7d8e 100644 --- a/pywr-core/src/recorders/csv.rs +++ b/pywr-core/src/recorders/csv.rs @@ -218,7 +218,11 @@ impl Recorder for CSVRecorder { Ok(()) } - fn finalise(&self, internal_state: &mut Option<Box<dyn Any>>) -> Result<(), PywrError> { + fn finalise( + &self, + _metric_set_states: &[Vec<MetricSetState>], + internal_state: &mut Option<Box<dyn Any>>, + ) -> Result<(), PywrError> { // This will leave the internal state with a `None` because we need to take // ownership of the file handle in order to close it. match internal_state.take() {
diff --git a/pywr-core/src/recorders/hdf.rs b/pywr-core/src/recorders/hdf.rs index 602229ae..8781abdd 100644 --- a/pywr-core/src/recorders/hdf.rs +++ b/pywr-core/src/recorders/hdf.rs @@ -179,7 +179,11 @@ impl Recorder for HDF5Recorder { Ok(()) } - fn finalise(&self, internal_state: &mut Option<Box<dyn Any>>) -> Result<(), PywrError> { + fn finalise( + &self, + _metric_set_states: &[Vec<MetricSetState>], + internal_state: &mut Option<Box<dyn Any>>, + ) -> Result<(), PywrError> { // This will leave the internal state with a `None` because we need to take // ownership of the file handle in order to close it. match internal_state.take() {
diff --git a/pywr-core/src/recorders/memory.rs b/pywr-core/src/recorders/memory.rs new file mode 100644 index 00000000..da66c695 --- /dev/null +++ b/pywr-core/src/recorders/memory.rs @@ -0,0 +1,364 @@ +use crate::models::ModelDomain; +use crate::network::Network; +use crate::recorders::aggregator::PeriodValue; +use crate::recorders::{AggregationFunction, MetricSetIndex, MetricSetState, Recorder, RecorderMeta}; +use crate::scenario::ScenarioIndex; +use crate::state::State; +use crate::timestep::Timestep; +use crate::PywrError; +use std::any::Any; +use std::ops::Deref; +use thiserror::Error; +use tracing::warn; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum AggregationError { + #[error("Aggregation function not defined.")] + AggregationFunctionNotDefined, + #[error("Aggregation function failed.")] + AggregationFunctionFailed, +} + +pub struct Aggregation { + scenario: Option<AggregationFunction>, + time: Option<AggregationFunction>, + metric: Option<AggregationFunction>, +} + +impl Aggregation { + pub fn new( + scenario: Option<AggregationFunction>, + time: Option<AggregationFunction>, + metric: Option<AggregationFunction>, + ) -> Self { + Self { scenario, time, metric } + } + + /// Apply the metric aggregation function to the provided data. + /// + /// If there is only one value in the data, the aggregation function is not required. If one + /// is provided, a warning is logged.
+ fn apply_metric_func_period_value( + &self, + values: &PeriodValue<Vec<f64>>, + ) -> Result<PeriodValue<f64>, AggregationError> { + let agg_value = if values.len() == 1 { + if self.metric.is_some() { + warn!("Aggregation function defined for metric, but not used.") + } + *values.value.first().expect("No values found in time series") + } else { + self.metric + .as_ref() + .ok_or(AggregationError::AggregationFunctionNotDefined)? + .calc_f64(&values.value) + .ok_or(AggregationError::AggregationFunctionFailed)? + }; + + Ok(PeriodValue::new(values.start, values.duration, agg_value)) + } + + /// Apply the metric aggregation function to the provided data. + /// + /// If there is only one value in the data, the aggregation function is not required. If one + /// is provided, a warning is logged. + fn apply_metric_func_f64(&self, values: &[f64]) -> Result<f64, AggregationError> { + let agg_value = if values.len() == 1 { + if self.metric.is_some() { + warn!("Aggregation function defined for metric, but not used.") + } + *values.first().expect("No values found in time series") + } else { + self.metric + .as_ref() + .ok_or(AggregationError::AggregationFunctionNotDefined)? + .calc_f64(values) + .ok_or(AggregationError::AggregationFunctionFailed)? + }; + + Ok(agg_value) + } + + /// Apply the scenario aggregation function to the provided data. + /// + /// If there is only one value in the data, the aggregation function is not required. If one + /// is provided, a warning is logged. + fn apply_scenario_func(&self, values: &[f64]) -> Result<f64, AggregationError> { + let agg_value = if values.len() == 1 { + if self.scenario.is_some() { + warn!("Aggregation function defined for scenario, but not used.") + } + *values.first().expect("No values found in time series") + } else { + self.scenario + .as_ref() + .ok_or(AggregationError::AggregationFunctionNotDefined)? + .calc_f64(values) + .ok_or(AggregationError::AggregationFunctionFailed)? + }; + + Ok(agg_value) + } + + /// Apply the time aggregation function to the provided data. + /// + /// If there is only one value in the data, the aggregation function is not required. If one + /// is provided, a warning is logged. + fn apply_time_func(&self, values: &[PeriodValue<f64>]) -> Result<f64, AggregationError> { + let agg_value = if values.len() == 1 { + if self.time.is_some() { + warn!("Aggregation function defined for time, but not used.") + } + values.first().expect("No values found in time series").value + } else { + self.time + .as_ref() + .ok_or(AggregationError::AggregationFunctionNotDefined)? + .calc_period_values(values) + .ok_or(AggregationError::AggregationFunctionFailed)? + }; + + Ok(agg_value) + } +} + +/// Internal state for the memory recorder. +/// +/// This is a 3D array, where the first dimension is the scenario, the second dimension is the time, +/// and the third dimension is the metric. +struct InternalState { + data: Vec<Vec<PeriodValue<Vec<f64>>>>, +} + +impl InternalState { + fn new(num_scenarios: usize) -> Self { + let mut data: Vec<Vec<PeriodValue<Vec<f64>>>> = Vec::with_capacity(num_scenarios); + + for _ in 0..num_scenarios { + // We can't use `Vec::with_capacity` here because we don't know the number of + // periods that will be recorded. + data.push(Vec::new()) + } + + Self { data } + } + + /// Aggregate over the saved data to a single value using the provided aggregation functions. + /// + /// This method will first aggregate over the metrics, then over time, and finally over the scenarios. + fn aggregate_scenario_time_metric(&self, aggregation: &Aggregation) -> Result<f64, AggregationError> { + let scenario_data: Vec<f64> = self + .data + .iter() + .map(|time_data| { + // Aggregate each metric at each time step; + // this results in a time series of aggregated values + let ts: Vec<PeriodValue<f64>> = time_data + .iter() + .map(|metric_data| aggregation.apply_metric_func_period_value(metric_data)) + .collect::<Result<_, _>>()?; + + aggregation.apply_time_func(&ts) + }) + .collect::<Result<_, _>>()?; + + aggregation.apply_scenario_func(&scenario_data) + } + + /// Aggregate over the saved data to a single value using the provided aggregation functions. + /// + /// This method will first aggregate over time, then over the metrics, and finally over the scenarios. + fn aggregate_scenario_metric_time(&self, aggregation: &Aggregation) -> Result<f64, AggregationError> { + let scenario_data: Vec<f64> = self + .data + .iter() + .map(|time_data| { + // We expect the same number of metrics in all the entries + let num_metrics = time_data.first().expect("No metrics found in time data").len(); + + // Aggregate each metric over time first. This requires transposing the saved data. + let metric_ts: Vec<f64> = (0..num_metrics) + // TODO remove the collect allocation; requires `AggregationFunction.calc` to accept an iterator + .map(|metric_idx| time_data.iter().map(|t| t.index(metric_idx)).collect()) + .map(|ts: Vec<PeriodValue<f64>>| aggregation.apply_time_func(&ts)) + .collect::<Result<_, _>>()?; + + // Now aggregate over the metrics + aggregation.apply_metric_func_f64(&metric_ts) + }) + .collect::<Result<_, _>>()?; + + aggregation.apply_scenario_func(&scenario_data) + } +} + +/// A recorder that saves the metric values to memory. +/// +/// This recorder saves data into memory and can be used to provide aggregated data for external +/// analysis. The data is saved in a 3D array, where the first dimension is the scenario, the second +/// dimension is the time, and the third dimension is the metric. +/// +/// Users should be aware that this recorder can consume a large amount of memory if the number of +/// scenarios, time steps, and metrics is large. +pub struct MemoryRecorder { + meta: RecorderMeta, + metric_set_idx: MetricSetIndex, + aggregation: Aggregation, +} + +impl MemoryRecorder { + pub fn new(name: &str, metric_set_idx: MetricSetIndex, aggregation: Aggregation) -> Self { + Self { + meta: RecorderMeta::new(name), + metric_set_idx, + aggregation, + } + } +} + +impl Recorder for MemoryRecorder { + fn meta(&self) -> &RecorderMeta { + &self.meta + } + + fn setup(&self, domain: &ModelDomain, _network: &Network) -> Result<Option<Box<dyn Any>>, PywrError> { + let data = InternalState::new(domain.scenarios().len()); + + Ok(Some(Box::new(data))) + } + + fn save( + &self, + _timestep: &Timestep, + _scenario_indices: &[ScenarioIndex], + _model: &Network, + _state: &[State], + metric_set_states: &[Vec<MetricSetState>], + internal_state: &mut Option<Box<dyn Any>>, + ) -> Result<(), PywrError> { + let internal_state = match internal_state { + Some(internal) => match internal.downcast_mut::<InternalState>() { + Some(pa) => pa, + None => panic!("Internal state did not downcast to the correct type! :("), + }, + None => panic!("No internal state defined when one was expected! :("), + }; + + // Iterate through all of the scenarios' states + for (ms_scenario_states, scenario_data) in metric_set_states.iter().zip(internal_state.data.iter_mut()) { + let metric_set_state = ms_scenario_states + .get(*self.metric_set_idx.deref()) + .ok_or(PywrError::MetricSetIndexNotFound(self.metric_set_idx))?; + + if let Some(current_values) = metric_set_state.current_values() { + scenario_data.push(current_values.into()); + } + } + + Ok(()) + } + + fn finalise( + &self, + metric_set_states: &[Vec<MetricSetState>], + internal_state: &mut Option<Box<dyn Any>>, + ) -> Result<(), PywrError> { + let internal_state = match internal_state { + Some(internal) => match internal.downcast_mut::<InternalState>() { + Some(pa) => pa, + None => panic!("Internal state did not downcast to the correct type! :("), + }, + None => panic!("No internal state defined when one was expected! :("), + }; + + // Iterate through all of the scenarios' states
 for (ms_scenario_states, scenario_data) in metric_set_states.iter().zip(internal_state.data.iter_mut()) { + let metric_set_state = ms_scenario_states + .get(*self.metric_set_idx.deref()) + .ok_or(PywrError::MetricSetIndexNotFound(self.metric_set_idx))?; + + if let Some(current_values) = metric_set_state.current_values() { + scenario_data.push(current_values.into()); + } + } + + Ok(()) + } + + /// Aggregate the saved data to a single value using the provided aggregation functions. + /// + /// This method will first aggregate over the metrics, then over time, and finally over the scenarios. + fn aggregated_value(&self, internal_state: &Option<Box<dyn Any>>) -> Result<f64, PywrError> { + let internal_state = match internal_state { + Some(internal) => match internal.downcast_ref::<InternalState>() { + Some(pa) => pa, + None => panic!("Internal state did not downcast to the correct type! :("), + }, + None => panic!("No internal state defined when one was expected! :("), + }; + + // TODO allow the user to choose the order of aggregation + let agg_value = internal_state.aggregate_scenario_time_metric(&self.aggregation)?; + Ok(agg_value) + } +} + +#[cfg(test)] +mod tests { + use super::{Aggregation, InternalState}; + use crate::recorders::aggregator::PeriodValue; + use crate::recorders::AggregationFunction; + use crate::test_utils::default_timestepper; + use crate::timestep::TimeDomain; + use float_cmp::assert_approx_eq; + use rand::{Rng, SeedableRng}; + use rand_chacha::ChaCha8Rng; + use rand_distr::Normal; + + #[test] + fn test_aggregation_orders() { + let num_scenarios = 2; + let num_metrics = 3; + let mut state = InternalState::new(num_scenarios); + + let mut rng = ChaCha8Rng::seed_from_u64(0); + let dist: Normal<f64> = Normal::new(0.0, 1.0).unwrap(); + + let time_domain: TimeDomain = default_timestepper().try_into().unwrap(); + // The expected values from this test + let mut count_non_zero_max = 0.0; + let mut count_non_zero_by_metric = vec![0.0; num_metrics]; + + time_domain.timesteps().iter().for_each(|timestep| { + state.data.iter_mut().for_each(|scenario_data| { + let metric_data = (&mut rng).sample_iter(&dist).take(num_metrics).collect::<Vec<f64>>(); + + // Compute the expected values + if metric_data.iter().sum::<f64>() > 0.0 { + count_non_zero_max += 1.0; + } + // ...
and by metric + metric_data.iter().enumerate().for_each(|(i, v)| { + if *v > 0.0 { + count_non_zero_by_metric[i] += 1.0; + } + }); + + let metric_data = PeriodValue::new(timestep.date, timestep.duration, metric_data); + + scenario_data.push(metric_data); + }); + }); + + let agg = Aggregation::new( + Some(AggregationFunction::Sum), + Some(AggregationFunction::CountFunc { func: |v: f64| v > 0.0 }), + Some(AggregationFunction::Sum), + ); + let agg_value = state.aggregate_scenario_time_metric(&agg).expect("Aggregation failed"); + assert_approx_eq!(f64, agg_value, count_non_zero_max); + + let agg_value = state.aggregate_scenario_metric_time(&agg).expect("Aggregation failed"); + assert_approx_eq!(f64, agg_value, count_non_zero_by_metric.iter().sum()); + } +}
diff --git a/pywr-core/src/recorders/metric_set.rs b/pywr-core/src/recorders/metric_set.rs index 8ba3a451..5b72ae30 100644 --- a/pywr-core/src/recorders/metric_set.rs +++ b/pywr-core/src/recorders/metric_set.rs @@ -5,6 +5,7 @@ use crate::scenario::ScenarioIndex; use crate::state::State; use crate::timestep::Timestep; use crate::PywrError; +use core::f64; use std::fmt; use std::fmt::{Display, Formatter}; use std::ops::Deref; @@ -36,13 +37,13 @@ impl Display for MetricSetIndex { #[derive(Debug, Clone)] pub struct MetricSetState { // Populated with any yielded values from the last processing. - current_values: Option<Vec<PeriodValue>>, + current_values: Option<Vec<PeriodValue<f64>>>, // If the metric set aggregates then this state tracks the aggregation of each metric aggregation_states: Option<Vec<AggregatorState>>, } impl MetricSetState { - pub fn current_values(&self) -> Option<&[PeriodValue]> { + pub fn current_values(&self) -> Option<&[PeriodValue<f64>]> { self.current_values.as_deref() } } @@ -92,12 +93,12 @@ impl MetricSet { internal_state: &mut MetricSetState, ) -> Result<(), PywrError> { // Combine all the values for metric across all of the scenarios - let values: Vec<PeriodValue> = self + let values: Vec<PeriodValue<f64>> = self .metrics .iter() .map(|metric| { let value = metric.get_value(model, state)?; - Ok::<PeriodValue, PywrError>(PeriodValue::new(timestep.date, timestep.duration, value)) + Ok::<PeriodValue<f64>, PywrError>(PeriodValue::new(timestep.date, timestep.duration, value)) }) .collect::<Result<Vec<_>, _>>()?; @@ -123,4 +124,22 @@ Ok(()) } + + pub fn finalise(&self, internal_state: &mut MetricSetState) { + if let Some(aggregator) = &self.aggregator { + let aggregation_states = internal_state + .aggregation_states + .as_mut() + .expect("Aggregation state expected for metric set with aggregator!"); + + let final_values = aggregation_states + .iter_mut() + .map(|current_state| aggregator.finalise(current_state)) + .collect::<Option<Vec<_>>>(); + + internal_state.current_values = final_values; + } else { + internal_state.current_values = None; + } + } }
diff --git a/pywr-core/src/recorders/mod.rs b/pywr-core/src/recorders/mod.rs index 90f221b8..47709640 100644 --- a/pywr-core/src/recorders/mod.rs +++ b/pywr-core/src/recorders/mod.rs @@ -1,6 +1,7 @@ mod aggregator; mod csv; mod hdf; +mod memory; mod metric_set; mod py; @@ -15,6 +16,7 @@ use crate::PywrError; pub use aggregator::{AggregationFrequency, AggregationFunction, Aggregator}; use float_cmp::{approx_eq, ApproxEq, F64Margin}; pub use hdf::HDF5Recorder; +pub use memory::{Aggregation, AggregationError, MemoryRecorder}; pub use metric_set::{MetricSet, MetricSetIndex, MetricSetState}; use ndarray::prelude::*; use ndarray::Array2; @@ -83,9 +85,17 @@ ) -> Result<(), PywrError> { Ok(()) } - fn finalise(&self, _internal_state: &mut Option<Box<dyn Any>>) -> Result<(), PywrError> { + fn finalise( + &self, + _metric_set_states: &[Vec<MetricSetState>], + _internal_state: &mut Option<Box<dyn Any>>, + ) -> Result<(), PywrError> { Ok(()) } + + fn aggregated_value(&self, _internal_state: &Option<Box<dyn Any>>) -> Result<f64, PywrError> { + Err(PywrError::RecorderDoesNotSupportAggregation) + } } pub struct Array2Recorder {
diff --git a/pywr-python/tests/models/aggregated-node1/model.json b/pywr-python/tests/models/aggregated-node1/model.json index 0f650a5d..4c69a449 100644 --- a/pywr-python/tests/models/aggregated-node1/model.json +++ b/pywr-python/tests/models/aggregated-node1/model.json @@ -87,11 +87,26 @@ { "name": "nodes", "metrics": [ - "input1", - "link1", - "link2", - "output1", - "agg-node" + { + "type": "Default", + "node": "input1" + }, + { + "type": "Default", + "node": "link1" + }, + { + "type": "Default", + "node": "link2" + }, + { + "type": "Default", + "node": "output1" + }, + { + "type": "Default", + "node": "agg-node" + } ] } ],
diff --git a/pywr-python/tests/models/piecewise-link1/model.json b/pywr-python/tests/models/piecewise-link1/model.json index d058c164..8b067e54 100644 --- a/pywr-python/tests/models/piecewise-link1/model.json +++ b/pywr-python/tests/models/piecewise-link1/model.json @@ -92,11 +92,26 @@ { "name": "nodes", "metrics": [ - "input1", - "link1", - "mrf1", - "demand1", - "term1" + { + "type": "Default", + "node": "input1" + }, + { + "type": "Default", + "node": "link1" + }, + { + "type": "Default", + "node": "mrf1" + }, + { + "type": "Default", + "node": "demand1" + }, + { + "type": "Default", + "node": "term1" + } ] } ],
diff --git a/pywr-python/tests/models/simple-custom-parameter/model.json b/pywr-python/tests/models/simple-custom-parameter/model.json index a99c2e44..5b709a1c 100644 --- a/pywr-python/tests/models/simple-custom-parameter/model.json +++ b/pywr-python/tests/models/simple-custom-parameter/model.json @@ -71,9 +71,18 @@ { "name": "nodes", "metrics": [ - "input1", - "link1", - "output1" + { + "type": "Default", + "node": "input1" + }, + { + "type": "Default", + "node": "link1" + }, + { + "type": "Default", + "node": "output1" + } ] } ],
diff --git a/pywr-python/tests/models/simple-storage-timeseries/model.json b/pywr-python/tests/models/simple-storage-timeseries/model.json index 3599dcc3..ac0f6201 100644 --- a/pywr-python/tests/models/simple-storage-timeseries/model.json +++ b/pywr-python/tests/models/simple-storage-timeseries/model.json @@ -54,9 +54,18 @@ { "name": "nodes", "metrics": [ - "input1", - "storage1", - "output1" + { + "type": "Default", + "node": "input1" + }, + { + "type": "Default", + "node": "storage1" + }, + { + "type": "Default", + "node": "output1" + } ] } ],
diff --git a/pywr-python/tests/models/simple-timeseries/model.json b/pywr-python/tests/models/simple-timeseries/model.json index ec98f54e..0a992c1e 100644 --- a/pywr-python/tests/models/simple-timeseries/model.json +++ b/pywr-python/tests/models/simple-timeseries/model.json @@ -64,9 +64,18 @@ { "name": "nodes", "metrics": [ - "input1", - "link1", - "output1" + { + "type": "Default", + "node": "input1" + }, + { + "type": "Default", + "node": "link1" + }, + { + "type": "Default", + "node": "output1" + } ] } ],
diff --git a/pywr-schema/Cargo.toml b/pywr-schema/Cargo.toml index 5555b2fb..40428a82 100644 --- a/pywr-schema/Cargo.toml +++ b/pywr-schema/Cargo.toml @@ -25,6 +25,7 @@ hdf5 = { workspace = true } csv = { workspace = true } tracing = { workspace = true } num = { workspace = true } +float-cmp = { workspace = true } ndarray = { workspace = true } serde = { workspace = true } serde_json = { workspace = true }
diff --git a/pywr-schema/src/data_tables/mod.rs b/pywr-schema/src/data_tables/mod.rs index b373b0b3..1bdd42fd 100644 --- a/pywr-schema/src/data_tables/mod.rs +++ b/pywr-schema/src/data_tables/mod.rs @@ -130,7 +130,7 @@ pub enum TableError { TableNotFound(String), #[error("entry not found")] EntryNotFound, - #[error("wrong key size; expected: {0}; given: {0}")] + #[error("wrong key size; expected: {0}; given: {1}")] WrongKeySize(usize, usize), #[error("failed to get or parse key")] KeyParse,
diff --git a/pywr-schema/src/data_tables/scalar.rs b/pywr-schema/src/data_tables/scalar.rs index 3a3a2358..2f0bc643 100644 --- a/pywr-schema/src/data_tables/scalar.rs +++ b/pywr-schema/src/data_tables/scalar.rs @@ -66,12 +66,12 @@ where T: Copy, { fn get_scalar(&self, index: &[&str]) -> Result<T, TableError> { - if index.len() == 3 { + if index.len() == 2 { // I think this copies the strings and is not very efficient. let k = (index[0].to_string(), index[1].to_string()); self.values.get(&k).ok_or(TableError::EntryNotFound).copied() } else { - Err(TableError::WrongKeySize(3, index.len())) + Err(TableError::WrongKeySize(2, index.len())) } } }
diff --git a/pywr-schema/src/metric_sets/mod.rs b/pywr-schema/src/metric_sets/mod.rs index 7a9a294a..1fc8ab16 100644 --- a/pywr-schema/src/metric_sets/mod.rs +++ b/pywr-schema/src/metric_sets/mod.rs @@ -1,13 +1,23 @@ use crate::error::SchemaError; use crate::model::PywrNetwork; +use crate::nodes::NodeAttribute; use serde::{Deserialize, Serialize}; +use std::num::NonZeroUsize; /// Output metrics that can be recorded from a model run. #[derive(Deserialize, Serialize, Clone)] -#[serde(untagged)] +#[serde(tag = "type")] pub enum OutputMetric { /// Output the default metric for a node. - NodeName(String), + Default { + node: String, + }, + Deficit { + node: String, + }, + Parameter { + name: String, + }, } impl OutputMetric { @@ -17,26 +27,39 @@ schema: &PywrNetwork, ) -> Result<pywr_core::metric::Metric, SchemaError> { match self { - OutputMetric::NodeName(node_name) => { + OutputMetric::Default { node } => { // Get the node from the schema; not the model itself let node = schema - .get_node_by_name(node_name) - .ok_or_else(|| SchemaError::NodeNotFound(node_name.to_string()))?; + .get_node_by_name(node) + .ok_or_else(|| SchemaError::NodeNotFound(node.to_string()))?; // Create and return the node's default metric node.create_metric(network, None) } + OutputMetric::Deficit { node } => { + // Get the node from the schema; not the model itself + let node = schema + .get_node_by_name(node) + .ok_or_else(|| SchemaError::NodeNotFound(node.to_string()))?; + // Create and return the metric + node.create_metric(network, Some(NodeAttribute::Deficit)) + } + OutputMetric::Parameter { name } => { + let parameter_idx = network.get_parameter_index_by_name(name)?; + Ok(pywr_core::metric::Metric::ParameterValue(parameter_idx)) + } } } } /// Aggregation function to apply over metric values.
#[derive(serde::Deserialize, serde::Serialize, Debug, Copy, Clone)] -#[serde(rename_all = "lowercase")] +#[serde(tag = "type")] pub enum MetricAggFunc { Sum, Max, Min, Mean, + CountNonZero, } impl From<MetricAggFunc> for pywr_core::recorders::AggregationFunction { @@ -46,15 +69,17 @@ impl From<MetricAggFunc> for pywr_core::recorders::AggregationFunction { MetricAggFunc::Max => pywr_core::recorders::AggregationFunction::Max, MetricAggFunc::Min => pywr_core::recorders::AggregationFunction::Min, MetricAggFunc::Mean => pywr_core::recorders::AggregationFunction::Mean, + MetricAggFunc::CountNonZero => pywr_core::recorders::AggregationFunction::CountNonZero, } } } #[derive(serde::Deserialize, serde::Serialize, Debug, Copy, Clone)] -#[serde(rename_all = "lowercase")] +#[serde(tag = "type")] pub enum MetricAggFrequency { Monthly, Annual, + Days { days: NonZeroUsize }, } impl From<MetricAggFrequency> for pywr_core::recorders::AggregationFrequency { @@ -62,6 +87,7 @@ match value { MetricAggFrequency::Monthly => pywr_core::recorders::AggregationFrequency::Monthly, MetricAggFrequency::Annual => pywr_core::recorders::AggregationFrequency::Annual, + MetricAggFrequency::Days { days } => pywr_core::recorders::AggregationFrequency::Days(days), } } }
diff --git a/pywr-schema/src/nodes/core.rs b/pywr-schema/src/nodes/core.rs index d84bef0b..3f2d3a3f 100644 --- a/pywr-schema/src/nodes/core.rs +++ b/pywr-schema/src/nodes/core.rs @@ -328,7 +328,7 @@ impl OutputNode { pub fn create_metric( &self, - network: &pywr_core::network::Network, + network: &mut pywr_core::network::Network, attribute: Option<NodeAttribute>, ) -> Result<Metric, SchemaError> { // Use the default attribute if none is specified @@ -338,6 +338,11 @@ let metric = match attr { NodeAttribute::Inflow => Metric::NodeInFlow(idx), + NodeAttribute::Deficit => { + let dm = DerivedMetric::NodeInFlowDeficit(idx); + let dm_idx = network.add_derived_metric(dm); + Metric::DerivedMetric(dm_idx) + } _ => { return Err(SchemaError::NodeAttributeNotSupported { ty: "OutputNode".to_string(),
diff --git a/pywr-schema/src/nodes/mod.rs b/pywr-schema/src/nodes/mod.rs index 62e6aa02..f55a712b 100644 --- a/pywr-schema/src/nodes/mod.rs +++ b/pywr-schema/src/nodes/mod.rs @@ -86,6 +86,7 @@ pub enum NodeAttribute { Volume, ProportionalVolume, Loss, + Deficit, } pub struct NodeBuilder {
diff --git a/pywr-schema/src/outputs/memory.rs b/pywr-schema/src/outputs/memory.rs new file mode 100644 index 00000000..12ee13a3 --- /dev/null +++ b/pywr-schema/src/outputs/memory.rs @@ -0,0 +1,80 @@ +use crate::metric_sets::MetricAggFunc; +use crate::SchemaError; +use pywr_core::recorders::MemoryRecorder; + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub struct MemoryAggregation { + time: Option<MetricAggFunc>, + scenario: Option<MetricAggFunc>, + metric: Option<MetricAggFunc>, +} + +impl From<MemoryAggregation> for pywr_core::recorders::Aggregation { + fn from(value: MemoryAggregation) -> Self { + pywr_core::recorders::Aggregation::new( + value.scenario.map(|f| f.into()), + value.time.map(|f| f.into()), + value.metric.map(|f| f.into()), + ) + } +} + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub struct MemoryOutput { + name: String, + metric_set: String, + aggregation: MemoryAggregation, +} + +impl MemoryOutput { + pub fn add_to_model(&self, network: &mut pywr_core::network::Network) -> Result<(), SchemaError> { + let metric_set_idx = network.get_metric_set_index_by_name(&self.metric_set)?; + let recorder = MemoryRecorder::new(&self.name, metric_set_idx, self.aggregation.clone().into()); + +
network.add_recorder(Box::new(recorder))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::PywrModel; + use float_cmp::assert_approx_eq; + use pywr_core::solvers::{ClpSolver, ClpSolverSettings}; + use std::str::FromStr; + use tempfile::TempDir; + + fn memory1_str() -> &'static str { + include_str!("../test_models/memory1.json") + } + + #[test] + fn test_schema() { + let data = memory1_str(); + let schema = PywrModel::from_str(data).unwrap(); + + assert_eq!(schema.network.nodes.len(), 3); + assert_eq!(schema.network.edges.len(), 2); + assert!(schema.network.outputs.is_some_and(|o| o.len() == 1)); + } + + #[test] + fn test_run() { + let data = memory1_str(); + let schema = PywrModel::from_str(data).unwrap(); + + let temp_dir = TempDir::new().unwrap(); + + let model = schema.build_model(None, Some(temp_dir.path())).unwrap(); + + let recorder_states = model.run::<ClpSolver>(&ClpSolverSettings::default()).unwrap(); + + let result = model + .network() + .get_aggregated_value("outputs", &recorder_states) + .expect("No results found"); + + assert_approx_eq!(f64, result, 91.0); + } +}
diff --git a/pywr-schema/src/outputs/mod.rs b/pywr-schema/src/outputs/mod.rs index 3363cb6e..00410f53 100644 --- a/pywr-schema/src/outputs/mod.rs +++ b/pywr-schema/src/outputs/mod.rs @@ -1,9 +1,11 @@ mod csv; mod hdf; +mod memory; pub use self::csv::CsvOutput; use crate::error::SchemaError; pub use hdf::Hdf5Output; +pub use memory::MemoryOutput; use std::path::Path; #[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] @@ -11,6 +13,7 @@ pub enum Output { CSV(CsvOutput), HDF5(Hdf5Output), + Memory(MemoryOutput), } impl Output { @@ -22,6 +25,7 @@ match self { Self::CSV(o) => o.add_to_model(network, output_path), Self::HDF5(o) => o.add_to_model(network, output_path), + Self::Memory(o) => o.add_to_model(network), } } }
diff --git a/pywr-schema/src/test_models/csv1.json b/pywr-schema/src/test_models/csv1.json index d971866a..4e612d2c 100644 --- a/pywr-schema/src/test_models/csv1.json +++ b/pywr-schema/src/test_models/csv1.json @@ -51,7 +51,10 @@ { "name": "nodes", "metrics": [ - "demand1" + { + "type": "Default", + "node": "demand1" + } ] } ],
diff --git a/pywr-schema/src/test_models/csv2.json b/pywr-schema/src/test_models/csv2.json index 728ed6c2..5fd64c8f 100644 --- a/pywr-schema/src/test_models/csv2.json +++ b/pywr-schema/src/test_models/csv2.json @@ -50,12 +50,19 @@ "metric_sets": [ { "name": "nodes", - "aggregator": { - "freq": "monthly", - "func": "mean" - }, + "aggregator": { + "freq": { + "type": "Monthly" + }, + "func": { + "type": "Mean" + } + }, "metrics": [ - "demand1" + { + "type": "Default", + "node": "demand1" + } ] } ],
diff --git a/pywr-schema/src/test_models/hdf1.json b/pywr-schema/src/test_models/hdf1.json index 95a53771..1a906516 100644 --- a/pywr-schema/src/test_models/hdf1.json +++ b/pywr-schema/src/test_models/hdf1.json @@ -51,7 +51,10 @@ { "name": "nodes", "metrics": [ - "demand1" + { + "type": "Default", + "node": "demand1" + } ] } ],
diff --git a/pywr-schema/src/test_models/memory1.json b/pywr-schema/src/test_models/memory1.json new file mode 100644 index 00000000..bc9a6ac3 --- /dev/null +++ b/pywr-schema/src/test_models/memory1.json @@ -0,0 +1,97 @@ +{ + "metadata": { + "title": "Simple 1", + "description": "A very simple example.", + "minimum_version": "0.1" + }, + "timestepper": { + "start": "2015-01-01", + "end": "2015-12-31", + "timestep": 1 + }, + "network": { + "nodes": [ + { + "name": "supply1", + "type": "Input", + "max_flow": 15 + }, + {
"name": "link1", + "type": "Link" + }, + { + "name": "demand1", + "type": "Output", + "max_flow": { + "type": "Parameter", + "name": "demand" + }, + "cost": -10 + } + ], + "edges": [ + { + "from_node": "supply1", + "to_node": "link1" + }, + { + "from_node": "link1", + "to_node": "demand1" + } + ], + "parameters": [ + { + "name": "demand", + "type": "Constant", + "value": 10.0 + } + ], + "metric_sets": [ + { + "name": "nodes", + "aggregator": { + "freq": { + "type": "Annual" + }, + "func": { + "type": "CountNonZero" + }, + "child": { + "freq": { + "type": "Days", + "days": 4 + }, + "func": { + "type": "Min" + } + } + }, + "metrics": [ + { + "type": "Default", + "node": "demand1" + } + ] + } + ], + "outputs": [ + { + "name": "outputs", + "type": "Memory", + "metric_set": "nodes", + "aggregation": { + "time": { + "type": "CountNonZero" + }, + "metrics": { + "type": "Max" + }, + "scenario": { + "type": "Sum" + } + } + } + ] + } +}