From 38f16d4d80e49c041843ebad25c3dc7ba5811405 Mon Sep 17 00:00:00 2001 From: James Batchelor Date: Sat, 23 Mar 2024 11:20:05 +0000 Subject: [PATCH] response to review comments --- pywr-core/src/timestep.rs | 3 ++- pywr-schema/src/error.rs | 1 - pywr-schema/src/model.rs | 22 +++++++------------ pywr-schema/src/nodes/mod.rs | 13 ++++++++++- pywr-schema/src/parameters/control_curves.rs | 2 +- pywr-schema/src/parameters/mod.rs | 8 ++++++- pywr-schema/src/parameters/offset.rs | 2 +- .../src/timeseries/align_and_resample.rs | 14 ++++++------ pywr-schema/src/timeseries/mod.rs | 11 ++++++++-- 9 files changed, 47 insertions(+), 29 deletions(-) diff --git a/pywr-core/src/timestep.rs b/pywr-core/src/timestep.rs index 48c71e38..55adf34d 100644 --- a/pywr-core/src/timestep.rs +++ b/pywr-core/src/timestep.rs @@ -54,11 +54,12 @@ impl PywrDuration { } } - // Returns the fractional number of days in the duration. + /// Returns the fractional number of days in the duration. pub fn fractional_days(&self) -> f64 { self.0.num_seconds() as f64 / SECS_IN_DAY as f64 } + /// Returns the number of nanoseconds in the duration. 
pub fn whole_nanoseconds(&self) -> Option { self.0.num_nanoseconds() } diff --git a/pywr-schema/src/error.rs b/pywr-schema/src/error.rs index 06c146ab..ad0d216c 100644 --- a/pywr-schema/src/error.rs +++ b/pywr-schema/src/error.rs @@ -1,7 +1,6 @@ use crate::data_tables::TableError; use crate::nodes::NodeAttribute; use crate::timeseries::TimeseriesError; -use polars::error::PolarsError; use pyo3::exceptions::PyRuntimeError; use pyo3::PyErr; use thiserror::Error; diff --git a/pywr-schema/src/model.rs b/pywr-schema/src/model.rs index c32fc3da..7f6c6af6 100644 --- a/pywr-schema/src/model.rs +++ b/pywr-schema/src/model.rs @@ -6,19 +6,12 @@ use crate::error::{ConversionError, SchemaError}; use crate::metric_sets::MetricSet; use crate::nodes::NodeAndTimeseries; use crate::outputs::Output; -use crate::parameters::{ - convert_parameter_v1_to_v2, DataFrameColumns, DynamicFloatValue, MetricFloatReference, MetricFloatValue, - TimeseriesReference, TryIntoV2Parameter, -}; +use crate::parameters::{convert_parameter_v1_to_v2, MetricFloatReference}; use crate::timeseries::{convert_from_v1_data, LoadedTimeseriesCollection, Timeseries}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; -use polars::frame::DataFrame; use pywr_core::models::ModelDomain; use pywr_core::timestep::TimestepDuration; use pywr_core::PywrError; -use serde::de; -use std::collections::HashSet; -use std::hash::Hash; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -445,6 +438,7 @@ impl PywrModel { Timestepper::default() }); + // Extract nodes and any timeseries data from the v1 nodes let nodes_and_ts: Vec = v1 .nodes .clone() @@ -468,16 +462,16 @@ impl PywrModel { let edges = v1.edges.into_iter().map(|e| e.into()).collect(); - let (parameters, param_ts_data) = if let Some(v1_parameters) = v1.parameters { + let parameters = if let Some(v1_parameters) = v1.parameters { let mut unnamed_count: usize = 0; - let (parameters, param_ts_data) = convert_parameter_v1_to_v2(v1_parameters, &mut unnamed_count, &mut 
errors); - (Some(parameters), Some(param_ts_data)) + let (parameters, param_ts_data) = + convert_parameter_v1_to_v2(v1_parameters, &mut unnamed_count, &mut errors); + ts_data.extend(param_ts_data); + Some(parameters) } else { - (None, None) + None }; - ts_data.extend(param_ts_data.into_iter().flatten()); - let timeseries = if !ts_data.is_empty() { let ts = convert_from_v1_data(&ts_data, &v1.tables); Some(ts) diff --git a/pywr-schema/src/nodes/mod.rs b/pywr-schema/src/nodes/mod.rs index 32b20bfa..40927843 100644 --- a/pywr-schema/src/nodes/mod.rs +++ b/pywr-schema/src/nodes/mod.rs @@ -677,6 +677,10 @@ impl TryFrom> for Node { } } +/// struct that acts as a container for a node and any associated timeseries data. +/// +/// v1 nodes may contain inline DataFrame parameters from which data needs to be extracted +/// to create timeseries entries in the schema. #[derive(Debug)] pub struct NodeAndTimeseries { pub node: Node, @@ -689,11 +693,14 @@ impl TryFrom for NodeAndTimeseries { fn try_from(v1: NodeV1) -> Result { let mut ts_vec = Vec::new(); let mut unnamed_count: usize = 0; + + // extract timeseries data for all inline DataFrame parameters included in the node. for param_value in v1.parameters().values() { ts_vec.extend(extract_timeseries(param_value, v1.name(), &mut unnamed_count)); } - let timeseries = if ts_vec.is_empty() { None } else { Some(ts_vec) }; + + // Now convert the node to the v2 schema representation let node = Node::try_from(v1)?; Ok(Self { node, timeseries }) } @@ -715,6 +722,8 @@ fn extract_timeseries( if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() { let mut ts_data: TimeseriesV1Data = df_param.clone().into(); if ts_data.name.is_none() { + // Because the parameter could contain multiple inline DataFrame parameters use the unnamed_count + // to create a unique name. 
let name = format!("{}-p{}.timeseries", name, unnamed_count); *unnamed_count += 1; ts_data.name = Some(name); @@ -742,6 +751,8 @@ fn extract_timeseries( if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() { let mut ts_data: TimeseriesV1Data = df_param.clone().into(); if ts_data.name.is_none() { + // Because the parameter could contain multiple inline DataFrame parameters use the unnamed_count + // to create a unique name. let name = format!("{}-p{}.timeseries", name, unnamed_count); *unnamed_count += 1; ts_data.name = Some(name); diff --git a/pywr-schema/src/parameters/control_curves.rs b/pywr-schema/src/parameters/control_curves.rs index a0869b91..09a2c1ec 100644 --- a/pywr-schema/src/parameters/control_curves.rs +++ b/pywr-schema/src/parameters/control_curves.rs @@ -5,7 +5,7 @@ use crate::nodes::NodeAttribute; use crate::parameters::{ DynamicFloatValue, IntoV2Parameter, NodeReference, ParameterMeta, TryFromV1Parameter, TryIntoV2Parameter, }; -use crate::timeseries::{self, LoadedTimeseriesCollection}; +use crate::timeseries::LoadedTimeseriesCollection; use pywr_core::models::ModelDomain; use pywr_core::parameters::ParameterIndex; use pywr_v1_schema::parameters::{ diff --git a/pywr-schema/src/parameters/mod.rs b/pywr-schema/src/parameters/mod.rs index 65158663..e3b143db 100644 --- a/pywr-schema/src/parameters/mod.rs +++ b/pywr-schema/src/parameters/mod.rs @@ -433,7 +433,7 @@ pub fn convert_parameter_v1_to_v2( ) -> (Vec, Vec) { let param_or_ts: Vec = v1_parameters .into_iter() - .filter_map(|p| match p.try_into_v2_parameter(None, unnamed_count){ + .filter_map(|p| match p.try_into_v2_parameter(None, unnamed_count) { Ok(pt) => Some(pt), Err(e) => { errors.push(e); @@ -472,6 +472,7 @@ enum ParameterOrTimeseries { pub struct TimeseriesV1Data { pub name: Option, pub source: TimeseriesV1Source, + pub time_col: Option, pub column: Option, pub scenario: Option, } @@ -487,10 +488,15 @@ impl From for TimeseriesV1Data { }; let name = p.meta.and_then(|m| 
m.name); + let time_col = match p.pandas_kwargs.get("index_col") { + Some(v) => v.as_str().map(|s| s.to_string()), + None => None, + }; Self { name, source, + time_col, column: p.column, scenario: p.scenario, } diff --git a/pywr-schema/src/parameters/offset.rs b/pywr-schema/src/parameters/offset.rs index c49c8711..4e74dfee 100644 --- a/pywr-schema/src/parameters/offset.rs +++ b/pywr-schema/src/parameters/offset.rs @@ -1,5 +1,5 @@ use crate::data_tables::LoadedTableCollection; -use crate::parameters::{ConstantValue, DynamicFloatValue, DynamicFloatValueType, ParameterMeta, VariableSettings}; +use crate::parameters::{ConstantValue, DynamicFloatValue, DynamicFloatValueType, ParameterMeta}; use crate::timeseries::LoadedTimeseriesCollection; use pywr_core::parameters::ParameterIndex; diff --git a/pywr-schema/src/timeseries/align_and_resample.rs b/pywr-schema/src/timeseries/align_and_resample.rs index 52dbad6e..17411dcb 100644 --- a/pywr-schema/src/timeseries/align_and_resample.rs +++ b/pywr-schema/src/timeseries/align_and_resample.rs @@ -1,4 +1,3 @@ -use chrono::TimeDelta; use polars::{prelude::*, series::ops::NullBehavior}; use pywr_core::models::ModelDomain; use std::{cmp::Ordering, ops::Deref}; @@ -31,8 +30,7 @@ pub fn align_and_resample( let durations = durations.column("duration")?.duration()?.deref(); if durations.len() > 1 { - // Non-uniform timestep are not yet supported - todo!(); + todo!("Non-uniform timestep are not yet supported"); } let timeseries_duration = match durations.get(0) { @@ -44,7 +42,7 @@ pub fn align_and_resample( .time() .step_duration() .whole_nanoseconds() - .expect("Nano seconds could not be extracted from model step duration"); + .ok_or(TimeseriesError::NoDurationNanoSeconds)?; let df = match model_duration.cmp(&timeseries_duration) { Ordering::Greater => { @@ -78,7 +76,10 @@ pub fn align_and_resample( let df = slice_end(df, time_col, domain)?; - // TODO check df length equals number of model timesteps + if df.height() != 
domain.time().timesteps().len() { + return Err(TimeseriesError::DataFrameTimestepMismatch(name.to_string())); + } + Ok(df) } @@ -96,7 +97,6 @@ fn slice_end(df: DataFrame, time_col: &str, domain: &ModelDomain) -> Result = vec![1.0; 31]; + let values: Vec = (1..32).map(|x| x as f64).collect(); let mut df = df!( "time" => time, diff --git a/pywr-schema/src/timeseries/mod.rs b/pywr-schema/src/timeseries/mod.rs index 02a82433..878eee4c 100644 --- a/pywr-schema/src/timeseries/mod.rs +++ b/pywr-schema/src/timeseries/mod.rs @@ -9,7 +9,6 @@ use pywr_core::models::ModelDomain; use pywr_core::parameters::{Array1Parameter, Array2Parameter, ParameterIndex}; use pywr_core::PywrError; use pywr_v1_schema::tables::TableVec; -use std::sync::Arc; use std::{collections::HashMap, path::Path}; use thiserror::Error; @@ -31,6 +30,12 @@ pub enum TimeseriesError { TimeseriesUnparsableFileFormat { provider: String, path: String }, #[error("A scenario group with name '{0}' was not found")] ScenarioGroupNotFound(String), + #[error("Duration could not be represented as nanoseconds")] + NoDurationNanoSeconds, + #[error("The length of the resampled timeseries dataframe '{0}' does not match the number of model timesteps.")] + DataFrameTimestepMismatch(String), + #[error("A timeseries dataframe with the name '{0}' already exists.")] + TimeseriesDataframeAlreadyExists(String), #[error("Polars error: {0}")] PolarsError(#[from] PolarsError), #[error("Pywr core error: {0}")] @@ -79,7 +84,9 @@ impl LoadedTimeseriesCollection { if let Some(timeseries_defs) = timeseries_defs { for ts in timeseries_defs { let df = ts.load(domain, data_path)?; - // TODO error if key already exists + if timeseries.contains_key(&ts.meta.name) { + return Err(TimeseriesError::TimeseriesDataframeAlreadyExists(ts.meta.name.clone())); + } timeseries.insert(ts.meta.name.clone(), df); } }