From fb9631dcfde43f7f0540662f4ea3720cf62ea4f9 Mon Sep 17 00:00:00 2001 From: Molier Date: Mon, 9 Dec 2024 12:40:17 +0000 Subject: [PATCH] refactor: pull apart data loading and analysis for seperation of tasks --- openenergyid/baseload/main.py | 48 +++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/openenergyid/baseload/main.py b/openenergyid/baseload/main.py index e5bd804..0ae1e35 100644 --- a/openenergyid/baseload/main.py +++ b/openenergyid/baseload/main.py @@ -7,10 +7,11 @@ EnergySchema: A pandera DataFrameModel for validating energy usage data. Functions: - load_data(path: str) -> pl.LazyFrame: + load_energy_data(path: str) -> pl.LazyFrame: Loads and validates energy usage data from an NDJSON file. - calculate_base_load(lf: pl.LazyFrame, granularity: Granularity = Granularity.DAILY) -> pl.DataFrame: + analyze_base_load(data: pl.LazyFrame, granularity: Granularity = Granularity.DAILY) + -> pl.DataFrame: Calculates base load metrics from energy usage data aggregated by the specified granularity. main(file_path: str, granularity: Granularity) -> pl.DataFrame: @@ -64,8 +65,16 @@ def timestamps_are_ordered(self, data: pl.DataFrame) -> bool: return data["timestamp"].is_sorted() -def load_data(path: str) -> pl.LazyFrame: - """Load and validate energy usage data from NDJSON file""" +def load_energy_data(path: str) -> pl.LazyFrame: + """ + Load and validate energy usage data from NDJSON file. + + Args: + path: Path to the NDJSON file containing energy usage data + + Returns: + A validated LazyFrame containing energy measurements + """ lf = pl.scan_ndjson( path, schema={"timestamp": pl.Datetime(time_zone="Europe/Brussels"), "total": pl.Float64}, @@ -75,13 +84,22 @@ def load_data(path: str) -> pl.LazyFrame: return pl.LazyFrame(validated_df) -def calculate_base_load( - lf: pl.LazyFrame, granularity: Granularity = Granularity.P1D +def analyze_base_load( + data: pl.LazyFrame, granularity: Granularity = Granularity.P1D ) -> pl.DataFrame: - """Calculate base load metrics aggregated by specified granularity""" + """ + Calculate base load metrics from energy usage data. + + Args: + data: LazyFrame containing validated energy measurements + granularity: Time period for aggregating results (default: daily) + + Returns: + DataFrame containing base load metrics for each time period + """ polars_interval = GRANULARITY_TO_POLARS[granularity] return ( - lf.filter(pl.col("total") >= 0) + data.filter(pl.col("total") >= 0) .sort("timestamp") .group_by_dynamic("timestamp", every=polars_interval) .agg( @@ -103,8 +121,18 @@ def calculate_base_load( def main(file_path: str, granularity: Granularity) -> pl.DataFrame: - """Process energy data and return base load metrics for specified granularity""" - return calculate_base_load(load_data(file_path), granularity) + """ + Process energy data and return base load metrics. + + Args: + file_path: Path to the input NDJSON file + granularity: Time period for aggregating results + + Returns: + DataFrame containing base load metrics for each time period + """ + data = load_energy_data(file_path) + return analyze_base_load(data, granularity) # Example usage: