Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Initial support for Pandas timeseries backend. #240

Merged
merged 7 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ dependencies = [
"click"
]

[project.optional-dependencies]
excel = ["openpyxl"]
# NOTE: the PyTables project publishes its PyPI distribution as "tables",
# not "pytables" — "pip install pytables" fails to resolve. This matches the
# extra name pandas itself uses for its HDF5 dependency.
hdf = ["tables"]


[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"
Expand Down
1 change: 1 addition & 0 deletions pywr-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ cbc = ["pywr-core/cbc"]
highs = ["pywr-core/highs"]
ipm-ocl = ["pywr-core/ipm-ocl"]
ipm-simd = ["pywr-core/ipm-simd"]
test-python = []
10 changes: 9 additions & 1 deletion pywr-schema/src/parameters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub use profiles::{
DailyProfileParameter, MonthlyInterpDay, MonthlyProfileParameter, RadialBasisFunction, RbfProfileParameter,
RbfProfileVariableSettings, UniformDrawdownProfileParameter, WeeklyProfileParameter,
};
#[cfg(feature = "core")]
pub use python::try_json_value_into_py;
pub use python::{PythonParameter, PythonReturnType, PythonSource};
#[cfg(feature = "core")]
use pywr_core::{metric::MetricUsize, parameters::ParameterIndex};
Expand All @@ -64,6 +66,7 @@ use pywr_v1_schema::parameters::{
TableIndexEntry as TableIndexEntryV1,
};
use schemars::JsonSchema;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use strum_macros::{Display, EnumDiscriminants, EnumString, IntoStaticStr, VariantNames};
pub use tables::TablesArrayParameter;
Expand Down Expand Up @@ -468,6 +471,7 @@ pub struct TimeseriesV1Data {
pub time_col: Option<String>,
pub column: Option<String>,
pub scenario: Option<String>,
pub pandas_kwargs: HashMap<String, serde_json::Value>,
}

impl From<DataFrameParameterV1> for TimeseriesV1Data {
Expand All @@ -481,7 +485,10 @@ impl From<DataFrameParameterV1> for TimeseriesV1Data {
};

let name = p.meta.and_then(|m| m.name);
let time_col = match p.pandas_kwargs.get("index_col") {

let mut pandas_kwargs = p.pandas_kwargs;

let time_col = match pandas_kwargs.remove("index_col") {
Some(v) => v.as_str().map(|s| s.to_string()),
None => None,
};
Expand All @@ -492,6 +499,7 @@ impl From<DataFrameParameterV1> for TimeseriesV1Data {
time_col,
column: p.column,
scenario: p.scenario,
pandas_kwargs,
}
}
}
Expand Down
113 changes: 113 additions & 0 deletions pywr-schema/src/test_models/timeseries_pandas.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"metadata": {
"title": "Simple timeseries"
},
"timestepper": {
"start": "2021-01-01",
"end": "2021-12-31",
"timestep": 1
},
"network": {
"nodes": [
{
"meta": {
"name": "input2"
},
"type": "Input",
"max_flow": {
"type": "Parameter",
"name": "factored_flow"
}
},
{
"meta": {
"name": "input1"
},
"type": "Input",
"max_flow": {
"type": "Timeseries",
"name": "inflow",
"columns": {
"type": "Column",
"name": "inflow1"
}
}
},
{
"meta": {
"name": "link1"
},
"type": "Link"
},
{
"meta": {
"name": "output1"
},
"type": "Output",
"cost": {
"type": "Constant",
"value": -10
},
"max_flow": {
"type": "Parameter",
"name": "demand"
}
}
],
"edges": [
{
"from_node": "input1",
"to_node": "link1"
},
{
"from_node": "input2",
"to_node": "link1"
},
{
"from_node": "link1",
"to_node": "output1"
}
],
"parameters": [
{
"meta": {
"name": "demand"
},
"type": "Constant",
"value": 100.0
},
{
"meta": {
"name": "factored_flow"
},
"type": "Aggregated",
"agg_func": "product",
"metrics": [
{
"type": "Timeseries",
"name": "inflow"
},
{
"type": "Constant",
"value": 0.5
}
]
}
],
"timeseries": [
{
"meta": {
"name": "inflow"
},
"provider": {
"type": "Pandas",
"time_col": "date",
"url": "inflow.csv",
"kwargs": {
"dayfirst": true
}
}
}
]
}
}
11 changes: 7 additions & 4 deletions pywr-schema/src/test_models/v1/timeseries-converted.json
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,17 @@
"name": "inflow"
},
"provider": {
"type": "Polars",
"infer_schema_length": null,
"type": "Pandas",
"time_col": null,
"url": "inflow.csv"
"url": "inflow.csv",
"kwargs": {
"parse_dates": true,
"dayfirst": true
}
}
}
],
"metric_sets": null,
"outputs": null
}
}
}
70 changes: 60 additions & 10 deletions pywr-schema/src/timeseries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
#[cfg(feature = "core")]
mod align_and_resample;

mod pandas;
mod polars_dataset;

use self::polars_dataset::PolarsDataset;
use crate::parameters::{ParameterMeta, TimeseriesV1Data, TimeseriesV1Source};
use crate::visit::VisitPaths;
use crate::ConversionError;
#[cfg(feature = "core")]
use ndarray::Array2;
pub use pandas::PandasDataset;
#[cfg(feature = "core")]
use polars::error::PolarsError;
#[cfg(feature = "core")]
use polars::prelude::{DataFrame, DataType::Float64, Float64Type, IndexOrder};
pub use polars_dataset::PolarsDataset;
#[cfg(feature = "core")]
use pywr_core::{
models::ModelDomain,
Expand Down Expand Up @@ -42,7 +45,8 @@ pub enum TimeseriesError {
DataFrameTimestepMismatch(String),
#[error("A timeseries dataframe with the name '{0}' already exists.")]
TimeseriesDataframeAlreadyExists(String),
#[error("The timeseries dataset '{0}' has more than one column of data so a column or scenario name must be provided for any reference")]
#[error("The timeseries dataset '{0}' has more than one column of data so a column or scenario name must be provided for any reference"
)]
TimeseriesColumnOrScenarioRequired(String),
#[error("The timeseries dataset '{0}' has no columns")]
TimeseriesDataframeHasNoColumns(String),
Expand All @@ -57,7 +61,7 @@ pub enum TimeseriesError {
#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, JsonSchema)]
#[serde(tag = "type")]
enum TimeseriesProvider {
Pandas,
Pandas(PandasDataset),
Polars(PolarsDataset),
}

Expand All @@ -73,7 +77,7 @@ impl Timeseries {
pub fn load(&self, domain: &ModelDomain, data_path: Option<&Path>) -> Result<DataFrame, TimeseriesError> {
match &self.provider {
TimeseriesProvider::Polars(dataset) => dataset.load(self.meta.name.as_str(), data_path, domain),
TimeseriesProvider::Pandas => todo!(),
TimeseriesProvider::Pandas(dataset) => dataset.load(self.meta.name.as_str(), data_path, domain),
}
}

Expand All @@ -86,14 +90,14 @@ impl VisitPaths for Timeseries {
fn visit_paths<F: FnMut(&Path)>(&self, visitor: &mut F) {
match &self.provider {
TimeseriesProvider::Polars(dataset) => dataset.visit_paths(visitor),
TimeseriesProvider::Pandas => todo!(),
TimeseriesProvider::Pandas(dataset) => dataset.visit_paths(visitor),
}
}

fn visit_paths_mut<F: FnMut(&mut PathBuf)>(&mut self, visitor: &mut F) {
match &mut self.provider {
TimeseriesProvider::Polars(dataset) => dataset.visit_paths_mut(visitor),
TimeseriesProvider::Pandas => todo!(),
TimeseriesProvider::Pandas(dataset) => dataset.visit_paths_mut(visitor),
}
}
}
Expand Down Expand Up @@ -222,6 +226,9 @@ impl LoadedTimeseriesCollection {
}
}

/// Convert timeseries inputs to this schema.
///
/// The conversion maps v1 `DataFrameParameter` data (tables and URL-based sources) to
/// `Pandas` timeseries providers, carrying over any remaining `pandas_kwargs` so the
/// data is loaded the same way as it was in Pywr v1.
pub fn convert_from_v1_data(
df_data: Vec<TimeseriesV1Data>,
v1_tables: &Option<TableVec>,
Expand All @@ -239,13 +246,18 @@ pub fn convert_from_v1_data(
}

let time_col = None;
let provider = PolarsDataset::new(time_col, table.url.clone(), None);

let provider = PandasDataset {
time_col,
url: table.url.clone(),
kwargs: Some(data.pandas_kwargs),
};

ts.insert(
name.clone(),
Timeseries {
meta: ParameterMeta { name, comment: None },
provider: TimeseriesProvider::Polars(provider),
provider: TimeseriesProvider::Pandas(provider),
},
);
}
Expand All @@ -263,13 +275,18 @@ pub fn convert_from_v1_data(
continue;
}

let provider = PolarsDataset::new(data.time_col, url, None);
let provider = PandasDataset {
time_col: data.time_col,
url,
kwargs: Some(data.pandas_kwargs),
};

ts.insert(
name.clone(),
Timeseries {
meta: ParameterMeta { name, comment: None },
provider: TimeseriesProvider::Polars(provider),

provider: TimeseriesProvider::Pandas(provider),
},
);
}
Expand Down Expand Up @@ -313,4 +330,37 @@ mod tests {

run_all_solvers(&model, &[], &[])
}

/// JSON for the pandas-provider test model, embedded at compile time via `include_str!`.
fn model_pandas_str() -> &'static str {
include_str!("../test_models/timeseries_pandas.json")
}

#[test]
fn test_timeseries_pandas() {
let data = model_pandas_str();
// Without the `test-python` feature we can only check that the model deserializes;
// building it would require a Python interpreter with pandas available.
#[cfg(not(feature = "test-python"))]
let _: PywrModel = serde_json::from_str(data).unwrap();

// Can only build this model within a Python environment that has pandas.
#[cfg(feature = "test-python")]
{
let cargo_manifest_dir = env!("CARGO_MANIFEST_DIR");
let schema: PywrModel = serde_json::from_str(data).unwrap();
// Resolve data files (e.g. inflow.csv) relative to the test-model directory.
let model_dir = PathBuf::from(cargo_manifest_dir).join("src/test_models");
let mut model = schema.build_model(Some(model_dir.as_path()), None).unwrap();

// Expected daily flow into "output1": the day-of-month inflow plus the
// aggregated parameter (inflow * 0.5), i.e. day + day * 0.5 for each of
// the 365 timesteps of 2021.
let expected = Array::from_shape_fn((365, 1), |(x, _)| {
let month_day = NaiveDate::from_yo_opt(2021, (x + 1) as u32).unwrap().day() as f64;
month_day + month_day * 0.5
});

let idx = model.network().get_node_by_name("output1", None).unwrap().index();

// The recorder asserts the node's inflow matches `expected` at every timestep.
let recorder =
AssertionRecorder::new("output-flow", MetricF64::NodeInFlow(idx), expected.clone(), None, None);
model.network_mut().add_recorder(Box::new(recorder)).unwrap();

run_all_solvers(&model, &[], &[])
}
}
}
Loading