Skip to content

Commit

Permalink
feat: updates to use of durations when resampling and aligning timese…
Browse files Browse the repository at this point in the history
…ries

Main change is to switch to using milliseconds as the main unit for comparison and resampling. Polars is also updated to v0.40.0
  • Loading branch information
Batch21 committed Jun 13, 2024
1 parent 70ba15d commit 5d727b4
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ thiserror = "1.0.25"
num = "0.4.0"
float-cmp = "0.9.0"
ndarray = "0.15.3"
polars = { version = "0.39", features = ["lazy", "rows", "ndarray"] }
polars = { version = "0.40", features = ["lazy", "rows", "ndarray"] }
pyo3-polars = "0.14"
pyo3 = { version = "0.21", default-features = false }
pyo3-log = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions pywr-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ pub enum PywrError {
TimestepRangeGenerationError(String),
#[error("Could not create timesteps for frequency '{0}'")]
TimestepGenerationError(String),
#[error("Pywr does not currently support timesteps of varying duration")]
TimestepDurationMismatch,
#[error("aggregation error: {0}")]
Aggregation(#[from] AggregationError),
}
Expand Down
55 changes: 47 additions & 8 deletions pywr-core/src/timestep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use std::ops::Add;
use crate::PywrError;

const SECS_IN_DAY: i64 = 60 * 60 * 24;
const MILLISECS_IN_DAY: i64 = 1000 * SECS_IN_DAY;
const MILLISECS_IN_HOUR: i64 = 1000 * 60 * 60;
const MILLISECS_IN_MINUTE: i64 = 1000 * 60;
const MILLISECS_IN_SECOND: i64 = 1000;

/// A newtype for `chrono::TimeDelta` that provides a couple of useful convenience methods.
#[pyclass]
Expand Down Expand Up @@ -59,9 +63,37 @@ impl PywrDuration {
self.0.num_seconds() as f64 / SECS_IN_DAY as f64
}

/// Returns the number of nanoseconds in the duration.
pub fn whole_nanoseconds(&self) -> Option<i64> {
self.0.num_nanoseconds()
/// Returns the number of milliseconds in the duration.
pub fn milliseconds(&self) -> i64 {
self.0.num_milliseconds()
}

/// Convert the duration to a string representation that can be parsed by polars
/// see: https://docs.rs/polars/latest/polars/prelude/struct.Duration.html#method.parse
///
/// The duration is decomposed into whole days, hours, minutes, seconds and milliseconds, and
/// every non-zero component is written as `<count><unit>` from the largest unit to the
/// smallest (e.g. `1d1s` or `23h59m59s`). The value is built from whole milliseconds, so
/// anything finer than a millisecond is not represented.
///
/// A zero duration is returned as `0ms` rather than an empty string, and a negative duration
/// is written as a leading `-` followed by the components of its magnitude (e.g. `-1d12h`).
pub fn duration_string(&self) -> String {
    let milliseconds = self.milliseconds();
    if milliseconds == 0 {
        // Every component below would be skipped, which would otherwise yield an empty string.
        return String::from("0ms");
    }

    let mut duration = String::new();
    if milliseconds < 0 {
        duration.push('-');
    }

    // Decompose the magnitude: `%` keeps the sign of the dividend, so the components of a
    // negative value would never be positive. `unsigned_abs` cannot overflow, even for
    // `i64::MIN`.
    let mut remaining = milliseconds.unsigned_abs();

    let units = [
        (MILLISECS_IN_DAY, "d"),
        (MILLISECS_IN_HOUR, "h"),
        (MILLISECS_IN_MINUTE, "m"),
        (MILLISECS_IN_SECOND, "s"),
        (1, "ms"),
    ];
    for (unit_millis, suffix) in units {
        // The unit sizes are positive constants, so taking the magnitude is lossless.
        let unit_millis = unit_millis.unsigned_abs();
        let count = remaining / unit_millis;
        if count > 0 {
            duration.push_str(&count.to_string());
            duration.push_str(suffix);
        }
        remaining %= unit_millis;
    }
    duration
}
}

Expand Down Expand Up @@ -195,15 +227,13 @@ impl Timestepper {
/// A collection of time-steps together with the step duration reported by `step_duration`.
#[derive(Debug)]
pub struct TimeDomain {
/// The time-steps that make up the domain, exposed as a slice by `timesteps`.
timesteps: Vec<Timestep>,
/// The duration returned by `step_duration` as the length of each time-step.
duration: PywrDuration,
}

impl TimeDomain {
/// Return the duration of each time-step.
pub fn step_duration(&self) -> PywrDuration {
// This relies on the assumption that all time-steps are the same length.
// Ideally, this invariant would be refactored to have the duration stored here in `TimeDomain`,
// rather than in `Timestep`.
self.timesteps.first().expect("Not time-steps defined.").duration
self.duration
}

pub fn timesteps(&self) -> &[Timestep] {
Expand Down Expand Up @@ -233,7 +263,11 @@ impl TryFrom<Timestepper> for TimeDomain {

fn try_from(value: Timestepper) -> Result<Self, Self::Error> {
let timesteps = value.timesteps()?;
Ok(Self { timesteps })
let duration = timesteps.first().expect("No time-steps defined.").duration;
match timesteps.iter().all(|t| t.duration == duration) {
true => Ok(Self { timesteps, duration }),
false => Err(PywrError::TimestepDurationMismatch),
}
}
}

Expand Down Expand Up @@ -313,23 +347,28 @@ mod test {
let duration = PywrDuration::days(5);
assert_eq!(duration.whole_days(), Some(5));
assert_eq!(duration.fractional_days(), 5.0);
assert_eq!(duration.duration_string(), String::from("5d"));

let duration: PywrDuration = TimeDelta::hours(12).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), 0.5);
assert_eq!(duration.duration_string(), String::from("12h"));

let duration: PywrDuration = TimeDelta::minutes(30).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), 1.0 / 48.0);
assert_eq!(duration.duration_string(), String::from("30m"));

let duration_secs = SECS_IN_DAY + 1;
let duration: PywrDuration = TimeDelta::seconds(duration_secs).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), duration_secs as f64 / SECS_IN_DAY as f64);
assert_eq!(duration.duration_string(), String::from("1d1s"));

let duration_secs = SECS_IN_DAY - 1;
let duration: PywrDuration = TimeDelta::seconds(duration_secs).into();
assert_eq!(duration.whole_days(), None);
assert_eq!(duration.fractional_days(), duration_secs as f64 / SECS_IN_DAY as f64);
assert_eq!(duration.duration_string(), String::from("23h59m59s"));
}
}
28 changes: 15 additions & 13 deletions pywr-schema/src/timeseries/align_and_resample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn align_and_resample(
let df = df
.clone()
.lazy()
.with_columns([col(time_col).cast(DataType::Datetime(TimeUnit::Nanoseconds, None))])
.with_columns([col(time_col).cast(DataType::Datetime(TimeUnit::Milliseconds, None))])
.collect()?
.sort([time_col], sort_options)?;

Expand All @@ -42,13 +42,10 @@ pub fn align_and_resample(
None => return Err(TimeseriesError::TimeseriesDurationNotFound(name.to_string())),
};

let model_duration = domain
.time()
.step_duration()
.whole_nanoseconds()
.ok_or(TimeseriesError::NoDurationNanoSeconds)?;
let model_duration = domain.time().step_duration();
let model_duration_string = model_duration.duration_string();

let df = match model_duration.cmp(&timeseries_duration) {
let df = match model_duration.milliseconds().cmp(&timeseries_duration) {
Ordering::Greater => {
// Downsample
df.clone()
Expand All @@ -57,9 +54,9 @@ pub fn align_and_resample(
col(time_col),
[],
DynamicGroupOptions {
every: Duration::new(model_duration),
period: Duration::new(model_duration),
offset: Duration::new(0),
every: Duration::parse(model_duration_string.as_str()),
period: Duration::parse(model_duration_string.as_str()),
offset: Duration::parse("0d"),
start_by: StartBy::DataPoint,
..Default::default()
},
Expand All @@ -72,7 +69,12 @@ pub fn align_and_resample(
// TODO: this does not extend the dataframe beyond its original end date. Should it do so when using a forward fill strategy?
// The df could be extended by the length of the duration it is being resampled to.
df.clone()
.upsample::<[String; 0]>([], "time", Duration::new(model_duration), Duration::new(0))?
.upsample::<[String; 0]>(
[],
"time",
Duration::parse(model_duration_string.as_str()),
Duration::parse("0d"),
)?
.fill_null(FillNullStrategy::Forward(None))?
}
Ordering::Equal => df,
Expand Down Expand Up @@ -150,7 +152,7 @@ mod tests {
NaiveDateTime::parse_from_str("2021-01-14 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(),
],
)
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap();
let resampled_dates = df.column("time").unwrap();
assert!(resampled_dates.equals(&expected_dates));
Expand Down Expand Up @@ -242,7 +244,7 @@ mod tests {
assert!(resampled_values.equals(&expected_values));

let expected_dates = Series::new("time", time)
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap();
let resampled_dates = df.column("time").unwrap();
assert!(resampled_dates.equals(&expected_dates));
Expand Down
15 changes: 10 additions & 5 deletions pywr-schema/src/timeseries/polars_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,16 @@ mod core {
Some(ext) => {
let ext = ext.to_str().map(|s| s.to_lowercase());
match ext.as_deref() {
Some("csv") => CsvReader::from_path(fp)?
.infer_schema(None)
.with_try_parse_dates(true)
.has_header(true)
.finish()?,
Some("csv") => {
let parse_options = CsvParseOptions::default().with_try_parse_dates(true);

CsvReadOptions::default()
.with_schema(None)
.with_has_header(true)
.with_parse_options(parse_options)
.try_into_reader_with_file_path(Some(fp))?
.finish()?
}
Some("parquet") => {
todo!()
}
Expand Down

0 comments on commit 5d727b4

Please sign in to comment.