diff --git a/pywr-core/src/models/mod.rs b/pywr-core/src/models/mod.rs index ed6ebb3b..a2140206 100644 --- a/pywr-core/src/models/mod.rs +++ b/pywr-core/src/models/mod.rs @@ -7,6 +7,7 @@ use crate::PywrError; pub use multi::{MultiNetworkModel, MultiNetworkTransferIndex}; pub use simple::{Model, ModelState}; +#[derive(Debug)] pub struct ModelDomain { time: TimeDomain, scenarios: ScenarioDomain, diff --git a/pywr-core/src/recorders/metric_set.rs b/pywr-core/src/recorders/metric_set.rs index aeee6b07..39de4afe 100644 --- a/pywr-core/src/recorders/metric_set.rs +++ b/pywr-core/src/recorders/metric_set.rs @@ -109,11 +109,27 @@ impl MetricSet { .as_mut() .expect("Aggregation state expected for metric set with aggregator!"); - let agg_values = values - .into_iter() - .zip(aggregation_states.iter_mut()) - .map(|(value, current_state)| aggregator.append_value(current_state, value)) - .collect::<Option<Vec<f64>>>(); + // Collect any aggregated values. This will remain empty if the aggregator yields + // no values. However, if there are values we expect the same number of aggregated + // values as the input values / metrics. + let mut agg_values = Vec::with_capacity(values.len()); + // Use a for loop instead of an iterator because we need to execute the + // `append_value` method on all aggregators. + for (value, current_state) in values.iter().zip(aggregation_states.iter_mut()) { + if let Some(agg_value) = aggregator.append_value(current_state, *value) { + agg_values.push(agg_value); + } + } + + let agg_values = if agg_values.is_empty() { + None + } else if agg_values.len() == values.len() { + Some(agg_values) + } else { + // This should never happen because the aggregator should either yield no values + // or the same number of values as the input metrics. + unreachable!("Some values were aggregated and some were not!"); + }; internal_state.current_values = agg_values; } else { diff --git a/pywr-core/src/scenario.rs b/pywr-core/src/scenario.rs index 4a48a760..698cac12 100644 --- a/pywr-core/src/scenario.rs +++ b/pywr-core/src/scenario.rs @@ -31,6 +31,10 @@ pub struct ScenarioGroupCollection { } impl ScenarioGroupCollection { + pub fn new(groups: Vec<ScenarioGroup>) -> Self { + Self { groups } + } + /// Number of [`ScenarioGroup`]s in the collection. pub fn len(&self) -> usize { self.groups.len() } @@ -105,6 +109,7 @@ impl ScenarioIndex { } } +#[derive(Debug)] pub struct ScenarioDomain { scenario_indices: Vec<ScenarioIndex>, scenario_group_names: Vec<String>, diff --git a/pywr-core/src/timestep.rs b/pywr-core/src/timestep.rs index ccaf96b7..55adf34d 100644 --- a/pywr-core/src/timestep.rs +++ b/pywr-core/src/timestep.rs @@ -54,10 +54,15 @@ impl PywrDuration { } } - // Returns the fractional number of days in the duration. + /// Returns the fractional number of days in the duration. pub fn fractional_days(&self) -> f64 { self.0.num_seconds() as f64 / SECS_IN_DAY as f64 } + + /// Returns the number of nanoseconds in the duration. + pub fn whole_nanoseconds(&self) -> Option<i64> { + self.0.num_nanoseconds() + } } type TimestepIndex = usize; @@ -187,6 +192,7 @@ impl Timestepper { } /// The time domain that a model will be simulated over.
+#[derive(Debug)] pub struct TimeDomain { timesteps: Vec<Timestep>, } @@ -209,6 +215,14 @@ impl TimeDomain { self.timesteps.len() } + pub fn first_timestep(&self) -> &Timestep { + self.timesteps.first().expect("No time-steps defined.") + } + + pub fn last_timestep(&self) -> &Timestep { + self.timesteps.last().expect("No time-steps defined.") + } + pub fn is_empty(&self) -> bool { self.timesteps.is_empty() } diff --git a/pywr-python/tests/models/aggregated-node1/model.json b/pywr-python/tests/models/aggregated-node1/model.json index 4c69a449..1ddd2c59 100644 --- a/pywr-python/tests/models/aggregated-node1/model.json +++ b/pywr-python/tests/models/aggregated-node1/model.json @@ -13,8 +13,12 @@ "name": "input1", "type": "Input", "max_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } } }, { @@ -69,17 +73,14 @@ "name": "demand", "type": "Constant", "value": 10.0 - }, + } + ], + "timeseries": [ { "name": "inflow", - "type": "DataFrame", - "url": "inflow.csv", - "pandas_kwargs": { - "index_col": 0 - }, - "columns": { - "type": "Column", - "name": "inflow" + "provider": { + "type": "Polars", + "url": "inflow.csv" } } ], diff --git a/pywr-python/tests/models/piecewise-link1/model.json b/pywr-python/tests/models/piecewise-link1/model.json index 8b067e54..94f0d359 100644 --- a/pywr-python/tests/models/piecewise-link1/model.json +++ b/pywr-python/tests/models/piecewise-link1/model.json @@ -13,12 +13,20 @@ "name": "input1", "type": "Input", "max_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } }, "min_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } } }, { @@ -74,17 +82,14 @@ "name": "demand", "type": "Constant", "value": 10.0 - }, + } + ], + "timeseries": [ { "name": "inflow", - "type": "DataFrame", - "url": "inflow.csv", - "pandas_kwargs": { - "index_col": 0 - }, - "columns": { - "type": "Column", - "name": "inflow" + "provider": { + "type": "Polars", + "url": "inflow.csv" } } ], diff --git a/pywr-python/tests/models/simple-custom-parameter/model.json b/pywr-python/tests/models/simple-custom-parameter/model.json index 5b709a1c..c2da5aa0 100644 --- a/pywr-python/tests/models/simple-custom-parameter/model.json +++ b/pywr-python/tests/models/simple-custom-parameter/model.json @@ -13,8 +13,12 @@ "name": "input1", "type": "Input", "max_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } } }, { @@ -53,17 +57,14 @@ "kwargs": { "multiplier": 2.0 } - }, + } + ], + "timeseries": [ { "name": "inflow", - "type": "DataFrame", - "url": "inflow.csv", - "pandas_kwargs": { - "index_col": 0 - }, - "columns": { - "type": "Column", - "name": "inflow" + "provider": { + "type": "Polars", + "url": "inflow.csv" } } ], diff --git a/pywr-python/tests/models/simple-timeseries/model.json b/pywr-python/tests/models/simple-timeseries/model.json index 0a992c1e..90f7e9cc 100644 --- a/pywr-python/tests/models/simple-timeseries/model.json +++ b/pywr-python/tests/models/simple-timeseries/model.json @@ -13,8 +13,12 @@ "name": "input1", "type": "Input", "max_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } } }, { @@ -46,17 +50,14 @@ "name":
"demand", "type": "Constant", "value": 10.0 - }, + } + ], + "timeseries": [ { "name": "inflow", - "type": "DataFrame", - "url": "inflow.csv", - "pandas_kwargs": { - "index_col": 0 - }, - "columns": { - "type": "Column", - "name": "inflow" + "provider": { + "type": "Polars", + "url": "inflow.csv" } } ], diff --git a/pywr-python/tests/models/simple-wasm/model.json b/pywr-python/tests/models/simple-wasm/model.json index cdb1bbe7..99f105d7 100644 --- a/pywr-python/tests/models/simple-wasm/model.json +++ b/pywr-python/tests/models/simple-wasm/model.json @@ -10,8 +10,12 @@ "name": "input1", "type": "input", "max_flow": { - "type": "Parameter", - "name": "inflow" + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow" + } } }, { @@ -64,6 +68,15 @@ "url": "inflow.csv.gz", "column": "inflow" } + ], + "timeseries": [ + { + "name": "inflow", + "provider": { + "type": "Polars", + "url": "inflow.csv.gz" + } + } ] } } diff --git a/pywr-schema/Cargo.toml b/pywr-schema/Cargo.toml index aa6a529b..4f98c07d 100644 --- a/pywr-schema/Cargo.toml +++ b/pywr-schema/Cargo.toml @@ -15,7 +15,7 @@ categories = ["science", "simulation"] [dependencies] svgbobdoc = { version = "0.3.0", features = ["enable"] } -polars = { workspace = true } +polars = { workspace = true, features = ["csv", "diff", "dtype-datetime", "dtype-date", "dynamic_group_by"] } pyo3 = { workspace = true } pyo3-polars = { workspace = true } strum = "0.26" diff --git a/pywr-schema/src/error.rs b/pywr-schema/src/error.rs index 8bf2281c..a458878d 100644 --- a/pywr-schema/src/error.rs +++ b/pywr-schema/src/error.rs @@ -1,5 +1,6 @@ use crate::data_tables::TableError; use crate::nodes::NodeAttribute; +use crate::timeseries::TimeseriesError; use pyo3::exceptions::PyRuntimeError; use pyo3::PyErr; use thiserror::Error; @@ -54,6 +55,8 @@ pub enum SchemaError { InvalidRollingWindow { name: String }, #[error("Failed to load parameter {name}: {error}")] LoadParameter { name: String, error: String }, + #[error("Timeseries error: {0}")] + Timeseries(#[from] TimeseriesError), } impl From for PyErr { @@ -103,4 +106,10 @@ pub enum ConversionError { UnparseableDate(String), #[error("Chrono out of range error: {0}")] OutOfRange(#[from] chrono::OutOfRange), + #[error("The dataframe parameters '{0}' defines both a column and a scenario attribute. Only 1 is allowed.")] + AmbiguousColumnAndScenario(String), + #[error("The dataframe parameters '{0}' defines both a column and a scenario. Only 1 is allowed.")] + MissingColumnOrScenario(String), + #[error("Unable to create a timeseries for file: '{0}'. 
No name was found.")] + MissingTimeseriesName(String), } diff --git a/pywr-schema/src/lib.rs b/pywr-schema/src/lib.rs index dbe4c3ec..0413d44e 100644 --- a/pywr-schema/src/lib.rs +++ b/pywr-schema/src/lib.rs @@ -12,6 +12,7 @@ pub mod model; pub mod nodes; pub mod outputs; pub mod parameters; +pub mod timeseries; pub use error::{ConversionError, SchemaError}; pub use model::PywrModel; diff --git a/pywr-schema/src/model.rs b/pywr-schema/src/model.rs index 69d9fe7a..d4a9262f 100644 --- a/pywr-schema/src/model.rs +++ b/pywr-schema/src/model.rs @@ -4,8 +4,10 @@ use super::parameters::Parameter; use crate::data_tables::{DataTable, LoadedTableCollection}; use crate::error::{ConversionError, SchemaError}; use crate::metric_sets::MetricSet; +use crate::nodes::NodeAndTimeseries; use crate::outputs::Output; -use crate::parameters::{MetricFloatReference, TryIntoV2Parameter}; +use crate::parameters::{convert_parameter_v1_to_v2, MetricFloatReference}; +use crate::timeseries::{convert_from_v1_data, LoadedTimeseriesCollection, Timeseries}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use pywr_core::models::ModelDomain; use pywr_core::timestep::TimestepDuration; @@ -139,6 +141,7 @@ pub struct LoadArgs<'a> { pub schema: &'a PywrNetwork, pub domain: &'a ModelDomain, pub tables: &'a LoadedTableCollection, + pub timeseries: &'a LoadedTimeseriesCollection, pub data_path: Option<&'a Path>, pub inter_network_transfers: &'a [PywrMultiNetworkTransfer], } @@ -149,6 +152,7 @@ pub struct PywrNetwork { pub edges: Vec<Edge>, pub parameters: Option<Vec<Parameter>>, pub tables: Option<Vec<DataTable>>, + pub timeseries: Option<Vec<Timeseries>>, pub metric_sets: Option<Vec<MetricSet>>, pub outputs: Option<Vec<Output>>, } @@ -193,12 +197,25 @@ impl PywrNetwork { Ok(LoadedTableCollection::from_schema(self.tables.as_deref(), data_path)?) } + pub fn load_timeseries( + &self, + domain: &ModelDomain, + data_path: Option<&Path>, + ) -> Result<LoadedTimeseriesCollection, SchemaError> { + Ok(LoadedTimeseriesCollection::from_schema( + self.timeseries.as_deref(), + domain, + data_path, + )?)
+ } + pub fn build_network( &self, domain: &ModelDomain, data_path: Option<&Path>, output_path: Option<&Path>, tables: &LoadedTableCollection, + timeseries: &LoadedTimeseriesCollection, inter_network_transfers: &[PywrMultiNetworkTransfer], ) -> Result<pywr_core::network::Network, SchemaError> { let mut network = pywr_core::network::Network::default(); @@ -207,6 +224,7 @@ impl PywrNetwork { schema: self, domain, tables, + timeseries, data_path, inter_network_transfers, }; @@ -401,9 +419,11 @@ impl PywrModel { let domain = ModelDomain::from(timestepper, scenario_collection)?; let tables = self.network.load_tables(data_path)?; + let timeseries = self.network.load_timeseries(&domain, data_path)?; + let network = self .network - .build_network(&domain, data_path, output_path, &tables, &[])?; + .build_network(&domain, data_path, output_path, &tables, &timeseries, &[])?; let model = pywr_core::models::Model::new(domain, network); @@ -430,8 +450,10 @@ impl PywrModel { Timestepper::default() }); - let nodes = v1 + // Extract nodes and any timeseries data from the v1 nodes + let nodes_and_ts: Vec<NodeAndTimeseries> = v1 .nodes + .clone() .into_iter() .filter_map(|n| match n.try_into() { Ok(n) => Some(n), Err(e) => { errors.push(e); None } }) .collect::<Vec<_>>(); + let mut ts_data = nodes_and_ts + .iter() + .filter_map(|n| n.timeseries.clone()) + .flatten() + .collect::<Vec<_>>(); + + let nodes = nodes_and_ts.into_iter().map(|n| n.node).collect::<Vec<_>>(); + let edges = v1.edges.into_iter().map(|e| e.into()).collect(); let parameters = if let Some(v1_parameters) = v1.parameters { let mut unnamed_count: usize = 0; - Some( - v1_parameters - .into_iter() - .filter_map(|p| match p.try_into_v2_parameter(None, &mut unnamed_count) { - Ok(p) => Some(p), - Err(e) => { - errors.push(e); - None - } - }) - .collect::<Vec<_>>(), - ) + let (parameters, param_ts_data) = + convert_parameter_v1_to_v2(v1_parameters, &mut unnamed_count, &mut errors); + ts_data.extend(param_ts_data); + Some(parameters) + } else { + None + }; + + let timeseries = if !ts_data.is_empty() { + let ts = convert_from_v1_data(ts_data, &v1.tables, &mut errors); + Some(ts) } else { None }; @@ -471,6 +500,7 @@ edges, parameters, tables, + timeseries, metric_sets, outputs, }; @@ -603,14 +633,15 @@ impl PywrMultiNetworkModel { let domain = ModelDomain::from(timestepper, scenario_collection)?; let mut networks = Vec::with_capacity(self.networks.len()); let mut inter_network_transfers = Vec::new(); - let mut schemas: Vec<(PywrNetwork, LoadedTableCollection)> = Vec::with_capacity(self.networks.len()); + let mut schemas: Vec<(PywrNetwork, LoadedTableCollection, LoadedTimeseriesCollection)> = + Vec::with_capacity(self.networks.len()); // First load all the networks // These will contain any parameters that are referenced by the inter-model transfers // Because of potential circular references, we need to load all the networks first.
for network_entry in &self.networks { // Load the network itself - let (network, schema, tables) = match &network_entry.network { + let (network, schema, tables, timeseries) = match &network_entry.network { PywrNetworkRef::Path(path) => { let pth = if let Some(dp) = data_path { if path.is_relative() { @@ -624,31 +655,35 @@ impl PywrMultiNetworkModel { let network_schema = PywrNetwork::from_path(pth)?; let tables = network_schema.load_tables(data_path)?; + let timeseries = network_schema.load_timeseries(&domain, data_path)?; let net = network_schema.build_network( &domain, data_path, output_path, &tables, + &timeseries, &network_entry.transfers, )?; - (net, network_schema, tables) + (net, network_schema, tables, timeseries) } PywrNetworkRef::Inline(network_schema) => { let tables = network_schema.load_tables(data_path)?; + let timeseries = network_schema.load_timeseries(&domain, data_path)?; let net = network_schema.build_network( &domain, data_path, output_path, &tables, + &timeseries, &network_entry.transfers, )?; - (net, network_schema.clone(), tables) + (net, network_schema.clone(), tables, timeseries) } }; - schemas.push((schema, tables)); + schemas.push((schema, tables, timeseries)); networks.push((network_entry.name.clone(), network)); } @@ -670,12 +705,13 @@ .ok_or_else(|| SchemaError::NetworkNotFound(transfer.from_network.clone()))?; // The transfer metric will fail to load if it is defined as an inter-model transfer itself. - let (from_schema, from_tables) = &schemas[from_network_idx]; + let (from_schema, from_tables, from_timeseries) = &schemas[from_network_idx]; let args = LoadArgs { schema: from_schema, domain: &domain, tables: from_tables, + timeseries: from_timeseries, data_path, inter_network_transfers: &[], }; diff --git a/pywr-schema/src/nodes/annual_virtual_storage.rs b/pywr-schema/src/nodes/annual_virtual_storage.rs index df4ee21a..bafec164 100644 --- a/pywr-schema/src/nodes/annual_virtual_storage.rs +++ b/pywr-schema/src/nodes/annual_virtual_storage.rs @@ -11,7 +11,7 @@ use pywr_schema_macros::PywrNode; use pywr_v1_schema::nodes::AnnualVirtualStorageNode as AnnualVirtualStorageNodeV1; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub struct AnnualReset { pub day: u8, pub month: chrono::Month, @@ -28,7 +28,7 @@ impl Default for AnnualReset { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct AnnualVirtualStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/core.rs b/pywr-schema/src/nodes/core.rs index d0e6f157..6bdb0370 100644 --- a/pywr-schema/src/nodes/core.rs +++ b/pywr-schema/src/nodes/core.rs @@ -13,7 +13,7 @@ use pywr_v1_schema::nodes::{ }; -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct InputNode { #[serde(flatten)] pub meta: NodeMeta, @@ -116,7 +116,7 @@ impl TryFrom<InputNodeV1> for InputNode { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct LinkNode { #[serde(flatten)] pub meta: NodeMeta, @@ -219,7 +219,7 @@ impl TryFrom<LinkNodeV1> for LinkNode { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)]
+#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct OutputNode { #[serde(flatten)] pub meta: NodeMeta, @@ -545,7 +545,7 @@ impl TryFrom<StorageNodeV1> for StorageNode { /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct CatchmentNode { #[serde(flatten)] pub meta: NodeMeta, @@ -634,14 +634,14 @@ impl TryFrom<CatchmentNodeV1> for CatchmentNode { } } -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] #[serde(tag = "type")] pub enum Factors { Proportion { factors: Vec<DynamicFloatValue> }, Ratio { factors: Vec<DynamicFloatValue> }, } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct AggregatedNode { #[serde(flatten)] pub meta: NodeMeta, @@ -779,7 +779,7 @@ impl TryFrom<AggregatedNodeV1> for AggregatedNode { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct AggregatedStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/delay.rs b/pywr-schema/src/nodes/delay.rs index 9817a41c..6591446d 100644 --- a/pywr-schema/src/nodes/delay.rs +++ b/pywr-schema/src/nodes/delay.rs @@ -26,7 +26,7 @@ use std::collections::HashMap; /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct DelayNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/loss_link.rs b/pywr-schema/src/nodes/loss_link.rs index f90298e2..a82cebfb 100644 --- a/pywr-schema/src/nodes/loss_link.rs +++ b/pywr-schema/src/nodes/loss_link.rs @@ -24,7 +24,7 @@ use std::collections::HashMap; /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct LossLinkNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/mod.rs b/pywr-schema/src/nodes/mod.rs index 54f21f02..6a393dba 100644 --- a/pywr-schema/src/nodes/mod.rs +++ b/pywr-schema/src/nodes/mod.rs @@ -20,7 +20,7 @@ pub use crate::nodes::core::{ pub use crate::nodes::delay::DelayNode; pub use crate::nodes::river::RiverNode; use crate::nodes::rolling_virtual_storage::RollingVirtualStorageNode; -use crate::parameters::DynamicFloatValue; +use crate::parameters::{DynamicFloatValue, TimeseriesV1Data}; pub use annual_virtual_storage::AnnualVirtualStorageNode; pub use loss_link::LossLinkNode; pub use monthly_virtual_storage::MonthlyVirtualStorageNode; @@ -30,6 +30,9 @@ use pywr_core::metric::MetricF64; use pywr_v1_schema::nodes::{ CoreNode as CoreNodeV1, Node as NodeV1, NodeMeta as NodeMetaV1, NodePosition as NodePositionV1, }; +use pywr_v1_schema::parameters::{ + CoreParameter as CoreParameterV1, Parameter as ParameterV1, ParameterValue as ParameterValueV1, ParameterValueType, +}; pub use river_gauge::RiverGaugeNode; pub use river_split_with_gauge::RiverSplitWithGaugeNode; use std::collections::HashMap; @@ -217,7 +220,7 @@ impl NodeBuilder { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, EnumDiscriminants)] +#[derive(serde::Deserialize, serde::Serialize, Clone, EnumDiscriminants, Debug)] #[serde(tag = "type")] #[strum_discriminants(derive(Display, IntoStaticStr, EnumString,
VariantNames))] // This creates a separate enum called `NodeType` that is available in this module. @@ -511,3 +514,243 @@ impl TryFrom<Box<CoreNodeV1>> for Node { Ok(n) } } + +/// A struct that acts as a container for a node and any associated timeseries data. +/// +/// v1 nodes may contain inline DataFrame parameters from which data needs to be extracted +/// to create timeseries entries in the schema. +#[derive(Debug)] +pub struct NodeAndTimeseries { + pub node: Node, + pub timeseries: Option<Vec<TimeseriesV1Data>>, +} + +impl TryFrom<NodeV1> for NodeAndTimeseries { + type Error = ConversionError; + + fn try_from(v1: NodeV1) -> Result<Self, Self::Error> { + let mut ts_vec = Vec::new(); + let mut unnamed_count: usize = 0; + + // Extract timeseries data for all inline DataFrame parameters included in the node. + for param_value in v1.parameters().values() { + ts_vec.extend(extract_timeseries(param_value, v1.name(), &mut unnamed_count)); + } + let timeseries = if ts_vec.is_empty() { None } else { Some(ts_vec) }; + + // Now convert the node to the v2 schema representation + let node = Node::try_from(v1)?; + Ok(Self { node, timeseries }) + } +} + +/// Extract timeseries data from a parameter value. +/// +/// If the parameter value is a DataFrame, then convert it to timeseries data. If it is another type then recursively +/// call the function on any inline parameters this parameter may contain to check for other dataframe parameters. +fn extract_timeseries( + param_value: &ParameterValueType, + name: &str, + unnamed_count: &mut usize, +) -> Vec<TimeseriesV1Data> { + let mut ts_vec = Vec::new(); + match param_value { + ParameterValueType::Single(param) => { + if let ParameterValueV1::Inline(p) = param { + if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() { + let mut ts_data: TimeseriesV1Data = df_param.clone().into(); + if ts_data.name.is_none() { + // Because the parameter could contain multiple inline DataFrame parameters use the unnamed_count + // to create a unique name. + let name = format!("{}-p{}.timeseries", name, unnamed_count); + *unnamed_count += 1; + ts_data.name = Some(name); + } + ts_vec.push(ts_data); + } else { + // Not a dataframe parameter but the parameter might have child dataframe parameters. + // Update the name and call the function recursively on all child parameters. + let name = if p.name().is_none() { + let n = format!("{}-p{}", name, unnamed_count); + *unnamed_count += 1; + n + } else { + p.name().unwrap().to_string() + }; + for nested_param in p.parameters().values() { + ts_vec.extend(extract_timeseries(nested_param, &name, unnamed_count)); + } + } + } + } + ParameterValueType::List(params) => { + for param in params.iter() { + if let ParameterValueV1::Inline(p) = param { + if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() { + let mut ts_data: TimeseriesV1Data = df_param.clone().into(); + if ts_data.name.is_none() { + // Because the parameter could contain multiple inline DataFrame parameters use the unnamed_count + // to create a unique name. + let name = format!("{}-p{}.timeseries", name, unnamed_count); + *unnamed_count += 1; + ts_data.name = Some(name); + } + ts_vec.push(ts_data); + } else { + // Not a dataframe parameter but the parameter might have child dataframe parameters. + // Update the name and call the function recursively on all child parameters.
+ let name = if p.name().is_none() { + let n = format!("{}-p{}", name, unnamed_count); + *unnamed_count += 1; + n + } else { + p.name().unwrap().to_string() + }; + for nested_param in p.parameters().values() { + ts_vec.extend(extract_timeseries(nested_param, &name, unnamed_count)); + } + } + } + } + } + }; + ts_vec +} + +#[cfg(test)] +mod tests { + use pywr_v1_schema::nodes::Node as NodeV1; + + use crate::{ + nodes::{Node, NodeAndTimeseries}, + parameters::{DynamicFloatValue, MetricFloatValue, Parameter}, + }; + + #[test] + fn test_ts_inline() { + let node_data = r#" + { + "name": "catchment1", + "type": "Input", + "max_flow": { + "type": "dataframe", + "url" : "timeseries1.csv", + "parse_dates": true, + "dayfirst": true, + "index_col": 0, + "column": "Data" + } + } + "#; + + let v1_node: NodeV1 = serde_json::from_str(node_data).unwrap(); + + let node_ts: NodeAndTimeseries = v1_node.try_into().unwrap(); + + let input_node = match node_ts.node { + Node::Input(n) => n, + _ => panic!("Expected InputNode"), + }; + + let expected_name = String::from("catchment1-p0.timeseries"); + + match input_node.max_flow { + Some(DynamicFloatValue::Dynamic(MetricFloatValue::Timeseries(ts))) => { + assert_eq!(ts.name(), &expected_name) + } + _ => panic!("Expected Timeseries"), + }; + + match node_ts.timeseries { + Some(ts) => { + assert_eq!(ts.len(), 1); + assert_eq!(ts.first().unwrap().name.as_ref().unwrap().as_str(), &expected_name); + } + None => panic!("Expected timeseries data"), + }; + } + + #[test] + fn test_ts_inline_nested() { + let node_data = r#" + { + "name": "catchment1", + "type": "Input", + "max_flow": { + "type": "aggregated", + "agg_func": "product", + "parameters": [ + { + "type": "constant", + "value": 0.9 + }, + { + "type": "dataframe", + "url" : "timeseries1.csv", + "parse_dates": true, + "dayfirst": true, + "index_col": 0, + "column": "Data" + }, + { + "type": "constant", + "value": 0.9 + }, + { + "type": "dataframe", + "url" : "timeseries2.csv", + "parse_dates": true, + "dayfirst": true, + "index_col": 0, + "column": "Data" + } + ] + } + } + "#; + + let v1_node: NodeV1 = serde_json::from_str(node_data).unwrap(); + + let node_ts: NodeAndTimeseries = v1_node.try_into().unwrap(); + + let input_node = match node_ts.node { + Node::Input(n) => n, + _ => panic!("Expected InputNode"), + }; + + let expected_name1 = String::from("catchment1-p0-p2.timeseries"); + let expected_name2 = String::from("catchment1-p0-p4.timeseries"); + + match input_node.max_flow { + Some(DynamicFloatValue::Dynamic(MetricFloatValue::InlineParameter { definition })) => match *definition { + Parameter::Aggregated(param) => { + assert_eq!(param.metrics.len(), 4); + match &param.metrics[1] { + DynamicFloatValue::Dynamic(MetricFloatValue::Timeseries(ts)) => { + assert_eq!(ts.name(), &expected_name1) + } + _ => panic!("Expected Timeseries"), + } + + match &param.metrics[3] { + DynamicFloatValue::Dynamic(MetricFloatValue::Timeseries(ts)) => { + assert_eq!(ts.name(), &expected_name2) + } + _ => panic!("Expected Timeseries"), + } + } + _ => panic!("Expected Aggregated parameter"), + }, + _ => panic!("Expected Timeseries"), + }; + + match node_ts.timeseries { + Some(ts) => { + assert_eq!(ts.len(), 2); + assert_eq!(ts[0].name.as_ref().unwrap().as_str(), &expected_name1); + assert_eq!(ts[1].name.as_ref().unwrap().as_str(), &expected_name2); + } + None => panic!("Expected timeseries data"), + }; + } +} diff --git a/pywr-schema/src/nodes/monthly_virtual_storage.rs b/pywr-schema/src/nodes/monthly_virtual_storage.rs index 210a4f7c..3e93c9a8
100644 --- a/pywr-schema/src/nodes/monthly_virtual_storage.rs +++ b/pywr-schema/src/nodes/monthly_virtual_storage.rs @@ -11,7 +11,7 @@ use pywr_schema_macros::PywrNode; use pywr_v1_schema::nodes::MonthlyVirtualStorageNode as MonthlyVirtualStorageNodeV1; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub struct NumberOfMonthsReset { pub months: u8, } @@ -22,7 +22,7 @@ impl Default for NumberOfMonthsReset { } } -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct MonthlyVirtualStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/piecewise_link.rs b/pywr-schema/src/nodes/piecewise_link.rs index b0e71295..d08e0775 100644 --- a/pywr-schema/src/nodes/piecewise_link.rs +++ b/pywr-schema/src/nodes/piecewise_link.rs @@ -7,7 +7,7 @@ use pywr_schema_macros::PywrNode; use pywr_v1_schema::nodes::PiecewiseLinkNode as PiecewiseLinkNodeV1; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub struct PiecewiseLinkStep { pub max_flow: Option<DynamicFloatValue>, pub min_flow: Option<DynamicFloatValue>, @@ -36,7 +36,7 @@ pub struct PiecewiseLinkStep { /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct PiecewiseLinkNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/piecewise_storage.rs b/pywr-schema/src/nodes/piecewise_storage.rs index 4a9bf86e..1fb6765d 100644 --- a/pywr-schema/src/nodes/piecewise_storage.rs +++ b/pywr-schema/src/nodes/piecewise_storage.rs @@ -9,7 +9,7 @@ use pywr_core::parameters::VolumeBetweenControlCurvesParameter; use pywr_schema_macros::PywrNode; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub struct PiecewiseStore { pub control_curve: DynamicFloatValue, pub cost: Option<DynamicFloatValue>, @@ -41,7 +41,7 @@ pub struct PiecewiseStore { /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct PiecewiseStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/river.rs b/pywr-schema/src/nodes/river.rs index f9d92c94..47fcddf7 100644 --- a/pywr-schema/src/nodes/river.rs +++ b/pywr-schema/src/nodes/river.rs @@ -6,7 +6,7 @@ use pywr_schema_macros::PywrNode; use pywr_v1_schema::nodes::LinkNode as LinkNodeV1; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct RiverNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/river_gauge.rs b/pywr-schema/src/nodes/river_gauge.rs index 8039ad06..16aba56a 100644 --- a/pywr-schema/src/nodes/river_gauge.rs +++ b/pywr-schema/src/nodes/river_gauge.rs @@ -22,7 +22,7 @@ use std::collections::HashMap; /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct RiverGaugeNode { #[serde(flatten)] pub meta: NodeMeta, diff --git
a/pywr-schema/src/nodes/river_split_with_gauge.rs b/pywr-schema/src/nodes/river_split_with_gauge.rs index cd67b569..a86aa338 100644 --- a/pywr-schema/src/nodes/river_split_with_gauge.rs +++ b/pywr-schema/src/nodes/river_split_with_gauge.rs @@ -31,7 +31,7 @@ use std::collections::HashMap; /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct RiverSplitWithGaugeNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/rolling_virtual_storage.rs b/pywr-schema/src/nodes/rolling_virtual_storage.rs index dd62f522..aa6de857 100644 --- a/pywr-schema/src/nodes/rolling_virtual_storage.rs +++ b/pywr-schema/src/nodes/rolling_virtual_storage.rs @@ -15,7 +15,7 @@ use std::num::NonZeroUsize; /// The length of the rolling window. /// /// This can be specified in either days or time-steps. -#[derive(serde::Deserialize, serde::Serialize, Clone)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] pub enum RollingWindow { Days(NonZeroUsize), Timesteps(NonZeroUsize), @@ -60,7 +60,7 @@ impl RollingWindow { /// The rolling virtual storage node is useful for representing rolling licences. For example, a 30-day or 90-day /// licence on a water abstraction. /// -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct RollingVirtualStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/virtual_storage.rs b/pywr-schema/src/nodes/virtual_storage.rs index b74aa1ed..be34ff57 100644 --- a/pywr-schema/src/nodes/virtual_storage.rs +++ b/pywr-schema/src/nodes/virtual_storage.rs @@ -11,7 +11,7 @@ use pywr_schema_macros::PywrNode; use pywr_v1_schema::nodes::VirtualStorageNode as VirtualStorageNodeV1; use std::collections::HashMap; -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct VirtualStorageNode { #[serde(flatten)] pub meta: NodeMeta, diff --git a/pywr-schema/src/nodes/water_treatment_works.rs b/pywr-schema/src/nodes/water_treatment_works.rs index 038d18c0..49836252 100644 --- a/pywr-schema/src/nodes/water_treatment_works.rs +++ b/pywr-schema/src/nodes/water_treatment_works.rs @@ -35,7 +35,7 @@ use std::collections::HashMap; /// ``` /// )] -#[derive(serde::Deserialize, serde::Serialize, Clone, Default, PywrNode)] +#[derive(serde::Deserialize, serde::Serialize, Clone, Default, Debug, PywrNode)] pub struct WaterTreatmentWorks { /// Node metadata #[serde(flatten)] diff --git a/pywr-schema/src/parameters/data_frame.rs b/pywr-schema/src/parameters/data_frame.rs deleted file mode 100644 index ec5f2408..00000000 --- a/pywr-schema/src/parameters/data_frame.rs +++ /dev/null @@ -1,205 +0,0 @@ -use crate::error::SchemaError; -use crate::model::LoadArgs; -use crate::parameters::python::try_json_value_into_py; -use crate::parameters::{DynamicFloatValueType, IntoV2Parameter, ParameterMeta, TryFromV1Parameter}; -use crate::ConversionError; -use ndarray::Array2; -use polars::prelude::DataType::Float64; -use polars::prelude::{DataFrame, Float64Type, IndexOrder}; -use pyo3::prelude::PyModule; -use pyo3::types::{PyDict, PyTuple}; -use pyo3::{IntoPy, PyErr, PyObject, Python, ToPyObject}; -use pyo3_polars::PyDataFrame; -use pywr_core::parameters::{Array1Parameter, Array2Parameter, ParameterIndex}; -use 
pywr_v1_schema::parameters::DataFrameParameter as DataFrameParameterV1; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; - -#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] -#[serde(tag = "type", content = "name")] -pub enum DataFrameColumns { - Scenario(String), - Column(String), -} - -enum FileFormat { - Csv, - Hdf, - Excel, -} - -impl FileFormat { - /// Determine file format from a path's extension. - fn from_path(path: &Path) -> Option<Self> { - match path.extension() { - None => None, // No extension; unknown format - Some(ext) => match ext.to_str() { - None => None, - Some(ext) => match ext.to_lowercase().as_str() { - "h5" | "hdf5" | "hdf" => Some(FileFormat::Hdf), - "csv" => Some(FileFormat::Csv), - "xlsx" | "xlsm" => Some(FileFormat::Excel), - "gz" => FileFormat::from_path(&path.with_extension("")), - _ => None, - }, - }, - } - } -} - -/// A parameter that reads its data into a Pandas DataFrame object. -/// -/// Upon loading this parameter will attempt to read its data using the Python library -/// `pandas`. It expects to load a timeseries DataFrame which is then sliced and aligned -/// to the -#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] -pub struct DataFrameParameter { - #[serde(flatten)] - pub meta: ParameterMeta, - pub url: PathBuf, - pub columns: DataFrameColumns, - pub timestep_offset: Option<i32>, - pub pandas_kwargs: HashMap<String, serde_json::Value>, -} - -impl DataFrameParameter { - pub fn node_references(&self) -> HashMap<&str, &str> { - HashMap::new() - } - pub fn parameters(&self) -> HashMap<&str, DynamicFloatValueType> { - HashMap::new() - } - - pub fn add_to_model( - &self, - network: &mut pywr_core::network::Network, - args: &LoadArgs, - ) -> Result<ParameterIndex<f64>, SchemaError> { - // Handle the case of an optional data path with a relative url. - let pth = if let Some(dp) = args.data_path { - if self.url.is_relative() { - dp.join(&self.url) - } else { - self.url.clone() - } - } else { - self.url.clone() - }; - - let format = FileFormat::from_path(&pth).ok_or(SchemaError::UnsupportedFileFormat)?; - - // 1. Call Python & Pandas to read the data and return an array - let df: DataFrame = Python::with_gil(|py| { - // Import pandas and appropriate read function depending on file extension - let pandas = PyModule::import(py, "pandas")?; - // Determine pandas read function from file format. - let read_func = match format { - FileFormat::Csv => pandas.getattr("read_csv"), - FileFormat::Hdf => pandas.getattr("read_hdf"), - FileFormat::Excel => pandas.getattr("read_excel"), - }?; - - // Import polars and get a reference to the DataFrame initialisation method - let polars = PyModule::import(py, "polars")?; - let polars_data_frame_init = polars.getattr("DataFrame")?; - - // Create arguments for pandas - let args = (pth.into_py(py),); - let seq = PyTuple::new( - py, - self.pandas_kwargs - .iter() - .map(|(k, v)| (k.into_py(py), try_json_value_into_py(py, v).unwrap())), - ); - let kwargs = PyDict::from_sequence(py, seq.to_object(py))?; - // Read pandas DataFrame from relevant function - let py_pandas_df: PyObject = read_func.call(args, Some(kwargs))?.extract()?; - // Convert to polars DataFrame using the Python library - let py_polars_df: PyDataFrame = polars_data_frame_init.call1((py_pandas_df,))?.extract()?; - - Ok(py_polars_df.into()) - }) - .map_err(|e: PyErr| SchemaError::PythonError(e.to_string()))?; - - // 2. TODO Validate the shape of the data array. I.e. check number of columns matches scenario - // and number of rows matches time-steps. - // 3.
Create an ArrayParameter using the loaded array. - match &self.columns { - DataFrameColumns::Scenario(scenario) => { - let scenario_group_index = args - .domain - .scenarios() - .group_index(scenario) - .ok_or(SchemaError::ScenarioGroupNotFound(scenario.to_string()))?; - - let array: Array2<f64> = df.to_ndarray::<Float64Type>(IndexOrder::default()).unwrap(); - let p = Array2Parameter::new(&self.meta.name, array, scenario_group_index, self.timestep_offset); - Ok(network.add_parameter(Box::new(p))?) - } - DataFrameColumns::Column(column) => { - let series = df.column(column).unwrap(); - let array = series - .cast(&Float64) - .unwrap() - .f64() - .unwrap() - .to_ndarray() - .unwrap() - .to_owned(); - - let p = Array1Parameter::new(&self.meta.name, array, self.timestep_offset); - Ok(network.add_parameter(Box::new(p))?) - } - } - } -} - -impl TryFromV1Parameter<DataFrameParameterV1> for DataFrameParameter { - type Error = ConversionError; - - fn try_from_v1_parameter( - v1: DataFrameParameterV1, - parent_node: Option<&str>, - unnamed_count: &mut usize, - ) -> Result<Self, Self::Error> { - let meta: ParameterMeta = v1.meta.into_v2_parameter(parent_node, unnamed_count); - let url = v1.url.ok_or(ConversionError::MissingAttribute { - attrs: vec!["url".to_string()], - name: meta.name.clone(), - })?; - - // Here we can only handle a specific column or assume the columns map to a scenario group. - let columns = match (v1.column, v1.scenario) { - (None, None) => { - return Err(ConversionError::MissingAttribute { - attrs: vec!["column".to_string(), "scenario".to_string()], - name: meta.name.clone(), - }) - } - (Some(_), Some(_)) => { - return Err(ConversionError::UnexpectedAttribute { - attrs: vec!["column".to_string(), "scenario".to_string()], - name: meta.name.clone(), - }) - } - (Some(c), None) => DataFrameColumns::Column(c), - (None, Some(s)) => DataFrameColumns::Scenario(s), - }; - - if v1.index.is_some() || v1.indexes.is_some() || v1.table.is_some() { - return Err(ConversionError::UnsupportedAttribute { - attrs: vec!["index".to_string(), "indexes".to_string(), "table".to_string()], - name: meta.name.clone(), - }); - } - - Ok(Self { - meta, - url, - columns, - timestep_offset: v1.timestep_offset, - pandas_kwargs: v1.pandas_kwargs, - }) - } -} diff --git a/pywr-schema/src/parameters/mod.rs b/pywr-schema/src/parameters/mod.rs index 22362fd6..1dbb3044 100644 --- a/pywr-schema/src/parameters/mod.rs +++ b/pywr-schema/src/parameters/mod.rs @@ -11,7 +11,6 @@ mod aggregated; mod asymmetric_switch; mod control_curves; mod core; -mod data_frame; mod delay; mod discount_factor; mod indexed_array; @@ -48,15 +47,15 @@ use crate::error::{ConversionError, SchemaError}; use crate::model::LoadArgs; use crate::nodes::NodeAttribute; use crate::parameters::core::DivisionParameter; -pub use crate::parameters::data_frame::DataFrameParameter; use crate::parameters::interpolated::InterpolatedParameter; pub use offset::OffsetParameter; use pywr_core::metric::{MetricF64, MetricUsize}; use pywr_core::models::MultiNetworkTransferIndex; use pywr_core::parameters::{ParameterIndex, ParameterType}; use pywr_v1_schema::parameters::{ - CoreParameter, ExternalDataRef as ExternalDataRefV1, Parameter as ParameterV1, ParameterMeta as ParameterMetaV1, - ParameterValue as ParameterValueV1, TableIndex as TableIndexV1, TableIndexEntry as TableIndexEntryV1, + CoreParameter, DataFrameParameter as DataFrameParameterV1, ExternalDataRef as ExternalDataRefV1, + Parameter as ParameterV1, ParameterMeta as ParameterMetaV1, ParameterValue as ParameterValueV1, ParameterVec, + TableIndex as TableIndexV1,
TableIndexEntry as TableIndexEntryV1, }; use std::path::PathBuf; @@ -162,7 +161,6 @@ pub enum Parameter { ParameterThreshold(ParameterThresholdParameter), TablesArray(TablesArrayParameter), Python(PythonParameter), - DataFrame(DataFrameParameter), Delay(DelayParameter), Division(DivisionParameter), Offset(OffsetParameter), @@ -194,7 +192,6 @@ impl Parameter { Self::ParameterThreshold(p) => p.meta.name.as_str(), Self::TablesArray(p) => p.meta.name.as_str(), Self::Python(p) => p.meta.name.as_str(), - Self::DataFrame(p) => p.meta.name.as_str(), Self::Division(p) => p.meta.name.as_str(), Self::Delay(p) => p.meta.name.as_str(), Self::Offset(p) => p.meta.name.as_str(), @@ -226,7 +223,6 @@ impl Parameter { Self::ParameterThreshold(_) => "ParameterThreshold", Self::TablesArray(_) => "TablesArray", Self::Python(_) => "Python", - Self::DataFrame(_) => "DataFrame", Self::Delay(_) => "Delay", Self::Division(_) => "Division", Self::Offset(_) => "Offset", @@ -262,7 +258,6 @@ impl Parameter { Self::ParameterThreshold(p) => ParameterType::Index(p.add_to_model(network, args)?), Self::TablesArray(p) => ParameterType::Parameter(p.add_to_model(network, args)?), Self::Python(p) => p.add_to_model(network, args)?, - Self::DataFrame(p) => ParameterType::Parameter(p.add_to_model(network, args)?), Self::Delay(p) => ParameterType::Parameter(p.add_to_model(network, args)?), Self::Division(p) => ParameterType::Parameter(p.add_to_model(network, args)?), Self::Offset(p) => ParameterType::Parameter(p.add_to_model(network, args)?), @@ -275,7 +270,102 @@ } } -impl TryFromV1Parameter<ParameterV1> for Parameter { +pub fn convert_parameter_v1_to_v2( + v1_parameters: ParameterVec, + unnamed_count: &mut usize, + errors: &mut Vec<ConversionError>, +) -> (Vec<Parameter>, Vec<TimeseriesV1Data>) { + let param_or_ts: Vec<ParameterOrTimeseries> = v1_parameters + .into_iter() + .filter_map(|p| match p.try_into_v2_parameter(None, unnamed_count) { + Ok(pt) => Some(pt), + Err(e) => { + errors.push(e); + None + } + }) + .collect::<Vec<_>>(); + + let parameters = param_or_ts + .clone() + .into_iter() + .filter_map(|pot| match pot { + ParameterOrTimeseries::Parameter(p) => Some(p), + ParameterOrTimeseries::Timeseries(_) => None, + }) + .collect(); + + let timeseries = param_or_ts + .into_iter() + .filter_map(|pot| match pot { + ParameterOrTimeseries::Parameter(_) => None, + ParameterOrTimeseries::Timeseries(t) => Some(t), + }) + .collect(); + + (parameters, timeseries) +} + +#[derive(Clone)] +enum ParameterOrTimeseries { + Parameter(Parameter), + Timeseries(TimeseriesV1Data), +} + +#[derive(Clone, Debug)] +pub struct TimeseriesV1Data { + pub name: Option<String>, + pub source: TimeseriesV1Source, + pub time_col: Option<String>, + pub column: Option<String>, + pub scenario: Option<String>, +} + +impl From<DataFrameParameterV1> for TimeseriesV1Data { + fn from(p: DataFrameParameterV1) -> Self { + let source = if let Some(url) = p.url { + TimeseriesV1Source::Url(url) + } else if let Some(tbl) = p.table { + TimeseriesV1Source::Table(tbl) + } else { + panic!("DataFrameParameter must have a url or table attribute.") + }; + + let name = p.meta.and_then(|m| m.name); + let time_col = match p.pandas_kwargs.get("index_col") { + Some(v) => v.as_str().map(|s| s.to_string()), + None => None, + }; + + Self { + name, + source, + time_col, + column: p.column, + scenario: p.scenario, + } + } +} + +#[derive(Clone, Debug)] +pub enum TimeseriesV1Source { + Url(PathBuf), + Table(String), +} + +impl From<Parameter> for ParameterOrTimeseries { + fn from(p: Parameter) -> Self { + Self::Parameter(p) + } +} + +impl From<TimeseriesV1Data> for ParameterOrTimeseries { + fn from(t: TimeseriesV1Data) -> Self {
Self::Timeseries(t) + } +} + +impl TryFromV1Parameter<ParameterV1> for ParameterOrTimeseries { type Error = ConversionError; fn try_from_v1_parameter( @@ -283,58 +373,66 @@ impl TryFromV1Parameter<ParameterV1> for Parameter { parent_node: Option<&str>, unnamed_count: &mut usize, ) -> Result<Self, Self::Error> { - let p = match v1 { + let p: ParameterOrTimeseries = match v1 { ParameterV1::Core(v1) => match v1 { CoreParameter::Aggregated(p) => { - Parameter::Aggregated(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::Aggregated(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::AggregatedIndex(p) => { - Parameter::AggregatedIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::AggregatedIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::AsymmetricSwitchIndex(p) => { - Parameter::AsymmetricSwitchIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::AsymmetricSwitchIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() + } + CoreParameter::Constant(p) => { + Parameter::Constant(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } - CoreParameter::Constant(p) => Parameter::Constant(p.try_into_v2_parameter(parent_node, unnamed_count)?), CoreParameter::ControlCurvePiecewiseInterpolated(p) => { Parameter::ControlCurvePiecewiseInterpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?) + .into() } CoreParameter::ControlCurveInterpolated(p) => { - Parameter::ControlCurveInterpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::ControlCurveInterpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::ControlCurveIndex(p) => { - Parameter::ControlCurveIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::ControlCurveIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::ControlCurve(p) => match p.clone().try_into_v2_parameter(parent_node, unnamed_count) { - Ok(p) => Parameter::ControlCurve(p), - Err(_) => Parameter::ControlCurveIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?), + Ok(p) => Parameter::ControlCurve(p).into(), + Err(_) => Parameter::ControlCurveIndex(p.try_into_v2_parameter(parent_node, unnamed_count)?).into(), }, CoreParameter::DailyProfile(p) => { - Parameter::DailyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::DailyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::IndexedArray(p) => { - Parameter::IndexedArray(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::IndexedArray(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::MonthlyProfile(p) => { - Parameter::MonthlyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::MonthlyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::UniformDrawdownProfile(p) => { - Parameter::UniformDrawdownProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?)
+ Parameter::UniformDrawdownProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() + } + CoreParameter::Max(p) => Parameter::Max(p.try_into_v2_parameter(parent_node, unnamed_count)?).into(), + CoreParameter::Negative(p) => { + Parameter::Negative(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } - CoreParameter::Max(p) => Parameter::Max(p.try_into_v2_parameter(parent_node, unnamed_count)?), - CoreParameter::Negative(p) => Parameter::Negative(p.try_into_v2_parameter(parent_node, unnamed_count)?), CoreParameter::Polynomial1D(p) => { - Parameter::Polynomial1D(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::Polynomial1D(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::ParameterThreshold(p) => { - Parameter::ParameterThreshold(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::ParameterThreshold(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::TablesArray(p) => { - Parameter::TablesArray(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::TablesArray(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() + } + CoreParameter::Min(p) => Parameter::Min(p.try_into_v2_parameter(parent_node, unnamed_count)?).into(), + CoreParameter::Division(p) => { + Parameter::Division(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } - CoreParameter::Min(p) => Parameter::Min(p.try_into_v2_parameter(parent_node, unnamed_count)?), - CoreParameter::Division(p) => Parameter::Division(p.try_into_v2_parameter(parent_node, unnamed_count)?), CoreParameter::DataFrame(p) => { - Parameter::DataFrame(p.try_into_v2_parameter(parent_node, unnamed_count)?) + let ts_data: TimeseriesV1Data = p.into(); + ts_data.into() } CoreParameter::Deficit(p) => { return Err(ConversionError::DeprecatedParameter { @@ -344,19 +442,19 @@ impl TryFromV1Parameter for Parameter { }) } CoreParameter::DiscountFactor(p) => { - Parameter::DiscountFactor(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::DiscountFactor(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::InterpolatedVolume(p) => { - Parameter::Interpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::Interpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::InterpolatedFlow(p) => { - Parameter::Interpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::Interpolated(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::NegativeMax(_) => todo!("Implement NegativeMaxParameter"), CoreParameter::NegativeMin(_) => todo!("Implement NegativeMinParameter"), CoreParameter::HydropowerTarget(_) => todo!("Implement HydropowerTargetParameter"), CoreParameter::WeeklyProfile(p) => { - Parameter::WeeklyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?) + Parameter::WeeklyProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } CoreParameter::Storage(p) => { return Err(ConversionError::DeprecatedParameter { @@ -375,7 +473,7 @@ impl TryFromV1Parameter for Parameter { }) } CoreParameter::RbfProfile(p) => { - Parameter::RbfProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?) 
+ Parameter::RbfProfile(p.try_into_v2_parameter(parent_node, unnamed_count)?).into() } }, ParameterV1::Custom(p) => { @@ -395,6 +493,7 @@ impl TryFromV1Parameter<ParameterV1> for Parameter { }, value: ConstantValue::Literal(0.0), }) + .into() } }; @@ -514,12 +613,39 @@ impl MetricFloatReference { } } +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +#[serde(tag = "type", content = "name")] +pub enum TimeseriesColumns { + Scenario(String), + Column(String), +} + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub struct TimeseriesReference { + #[serde(rename = "type")] + ty: String, + name: String, + columns: TimeseriesColumns, +} + +impl TimeseriesReference { + pub fn new(name: String, columns: TimeseriesColumns) -> Self { + let ty = "Timeseries".to_string(); + Self { ty, name, columns } + } + + pub fn name(&self) -> &str { + self.name.as_str() + } +} + /// A floating-point(f64) value from a metric in the network. #[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] #[serde(untagged)] pub enum MetricFloatValue { Reference(MetricFloatReference), InlineParameter { definition: Box<Parameter> }, + Timeseries(TimeseriesReference), } impl MetricFloatValue { @@ -558,6 +684,19 @@ impl MetricFloatValue { } } } + Self::Timeseries(ts_ref) => { + let param_idx = match &ts_ref.columns { + TimeseriesColumns::Scenario(scenario) => { + args.timeseries + .load_df(network, ts_ref.name.as_ref(), &args.domain, scenario.as_str())? + } + TimeseriesColumns::Column(col) => { + args.timeseries + .load_column(network, ts_ref.name.as_ref(), col.as_str())? + } + }; + Ok(MetricF64::ParameterValue(param_idx)) + } } } } @@ -649,9 +788,38 @@ impl TryFromV1Parameter<ParameterValueV1> for DynamicFloatValue { })) } ParameterValueV1::Table(tbl) => Self::Constant(ConstantValue::Table(tbl.try_into()?)), - ParameterValueV1::Inline(param) => Self::Dynamic(MetricFloatValue::InlineParameter { - definition: Box::new((*param).try_into_v2_parameter(parent_node, unnamed_count)?), - }), + ParameterValueV1::Inline(param) => { + let definition: ParameterOrTimeseries = (*param).try_into_v2_parameter(parent_node, unnamed_count)?; + match definition { + ParameterOrTimeseries::Parameter(p) => Self::Dynamic(MetricFloatValue::InlineParameter { + definition: Box::new(p), + }), + ParameterOrTimeseries::Timeseries(t) => { + let name = match t.name { + Some(n) => n, + None => { + let n = match parent_node { + Some(node_name) => format!("{}-p{}.timeseries", node_name, *unnamed_count), + None => format!("unnamed-timeseries-{}", *unnamed_count), + }; + *unnamed_count += 1; + n + } + }; + + let cols = match (&t.column, &t.scenario) { + (Some(col), None) => TimeseriesColumns::Column(col.clone()), + (None, Some(scenario)) => TimeseriesColumns::Scenario(scenario.clone()), + (Some(_), Some(_)) => { + return Err(ConversionError::AmbiguousColumnAndScenario(name.clone())) + } + (None, None) => return Err(ConversionError::MissingColumnOrScenario(name.clone())), + }; + + Self::Dynamic(MetricFloatValue::Timeseries(TimeseriesReference::new(name, cols))) + } + } + } }; Ok(p) } @@ -697,9 +865,16 @@ impl TryFromV1Parameter<ParameterValueV1> for DynamicIndexValue { ParameterValueV1::Constant(_) => return Err(ConversionError::FloatToIndex), ParameterValueV1::Reference(p_name) => Self::Dynamic(ParameterIndexValue::Reference(p_name)), ParameterValueV1::Table(tbl) => Self::Constant(ConstantValue::Table(tbl.try_into()?)), -
ParameterValueV1::Inline(param) => { + let definition: ParameterOrTimeseries = (*param).try_into_v2_parameter(parent_node, unnamed_count)?; + match definition { + ParameterOrTimeseries::Parameter(p) => Self::Dynamic(ParameterIndexValue::Inline(Box::new(p))), + ParameterOrTimeseries::Timeseries(_) => { + // TODO create an error for this + panic!("Timeseries do not support indexes yet") + } + } + } }; Ok(p) } diff --git a/pywr-schema/src/parameters/python.rs b/pywr-schema/src/parameters/python.rs index d7627d4e..1ad5a4af 100644 --- a/pywr-schema/src/parameters/python.rs +++ b/pywr-schema/src/parameters/python.rs @@ -204,6 +204,7 @@ mod tests { use crate::data_tables::LoadedTableCollection; use crate::model::{LoadArgs, PywrNetwork}; use crate::parameters::python::PythonParameter; + use crate::timeseries::LoadedTimeseriesCollection; use pywr_core::models::ModelDomain; use pywr_core::network::Network; use pywr_core::test_utils::default_time_domain; @@ -237,11 +238,13 @@ mod tests { let schema = PywrNetwork::default(); let mut network = Network::default(); let tables = LoadedTableCollection::from_schema(None, None).unwrap(); + let ts = LoadedTimeseriesCollection::default(); let args = LoadArgs { schema: &schema, data_path: None, tables: &tables, + timeseries: &ts, domain: &domain, inter_network_transfers: &[], }; @@ -279,11 +282,13 @@ mod tests { let schema = PywrNetwork::default(); let mut network = Network::default(); let tables = LoadedTableCollection::from_schema(None, None).unwrap(); + let ts = LoadedTimeseriesCollection::default(); let args = LoadArgs { schema: &schema, data_path: None, tables: &tables, + timeseries: &ts, domain: &domain, inter_network_transfers: &[], }; diff --git a/pywr-schema/src/test_models/inflow.csv b/pywr-schema/src/test_models/inflow.csv new file mode 100644 index 00000000..07f79268 --- /dev/null +++ b/pywr-schema/src/test_models/inflow.csv @@ -0,0 +1,366 @@ +date,inflow1 +01/01/2021,1 +02/01/2021,2 +03/01/2021,3 +04/01/2021,4 +05/01/2021,5 +06/01/2021,6 +07/01/2021,7 +08/01/2021,8 +09/01/2021,9 +10/01/2021,10 +11/01/2021,11 +12/01/2021,12 +13/01/2021,13 +14/01/2021,14 +15/01/2021,15 +16/01/2021,16 +17/01/2021,17 +18/01/2021,18 +19/01/2021,19 +20/01/2021,20 +21/01/2021,21 +22/01/2021,22 +23/01/2021,23 +24/01/2021,24 +25/01/2021,25 +26/01/2021,26 +27/01/2021,27 +28/01/2021,28 +29/01/2021,29 +30/01/2021,30 +31/01/2021,31 +01/02/2021,1 +02/02/2021,2 +03/02/2021,3 +04/02/2021,4 +05/02/2021,5 +06/02/2021,6 +07/02/2021,7 +08/02/2021,8 +09/02/2021,9 +10/02/2021,10 +11/02/2021,11 +12/02/2021,12 +13/02/2021,13 +14/02/2021,14 +15/02/2021,15 +16/02/2021,16 +17/02/2021,17 +18/02/2021,18 +19/02/2021,19 +20/02/2021,20 +21/02/2021,21 +22/02/2021,22 +23/02/2021,23 +24/02/2021,24 +25/02/2021,25 +26/02/2021,26 +27/02/2021,27 +28/02/2021,28 +01/03/2021,1 +02/03/2021,2 +03/03/2021,3 +04/03/2021,4 +05/03/2021,5 +06/03/2021,6 +07/03/2021,7 +08/03/2021,8 +09/03/2021,9 +10/03/2021,10 +11/03/2021,11 +12/03/2021,12 +13/03/2021,13 +14/03/2021,14 +15/03/2021,15 +16/03/2021,16 +17/03/2021,17 +18/03/2021,18 +19/03/2021,19 +20/03/2021,20 +21/03/2021,21 +22/03/2021,22 +23/03/2021,23 +24/03/2021,24 +25/03/2021,25 +26/03/2021,26 +27/03/2021,27 +28/03/2021,28 +29/03/2021,29 +30/03/2021,30 +31/03/2021,31 +01/04/2021,1 +02/04/2021,2 +03/04/2021,3 +04/04/2021,4 +05/04/2021,5 +06/04/2021,6 +07/04/2021,7 +08/04/2021,8 +09/04/2021,9 +10/04/2021,10 +11/04/2021,11 +12/04/2021,12 +13/04/2021,13 +14/04/2021,14 +15/04/2021,15 +16/04/2021,16 +17/04/2021,17 +18/04/2021,18 +19/04/2021,19 +20/04/2021,20 
+21/04/2021,21 +22/04/2021,22 +23/04/2021,23 +24/04/2021,24 +25/04/2021,25 +26/04/2021,26 +27/04/2021,27 +28/04/2021,28 +29/04/2021,29 +30/04/2021,30 +01/05/2021,1 +02/05/2021,2 +03/05/2021,3 +04/05/2021,4 +05/05/2021,5 +06/05/2021,6 +07/05/2021,7 +08/05/2021,8 +09/05/2021,9 +10/05/2021,10 +11/05/2021,11 +12/05/2021,12 +13/05/2021,13 +14/05/2021,14 +15/05/2021,15 +16/05/2021,16 +17/05/2021,17 +18/05/2021,18 +19/05/2021,19 +20/05/2021,20 +21/05/2021,21 +22/05/2021,22 +23/05/2021,23 +24/05/2021,24 +25/05/2021,25 +26/05/2021,26 +27/05/2021,27 +28/05/2021,28 +29/05/2021,29 +30/05/2021,30 +31/05/2021,31 +01/06/2021,1 +02/06/2021,2 +03/06/2021,3 +04/06/2021,4 +05/06/2021,5 +06/06/2021,6 +07/06/2021,7 +08/06/2021,8 +09/06/2021,9 +10/06/2021,10 +11/06/2021,11 +12/06/2021,12 +13/06/2021,13 +14/06/2021,14 +15/06/2021,15 +16/06/2021,16 +17/06/2021,17 +18/06/2021,18 +19/06/2021,19 +20/06/2021,20 +21/06/2021,21 +22/06/2021,22 +23/06/2021,23 +24/06/2021,24 +25/06/2021,25 +26/06/2021,26 +27/06/2021,27 +28/06/2021,28 +29/06/2021,29 +30/06/2021,30 +01/07/2021,1 +02/07/2021,2 +03/07/2021,3 +04/07/2021,4 +05/07/2021,5 +06/07/2021,6 +07/07/2021,7 +08/07/2021,8 +09/07/2021,9 +10/07/2021,10 +11/07/2021,11 +12/07/2021,12 +13/07/2021,13 +14/07/2021,14 +15/07/2021,15 +16/07/2021,16 +17/07/2021,17 +18/07/2021,18 +19/07/2021,19 +20/07/2021,20 +21/07/2021,21 +22/07/2021,22 +23/07/2021,23 +24/07/2021,24 +25/07/2021,25 +26/07/2021,26 +27/07/2021,27 +28/07/2021,28 +29/07/2021,29 +30/07/2021,30 +31/07/2021,31 +01/08/2021,1 +02/08/2021,2 +03/08/2021,3 +04/08/2021,4 +05/08/2021,5 +06/08/2021,6 +07/08/2021,7 +08/08/2021,8 +09/08/2021,9 +10/08/2021,10 +11/08/2021,11 +12/08/2021,12 +13/08/2021,13 +14/08/2021,14 +15/08/2021,15 +16/08/2021,16 +17/08/2021,17 +18/08/2021,18 +19/08/2021,19 +20/08/2021,20 +21/08/2021,21 +22/08/2021,22 +23/08/2021,23 +24/08/2021,24 +25/08/2021,25 +26/08/2021,26 +27/08/2021,27 +28/08/2021,28 +29/08/2021,29 +30/08/2021,30 +31/08/2021,31 +01/09/2021,1 +02/09/2021,2 +03/09/2021,3 +04/09/2021,4 +05/09/2021,5 +06/09/2021,6 +07/09/2021,7 +08/09/2021,8 +09/09/2021,9 +10/09/2021,10 +11/09/2021,11 +12/09/2021,12 +13/09/2021,13 +14/09/2021,14 +15/09/2021,15 +16/09/2021,16 +17/09/2021,17 +18/09/2021,18 +19/09/2021,19 +20/09/2021,20 +21/09/2021,21 +22/09/2021,22 +23/09/2021,23 +24/09/2021,24 +25/09/2021,25 +26/09/2021,26 +27/09/2021,27 +28/09/2021,28 +29/09/2021,29 +30/09/2021,30 +01/10/2021,1 +02/10/2021,2 +03/10/2021,3 +04/10/2021,4 +05/10/2021,5 +06/10/2021,6 +07/10/2021,7 +08/10/2021,8 +09/10/2021,9 +10/10/2021,10 +11/10/2021,11 +12/10/2021,12 +13/10/2021,13 +14/10/2021,14 +15/10/2021,15 +16/10/2021,16 +17/10/2021,17 +18/10/2021,18 +19/10/2021,19 +20/10/2021,20 +21/10/2021,21 +22/10/2021,22 +23/10/2021,23 +24/10/2021,24 +25/10/2021,25 +26/10/2021,26 +27/10/2021,27 +28/10/2021,28 +29/10/2021,29 +30/10/2021,30 +31/10/2021,31 +01/11/2021,1 +02/11/2021,2 +03/11/2021,3 +04/11/2021,4 +05/11/2021,5 +06/11/2021,6 +07/11/2021,7 +08/11/2021,8 +09/11/2021,9 +10/11/2021,10 +11/11/2021,11 +12/11/2021,12 +13/11/2021,13 +14/11/2021,14 +15/11/2021,15 +16/11/2021,16 +17/11/2021,17 +18/11/2021,18 +19/11/2021,19 +20/11/2021,20 +21/11/2021,21 +22/11/2021,22 +23/11/2021,23 +24/11/2021,24 +25/11/2021,25 +26/11/2021,26 +27/11/2021,27 +28/11/2021,28 +29/11/2021,29 +30/11/2021,30 +01/12/2021,1 +02/12/2021,2 +03/12/2021,3 +04/12/2021,4 +05/12/2021,5 +06/12/2021,6 +07/12/2021,7 +08/12/2021,8 +09/12/2021,9 +10/12/2021,10 +11/12/2021,11 +12/12/2021,12 +13/12/2021,13 +14/12/2021,14 +15/12/2021,15 +16/12/2021,16 +17/12/2021,17 
+18/12/2021,18 +19/12/2021,19 +20/12/2021,20 +21/12/2021,21 +22/12/2021,22 +23/12/2021,23 +24/12/2021,24 +25/12/2021,25 +26/12/2021,26 +27/12/2021,27 +28/12/2021,28 +29/12/2021,29 +30/12/2021,30 +31/12/2021,31 diff --git a/pywr-schema/src/test_models/timeseries.json b/pywr-schema/src/test_models/timeseries.json new file mode 100644 index 00000000..6f876a12 --- /dev/null +++ b/pywr-schema/src/test_models/timeseries.json @@ -0,0 +1,95 @@ +{ + "metadata": { + "title": "Simple timeseries" + }, + "timestepper": { + "start": "2021-01-01", + "end": "2021-12-31", + "timestep": 1 + }, + "network": { + "nodes": [ + { + "name": "input1", + "type": "Input", + "max_flow": { + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow1" + } + } + }, + { + "name": "input2", + "type": "Input", + "max_flow": { + "type": "Parameter", + "name": "factored_flow" + } + }, + { + "name": "link1", + "type": "Link" + }, + { + "name": "output1", + "type": "Output", + "cost": -10.0, + "max_flow": { + "type": "Parameter", + "name": "demand" + } + } + ], + "edges": [ + { + "from_node": "input1", + "to_node": "link1" + }, + { + "from_node": "input2", + "to_node": "link1" + }, + { + "from_node": "link1", + "to_node": "output1" + } + ], + "parameters": [ + { + "name": "demand", + "type": "Constant", + "value": 100.0 + }, + { + "name": "factored_flow", + "type": "Aggregated", + "agg_func": "product", + "metrics": [ + { + "type": "Timeseries", + "name": "inflow", + "columns": { + "type": "Column", + "name": "inflow1" + } + }, + 0.5 + ] + } + + ], + "timeseries": [ + { + "name": "inflow", + "provider": { + "type": "Polars", + "time_col": "date", + "url": "inflow.csv" + } + } + ] + } +} diff --git a/pywr-schema/src/timeseries/align_and_resample.rs b/pywr-schema/src/timeseries/align_and_resample.rs new file mode 100644 index 00000000..17411dcb --- /dev/null +++ b/pywr-schema/src/timeseries/align_and_resample.rs @@ -0,0 +1,245 @@ +use polars::{prelude::*, series::ops::NullBehavior}; +use pywr_core::models::ModelDomain; +use std::{cmp::Ordering, ops::Deref}; + +use crate::timeseries::TimeseriesError; + +pub fn align_and_resample( + name: &str, + df: DataFrame, + time_col: &str, + domain: &ModelDomain, +) -> Result<DataFrame, TimeseriesError> { + // Ensure type of time column is datetime and that it is sorted + let df = df + .clone() + .lazy() + .with_columns([col(time_col).cast(DataType::Datetime(TimeUnit::Nanoseconds, None))]) + .collect()?
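+ // Sort by the time column so that the duration check below sees monotonically increasing timestamps.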
+ .sort([time_col], false, true)?; + + // Ensure that the start of the dataframe aligns with the model's start for any resampling + let df = slice_start(df, time_col, domain)?; + + // Get the durations of the time column + let durations = df + .clone() + .lazy() + .select([col(time_col).diff(1, NullBehavior::Drop).unique().alias("duration")]) + .collect()?; + let durations = durations.column("duration")?.duration()?.deref(); + + if durations.len() > 1 { + todo!("Non-uniform timesteps are not yet supported"); + } + + let timeseries_duration = match durations.get(0) { + Some(duration) => duration, + None => return Err(TimeseriesError::TimeseriesDurationNotFound(name.to_string())), + }; + + let model_duration = domain + .time() + .step_duration() + .whole_nanoseconds() + .ok_or(TimeseriesError::NoDurationNanoSeconds)?; + + let df = match model_duration.cmp(&timeseries_duration) { + Ordering::Greater => { + // Downsample + df.clone() + .lazy() + .group_by_dynamic( + col(time_col), + [], + DynamicGroupOptions { + every: Duration::new(model_duration), + period: Duration::new(model_duration), + offset: Duration::new(0), + start_by: StartBy::DataPoint, + ..Default::default() + }, + ) + .agg([col("*").exclude([time_col]).mean()]) + .collect()? + } + Ordering::Less => { + // Upsample + // TODO: this does not extend the dataframe beyond its original end date. Should it do so when using a forward-fill strategy? + // The df could be extended by the length of the duration it is being resampled to. + df.clone() + .upsample::<[String; 0]>([], time_col, Duration::new(model_duration), Duration::new(0))? + .fill_null(FillNullStrategy::Forward(None))? + } + Ordering::Equal => df, + }; + + let df = slice_end(df, time_col, domain)?; + + if df.height() != domain.time().timesteps().len() { + return Err(TimeseriesError::DataFrameTimestepMismatch(name.to_string())); + } + + Ok(df) +} + +fn slice_start(df: DataFrame, time_col: &str, domain: &ModelDomain) -> Result<DataFrame, TimeseriesError> { + let start = domain.time().first_timestep().date; + let df = df.clone().lazy().filter(col(time_col).gt_eq(lit(start))).collect()?; + Ok(df) +} + +fn slice_end(df: DataFrame, time_col: &str, domain: &ModelDomain) -> Result<DataFrame, TimeseriesError> { + let end = domain.time().last_timestep().date; + let df = df.clone().lazy().filter(col(time_col).lt_eq(lit(end))).collect()?; + Ok(df) +} + +#[cfg(test)] +mod tests { + use chrono::{NaiveDate, NaiveDateTime}; + use polars::prelude::*; + use pywr_core::{ + models::ModelDomain, + scenario::{ScenarioDomain, ScenarioGroupCollection}, + timestep::{TimeDomain, TimestepDuration, Timestepper}, + }; + + use crate::timeseries::align_and_resample::align_and_resample; + + #[test] + fn test_downsample_and_slice() { + let start = NaiveDateTime::parse_from_str("2021-01-07 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let end = NaiveDateTime::parse_from_str("2021-01-20 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let timestep = TimestepDuration::Days(7); + let timestepper = Timestepper::new(start, end, timestep); + let time_domain = TimeDomain::try_from(timestepper).unwrap(); + + let scenario_domain: ScenarioDomain = ScenarioGroupCollection::new(vec![]).into(); + + let domain = ModelDomain::new(time_domain, scenario_domain); + + let time = polars::time::date_range( + "time", + NaiveDate::from_ymd_opt(2021, 1, 1).unwrap().into(), + NaiveDate::from_ymd_opt(2021, 1, 31).unwrap().into(), + Duration::parse("1d"), + ClosedWindow::Both, + TimeUnit::Milliseconds, + None, + ) + .unwrap(); + + let values: Vec<f64> = (1..32).map(|x| x as f64).collect(); + let mut df = df!( + "time" => time, + "values"
=> values + ) + .unwrap(); + + df = align_and_resample("test", df, "time", &domain).unwrap(); + + let expected_dates = Series::new( + "time", + vec![ + NaiveDateTime::parse_from_str("2021-01-07 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(), + NaiveDateTime::parse_from_str("2021-01-14 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(), + ], + ) + .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)) + .unwrap(); + let resampled_dates = df.column("time").unwrap(); + assert!(resampled_dates.equals(&expected_dates)); + + let expected_values = Series::new( + "values", + vec![ + 10.0, // mean of 7, 8, 9, 10, 11, 12, 13 + 17.0, // mean of 14, 15, 16, 17, 18, 19, 20 + ], + ); + let resampled_values = df.column("values").unwrap(); + assert!(resampled_values.equals(&expected_values)); + } + + #[test] + fn test_upsample_and_slice() { + let start = NaiveDateTime::parse_from_str("2021-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let end = NaiveDateTime::parse_from_str("2021-01-14 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let timestep = TimestepDuration::Days(1); + let timestepper = Timestepper::new(start, end, timestep); + let time_domain = TimeDomain::try_from(timestepper).unwrap(); + + let scenario_domain: ScenarioDomain = ScenarioGroupCollection::new(vec![]).into(); + let domain = ModelDomain::new(time_domain, scenario_domain); + + let time = polars::time::date_range( + "time", + NaiveDate::from_ymd_opt(2021, 1, 1).unwrap().into(), + NaiveDate::from_ymd_opt(2021, 1, 15).unwrap().into(), + Duration::parse("7d"), + ClosedWindow::Both, + TimeUnit::Milliseconds, + None, + ) + .unwrap(); + + let values: Vec<f64> = vec![1.0, 2.0, 3.0]; + let mut df = df!( + "time" => time, + "values" => values + ) + .unwrap(); + + df = align_and_resample("test", df, "time", &domain).unwrap(); + + let expected_values = Series::new( + "values", + vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0], + ); + let resampled_values = df.column("values").unwrap(); + assert!(resampled_values.equals(&expected_values)); + } + + #[test] + fn test_no_resample_slice() { + let start = NaiveDateTime::parse_from_str("2021-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let end = NaiveDateTime::parse_from_str("2021-01-03 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(); + let timestep = TimestepDuration::Days(1); + let timestepper = Timestepper::new(start, end, timestep); + let time_domain = TimeDomain::try_from(timestepper).unwrap(); + + let scenario_domain: ScenarioDomain = ScenarioGroupCollection::new(vec![]).into(); + let domain = ModelDomain::new(time_domain, scenario_domain); + + let time = polars::time::date_range( + "time", + NaiveDate::from_ymd_opt(2021, 1, 1).unwrap().into(), + NaiveDate::from_ymd_opt(2021, 1, 3).unwrap().into(), + Duration::parse("1d"), + ClosedWindow::Both, + TimeUnit::Milliseconds, + None, + ) + .unwrap(); + + let values: Vec<f64> = vec![1.0, 2.0, 3.0]; + let mut df = df!( + "time" => time.clone(), + "values" => values.clone() + ) + .unwrap(); + + df = align_and_resample("test", df, "time", &domain).unwrap(); + + let expected_values = Series::new("values", values); + let resampled_values = df.column("values").unwrap(); + assert!(resampled_values.equals(&expected_values)); + + let expected_dates = Series::new("time", time) + .cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)) + .unwrap(); + let resampled_dates = df.column("time").unwrap(); + assert!(resampled_dates.equals(&expected_dates)); + } +} diff --git a/pywr-schema/src/timeseries/mod.rs b/pywr-schema/src/timeseries/mod.rs new file mode 100644 index
00000000..74edd4fa --- /dev/null +++ b/pywr-schema/src/timeseries/mod.rs @@ -0,0 +1,249 @@ +mod align_and_resample; +mod polars_dataset; + +use ndarray::Array2; +use polars::error::PolarsError; +use polars::prelude::DataType::Float64; +use polars::prelude::{DataFrame, Float64Type, IndexOrder}; +use pywr_core::models::ModelDomain; +use pywr_core::parameters::{Array1Parameter, Array2Parameter, ParameterIndex}; +use pywr_core::PywrError; +use pywr_v1_schema::tables::TableVec; +use std::{collections::HashMap, path::Path}; +use thiserror::Error; + +use crate::parameters::{ParameterMeta, TimeseriesV1Data, TimeseriesV1Source}; +use crate::ConversionError; + +use self::polars_dataset::PolarsDataset; + +#[derive(Error, Debug)] +pub enum TimeseriesError { + #[error("Timeseries '{0}' not found")] + TimeseriesNotFound(String), + #[error("The duration of timeseries '{0}' could not be determined.")] + TimeseriesDurationNotFound(String), + #[error("Column '{col}' not found in timeseries input '{name}'")] + ColumnNotFound { col: String, name: String }, + #[error("Timeseries provider '{provider}' does not support '{fmt}' file types")] + TimeseriesUnsupportedFileFormat { provider: String, fmt: String }, + #[error("Timeseries provider '{provider}' cannot parse file: '{path}'")] + TimeseriesUnparsableFileFormat { provider: String, path: String }, + #[error("A scenario group with name '{0}' was not found")] + ScenarioGroupNotFound(String), + #[error("Duration could not be represented as nanoseconds")] + NoDurationNanoSeconds, + #[error("The length of the resampled timeseries dataframe '{0}' does not match the number of model timesteps.")] + DataFrameTimestepMismatch(String), + #[error("A timeseries dataframe with the name '{0}' already exists.")] + TimeseriesDataframeAlreadyExists(String), + #[error("Polars error: {0}")] + PolarsError(#[from] PolarsError), + #[error("Pywr core error: {0}")] + PywrCore(#[from] pywr_core::PywrError), +} + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +#[serde(tag = "type")] +enum TimeseriesProvider { + Pandas, + Polars(PolarsDataset), +} + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub struct Timeseries { + #[serde(flatten)] + meta: ParameterMeta, + provider: TimeseriesProvider, +} + +impl Timeseries { + pub fn load(&self, domain: &ModelDomain, data_path: Option<&Path>) -> Result<DataFrame, TimeseriesError> { + match &self.provider { + TimeseriesProvider::Polars(dataset) => dataset.load(self.meta.name.as_str(), data_path, domain), + TimeseriesProvider::Pandas => todo!(), + } + } + + pub fn name(&self) -> &str { + &self.meta.name + } +} + +#[derive(Default)] +pub struct LoadedTimeseriesCollection { + timeseries: HashMap<String, DataFrame>, +} + +impl LoadedTimeseriesCollection { + pub fn from_schema( + timeseries_defs: Option<&[Timeseries]>, + domain: &ModelDomain, + data_path: Option<&Path>, + ) -> Result<Self, TimeseriesError> { + let mut timeseries = HashMap::new(); + if let Some(timeseries_defs) = timeseries_defs { + for ts in timeseries_defs { + let df = ts.load(domain, data_path)?; + if timeseries.contains_key(&ts.meta.name) { + return Err(TimeseriesError::TimeseriesDataframeAlreadyExists(ts.meta.name.clone())); + } + timeseries.insert(ts.meta.name.clone(), df); + } + } + Ok(Self { timeseries }) + } + + pub fn load_column( + &self, + network: &mut pywr_core::network::Network, + name: &str, + col: &str, + ) -> Result<ParameterIndex<f64>, TimeseriesError> { + let df = self + .timeseries + .get(name) + .ok_or(TimeseriesError::TimeseriesNotFound(name.to_string()))?; + let series = df.column(col)?; + + let array =
series.cast(&Float64)?.f64()?.to_ndarray()?.to_owned(); + let name = format!("{}_{}", name, col); + + match network.get_parameter_index_by_name(&name) { + Ok(idx) => Ok(idx), + Err(e) => match e { + PywrError::ParameterNotFound(_) => { + let p = Array1Parameter::new(&name, array, None); + Ok(network.add_parameter(Box::new(p))?) + } + _ => Err(TimeseriesError::PywrCore(e)), + }, + } + } + + pub fn load_df( + &self, + network: &mut pywr_core::network::Network, + name: &str, + domain: &ModelDomain, + scenario: &str, + ) -> Result<ParameterIndex<f64>, TimeseriesError> { + let scenario_group_index = domain + .scenarios() + .group_index(scenario) + .ok_or(TimeseriesError::ScenarioGroupNotFound(scenario.to_string()))?; + + let df = self + .timeseries + .get(name) + .ok_or(TimeseriesError::TimeseriesNotFound(name.to_string()))?; + + let array: Array2<f64> = df.to_ndarray::<Float64Type>(IndexOrder::default()).unwrap(); + let name = format!("timeseries.{}_{}", name, scenario); + + match network.get_parameter_index_by_name(&name) { + Ok(idx) => Ok(idx), + Err(e) => match e { + PywrError::ParameterNotFound(_) => { + let p = Array2Parameter::new(&name, array, scenario_group_index, None); + Ok(network.add_parameter(Box::new(p))?) + } + _ => Err(TimeseriesError::PywrCore(e)), + }, + } + } +} + +pub fn convert_from_v1_data( + df_data: Vec<TimeseriesV1Data>, + v1_tables: &Option<TableVec>, + errors: &mut Vec<ConversionError>, +) -> Vec<Timeseries> { + let mut ts = HashMap::new(); + for data in df_data.into_iter() { + match data.source { + TimeseriesV1Source::Table(name) => { + let tables = v1_tables.as_ref().unwrap(); + let table = tables.iter().find(|t| t.name == *name).unwrap(); + let name = table.name.clone(); + if ts.contains_key(&name) { + continue; + } + + let time_col = None; + let provider = PolarsDataset::new(time_col, table.url.clone()); + + ts.insert( + name.clone(), + Timeseries { + meta: ParameterMeta { name, comment: None }, + provider: TimeseriesProvider::Polars(provider), + }, + ); + } + TimeseriesV1Source::Url(url) => { + let name = match data.name { + Some(name) => name, + None => { + errors.push(ConversionError::MissingTimeseriesName( + url.to_str().unwrap_or("").to_string(), + )); + continue; + } + }; + if ts.contains_key(&name) { + continue; + } + + let provider = PolarsDataset::new(data.time_col, url); + + ts.insert( + name.clone(), + Timeseries { + meta: ParameterMeta { name, comment: None }, + provider: TimeseriesProvider::Polars(provider), + }, + ); + } + } + } + ts.into_values().collect::<Vec<Timeseries>>() +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use chrono::{Datelike, NaiveDate}; + use ndarray::Array; + use pywr_core::{metric::MetricF64, recorders::AssertionRecorder, test_utils::run_all_solvers}; + + use crate::PywrModel; + + fn model_str() -> &'static str { + include_str!("../test_models/timeseries.json") + } + + #[test] + fn test_timeseries_polars() { + let cargo_manifest_dir = env!("CARGO_MANIFEST_DIR"); + + let model_dir = PathBuf::from(cargo_manifest_dir).join("src/test_models"); + + let data = model_str(); + let schema: PywrModel = serde_json::from_str(data).unwrap(); + let mut model = schema.build_model(Some(model_dir.as_path()), None).unwrap(); + + let expected = Array::from_shape_fn((365, 1), |(x, _)| { + let month_day = NaiveDate::from_yo_opt(2021, (x + 1) as u32).unwrap().day() as f64; + month_day + month_day * 0.5 + }); + + let idx = model.network().get_node_by_name("output1", None).unwrap().index(); + + let recorder = AssertionRecorder::new("output-flow", MetricF64::NodeInFlow(idx), expected.clone(), None, None); +
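// The recorder asserts that output1's flow equals the inflow column (the day of the month) plus the factored (x0.5) aggregate at every timestep. +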
model.network_mut().add_recorder(Box::new(recorder)).unwrap(); + + run_all_solvers(&model) + } +} diff --git a/pywr-schema/src/timeseries/pandas.rs b/pywr-schema/src/timeseries/pandas.rs new file mode 100644 index 00000000..e69de29b diff --git a/pywr-schema/src/timeseries/polars_dataset.rs b/pywr-schema/src/timeseries/polars_dataset.rs new file mode 100644 index 00000000..cf9c984f --- /dev/null +++ b/pywr-schema/src/timeseries/polars_dataset.rs @@ -0,0 +1,80 @@ +use std::path::{Path, PathBuf}; + +use polars::{frame::DataFrame, prelude::*}; +use pywr_core::models::ModelDomain; + +use crate::timeseries::TimeseriesError; + +use super::align_and_resample::align_and_resample; + +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub struct PolarsDataset { + time_col: Option<String>, + url: PathBuf, +} + +impl PolarsDataset { + pub fn new(time_col: Option<String>, url: PathBuf) -> Self { + Self { time_col, url } + } + + pub fn load( + &self, + name: &str, + data_path: Option<&Path>, + domain: &ModelDomain, + ) -> Result<DataFrame, TimeseriesError> { + let fp = if self.url.is_absolute() { + self.url.clone() + } else if let Some(data_path) = data_path { + data_path.join(self.url.as_path()) + } else { + self.url.clone() + }; + + let mut df = match fp.extension() { + Some(ext) => { + let ext = ext.to_str().map(|s| s.to_lowercase()); + match ext.as_deref() { + Some("csv") => CsvReader::from_path(fp)? + .infer_schema(None) + .with_try_parse_dates(true) + .has_header(true) + .finish()?, + Some("parquet") => { + todo!() + } + Some(other_ext) => { + return Err(TimeseriesError::TimeseriesUnsupportedFileFormat { + provider: "polars".to_string(), + fmt: other_ext.to_string(), + }) + } + None => { + return Err(TimeseriesError::TimeseriesUnparsableFileFormat { + provider: "polars".to_string(), + path: self.url.to_string_lossy().to_string(), + }) + } + } + } + None => { + return Err(TimeseriesError::TimeseriesUnparsableFileFormat { + provider: "polars".to_string(), + path: self.url.to_string_lossy().to_string(), + }) + } + }; + + df = match self.time_col { + Some(ref col) => align_and_resample(name, df, col, domain)?, + None => { + // If a time column has not been provided, assume it is the first column + let first_col = df.get_column_names()[0].to_string(); + align_and_resample(name, df, first_col.as_str(), domain)? + } + }; + + Ok(df) + } +}
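Taken together, the intended usage of the new collection is roughly the following sketch (here `schema`, `domain`, `network` and `data_path` are assumed to already exist, and `schema.timeseries` is assumed to hold the optional list shown in the JSON models above):

use pywr_schema::timeseries::LoadedTimeseriesCollection;

// Load every `timeseries` entry defined in the schema, aligning and resampling
// each dataframe onto the model's time domain.
let ts = LoadedTimeseriesCollection::from_schema(schema.timeseries.as_deref(), &domain, data_path)?;

// Resolve a single column to an Array1Parameter, reusing the parameter if one
// has already been registered on the network under the same derived name.
let idx = ts.load_column(&mut network, "inflow", "inflow1")?;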