Skip to content

Commit

Permalink
response to review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Batch21 committed Mar 23, 2024
1 parent bbe8187 commit 38f16d4
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 29 deletions.
3 changes: 2 additions & 1 deletion pywr-core/src/timestep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ impl PywrDuration {
}
}

// Returns the fractional number of days in the duration.
/// Returns the length of the duration expressed in days, as a floating point value.
///
/// The value is obtained by dividing the result of `num_seconds()` on the wrapped
/// duration by `SECS_IN_DAY`, with both operands converted to `f64` first.
pub fn fractional_days(&self) -> f64 {
    let total_seconds = self.0.num_seconds() as f64;
    let seconds_per_day = SECS_IN_DAY as f64;
    total_seconds / seconds_per_day
}

/// Returns the number of nanoseconds in the duration.
///
/// This forwards the result of `num_nanoseconds()` on the wrapped value unchanged, so
/// `None` is returned whenever that call yields `None`.
/// NOTE(review): presumably `None` means the value does not fit in an `i64` — confirm
/// against the documentation of the wrapped duration type.
pub fn whole_nanoseconds(&self) -> Option<i64> {
self.0.num_nanoseconds()
}
Expand Down
1 change: 0 additions & 1 deletion pywr-schema/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::data_tables::TableError;
use crate::nodes::NodeAttribute;
use crate::timeseries::TimeseriesError;
use polars::error::PolarsError;
use pyo3::exceptions::PyRuntimeError;
use pyo3::PyErr;
use thiserror::Error;
Expand Down
22 changes: 8 additions & 14 deletions pywr-schema/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,12 @@ use crate::error::{ConversionError, SchemaError};
use crate::metric_sets::MetricSet;
use crate::nodes::NodeAndTimeseries;
use crate::outputs::Output;
use crate::parameters::{
convert_parameter_v1_to_v2, DataFrameColumns, DynamicFloatValue, MetricFloatReference, MetricFloatValue,
TimeseriesReference, TryIntoV2Parameter,
};
use crate::parameters::{convert_parameter_v1_to_v2, MetricFloatReference};
use crate::timeseries::{convert_from_v1_data, LoadedTimeseriesCollection, Timeseries};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use polars::frame::DataFrame;
use pywr_core::models::ModelDomain;
use pywr_core::timestep::TimestepDuration;
use pywr_core::PywrError;
use serde::de;
use std::collections::HashSet;
use std::hash::Hash;
use std::path::{Path, PathBuf};
use std::str::FromStr;

Expand Down Expand Up @@ -445,6 +438,7 @@ impl PywrModel {
Timestepper::default()
});

// Extract nodes and any timeseries data from the v1 nodes
let nodes_and_ts: Vec<NodeAndTimeseries> = v1
.nodes
.clone()
Expand All @@ -468,16 +462,16 @@ impl PywrModel {

let edges = v1.edges.into_iter().map(|e| e.into()).collect();

let (parameters, param_ts_data) = if let Some(v1_parameters) = v1.parameters {
let parameters = if let Some(v1_parameters) = v1.parameters {
let mut unnamed_count: usize = 0;
let (parameters, param_ts_data) = convert_parameter_v1_to_v2(v1_parameters, &mut unnamed_count, &mut errors);
(Some(parameters), Some(param_ts_data))
let (parameters, param_ts_data) =
convert_parameter_v1_to_v2(v1_parameters, &mut unnamed_count, &mut errors);
ts_data.extend(param_ts_data);
Some(parameters)
} else {
(None, None)
None
};

ts_data.extend(param_ts_data.into_iter().flatten());

let timeseries = if !ts_data.is_empty() {
let ts = convert_from_v1_data(&ts_data, &v1.tables);
Some(ts)
Expand Down
13 changes: 12 additions & 1 deletion pywr-schema/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ impl TryFrom<Box<CoreNodeV1>> for Node {
}
}

/// Struct that acts as a container for a node and any associated timeseries data.
///
/// v1 nodes may contain inline DataFrame parameters from which data needs to be extracted
/// to create timeseries entries in the schema.
#[derive(Debug)]
pub struct NodeAndTimeseries {
pub node: Node,
Expand All @@ -689,11 +693,14 @@ impl TryFrom<NodeV1> for NodeAndTimeseries {
/// Converts a v1 node into a [`NodeAndTimeseries`].
///
/// Timeseries data is first extracted from every parameter value of the v1 node, and the
/// node itself is then converted to its v2 schema representation.
///
/// # Errors
///
/// Returns the error produced by `Node::try_from` if the node conversion fails.
fn try_from(v1: NodeV1) -> Result<Self, Self::Error> {
    // Counter handed mutably to `extract_timeseries` for each parameter value, so it is
    // shared across all parameters of this node.
    let mut unnamed_count: usize = 0;

    // Collect the timeseries data from all parameter values of the node. This has to be
    // done before the conversion below because that consumes `v1`.
    let extracted: Vec<_> = v1
        .parameters()
        .values()
        .flat_map(|param_value| extract_timeseries(param_value, v1.name(), &mut unnamed_count))
        .collect();

    // An empty collection is represented as `None` rather than `Some(vec![])`.
    let timeseries = Some(extracted).filter(|ts| !ts.is_empty());

    // Now convert the node itself to the v2 schema representation.
    Node::try_from(v1).map(|node| Self { node, timeseries })
}
Expand All @@ -715,6 +722,8 @@ fn extract_timeseries(
if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() {
let mut ts_data: TimeseriesV1Data = df_param.clone().into();
if ts_data.name.is_none() {
// Because the parameter could contain multiple inline DataFrame parameters, use the
// unnamed_count to create a unique name.
let name = format!("{}-p{}.timeseries", name, unnamed_count);
*unnamed_count += 1;
ts_data.name = Some(name);
Expand Down Expand Up @@ -742,6 +751,8 @@ fn extract_timeseries(
if let ParameterV1::Core(CoreParameterV1::DataFrame(df_param)) = p.as_ref() {
let mut ts_data: TimeseriesV1Data = df_param.clone().into();
if ts_data.name.is_none() {
// Because the parameter could contain multiple inline DataFrame parameters, use the
// unnamed_count to create a unique name.
let name = format!("{}-p{}.timeseries", name, unnamed_count);
*unnamed_count += 1;
ts_data.name = Some(name);
Expand Down
2 changes: 1 addition & 1 deletion pywr-schema/src/parameters/control_curves.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::nodes::NodeAttribute;
use crate::parameters::{
DynamicFloatValue, IntoV2Parameter, NodeReference, ParameterMeta, TryFromV1Parameter, TryIntoV2Parameter,
};
use crate::timeseries::{self, LoadedTimeseriesCollection};
use crate::timeseries::LoadedTimeseriesCollection;
use pywr_core::models::ModelDomain;
use pywr_core::parameters::ParameterIndex;
use pywr_v1_schema::parameters::{
Expand Down
8 changes: 7 additions & 1 deletion pywr-schema/src/parameters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ pub fn convert_parameter_v1_to_v2(
) -> (Vec<Parameter>, Vec<TimeseriesV1Data>) {
let param_or_ts: Vec<ParameterOrTimeseries> = v1_parameters
.into_iter()
.filter_map(|p| match p.try_into_v2_parameter(None, unnamed_count){
.filter_map(|p| match p.try_into_v2_parameter(None, unnamed_count) {
Ok(pt) => Some(pt),
Err(e) => {
errors.push(e);
Expand Down Expand Up @@ -472,6 +472,7 @@ enum ParameterOrTimeseries {
/// Timeseries information gathered from a v1 `DataFrameParameterV1`.
///
/// Instances are built by the `From<DataFrameParameterV1>` implementation; every field
/// except `source` is optional.
pub struct TimeseriesV1Data {
/// Name of the timeseries, taken from the parameter's metadata when it has one.
pub name: Option<String>,
/// Where the timeseries data comes from.
pub source: TimeseriesV1Source,
/// Name of the time column; filled from the `index_col` entry of the parameter's
/// `pandas_kwargs` when that entry is present and is a string.
pub time_col: Option<String>,
/// The `column` value of the v1 parameter.
pub column: Option<String>,
/// The `scenario` value of the v1 parameter.
pub scenario: Option<String>,
}
Expand All @@ -487,10 +488,15 @@ impl From<DataFrameParameterV1> for TimeseriesV1Data {
};

let name = p.meta.and_then(|m| m.name);
let time_col = match p.pandas_kwargs.get("index_col") {
Some(v) => v.as_str().map(|s| s.to_string()),
None => None,
};

Self {
name,
source,
time_col,
column: p.column,
scenario: p.scenario,
}
Expand Down
2 changes: 1 addition & 1 deletion pywr-schema/src/parameters/offset.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::data_tables::LoadedTableCollection;
use crate::parameters::{ConstantValue, DynamicFloatValue, DynamicFloatValueType, ParameterMeta, VariableSettings};
use crate::parameters::{ConstantValue, DynamicFloatValue, DynamicFloatValueType, ParameterMeta};
use crate::timeseries::LoadedTimeseriesCollection;
use pywr_core::parameters::ParameterIndex;

Expand Down
14 changes: 7 additions & 7 deletions pywr-schema/src/timeseries/align_and_resample.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use chrono::TimeDelta;
use polars::{prelude::*, series::ops::NullBehavior};
use pywr_core::models::ModelDomain;
use std::{cmp::Ordering, ops::Deref};
Expand Down Expand Up @@ -31,8 +30,7 @@ pub fn align_and_resample(
let durations = durations.column("duration")?.duration()?.deref();

if durations.len() > 1 {
// Non-uniform timestep are not yet supported
todo!();
todo!("Non-uniform timestep are not yet supported");
}

let timeseries_duration = match durations.get(0) {
Expand All @@ -44,7 +42,7 @@ pub fn align_and_resample(
.time()
.step_duration()
.whole_nanoseconds()
.expect("Nano seconds could not be extracted from model step duration");
.ok_or(TimeseriesError::NoDurationNanoSeconds)?;

let df = match model_duration.cmp(&timeseries_duration) {
Ordering::Greater => {
Expand Down Expand Up @@ -78,7 +76,10 @@ pub fn align_and_resample(

let df = slice_end(df, time_col, domain)?;

// TODO check df length equals number of model timesteps
if df.height() != domain.time().timesteps().len() {
return Err(TimeseriesError::DataFrameTimestepMismatch(name.to_string()));
}

Ok(df)
}

Expand All @@ -96,7 +97,6 @@ fn slice_end(df: DataFrame, time_col: &str, domain: &ModelDomain) -> Result<Data

#[cfg(test)]
mod tests {
//use polars::{datatypes::TimeUnit, time::{ClosedWindow, Duration}};
use chrono::{NaiveDate, NaiveDateTime};
use polars::prelude::*;
use pywr_core::{
Expand Down Expand Up @@ -129,7 +129,7 @@ mod tests {
None,
)
.unwrap();
//let values: Vec<f64> = vec![1.0; 31];

let values: Vec<f64> = (1..32).map(|x| x as f64).collect();
let mut df = df!(
"time" => time,
Expand Down
11 changes: 9 additions & 2 deletions pywr-schema/src/timeseries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use pywr_core::models::ModelDomain;
use pywr_core::parameters::{Array1Parameter, Array2Parameter, ParameterIndex};
use pywr_core::PywrError;
use pywr_v1_schema::tables::TableVec;
use std::sync::Arc;
use std::{collections::HashMap, path::Path};
use thiserror::Error;

Expand All @@ -31,6 +30,12 @@ pub enum TimeseriesError {
TimeseriesUnparsableFileFormat { provider: String, path: String },
#[error("A scenario group with name '{0}' was not found")]
ScenarioGroupNotFound(String),
#[error("Duration could not be represented as nanoseconds")]
NoDurationNanoSeconds,
#[error("The length of the resampled timeseries dataframe '{0}' does not match the number of model timesteps.")]
DataFrameTimestepMismatch(String),
#[error("A timeseries dataframe with the name '{0}' already exists.")]
TimeseriesDataframeAlreadyExists(String),
#[error("Polars error: {0}")]
PolarsError(#[from] PolarsError),
#[error("Pywr core error: {0}")]
Expand Down Expand Up @@ -79,7 +84,9 @@ impl LoadedTimeseriesCollection {
if let Some(timeseries_defs) = timeseries_defs {
for ts in timeseries_defs {
let df = ts.load(domain, data_path)?;
// TODO error if key already exists
if timeseries.contains_key(&ts.meta.name) {
return Err(TimeseriesError::TimeseriesDataframeAlreadyExists(ts.meta.name.clone()));
}
timeseries.insert(ts.meta.name.clone(), df);
}
}
Expand Down

0 comments on commit 38f16d4

Please sign in to comment.