Skip to content

Commit

Permalink
refactor: pull apart data loading and analysis for seperation of tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Molier committed Dec 9, 2024
1 parent 5cbb5fc commit fb9631d
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions openenergyid/baseload/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
EnergySchema: A pandera DataFrameModel for validating energy usage data.
Functions:
load_data(path: str) -> pl.LazyFrame:
load_energy_data(path: str) -> pl.LazyFrame:
Loads and validates energy usage data from an NDJSON file.
calculate_base_load(lf: pl.LazyFrame, granularity: Granularity = Granularity.DAILY) -> pl.DataFrame:
analyze_base_load(data: pl.LazyFrame, granularity: Granularity = Granularity.DAILY)
-> pl.DataFrame:
Calculates base load metrics from energy usage data aggregated by the specified granularity.
main(file_path: str, granularity: Granularity) -> pl.DataFrame:
Expand Down Expand Up @@ -64,8 +65,16 @@ def timestamps_are_ordered(self, data: pl.DataFrame) -> bool:
return data["timestamp"].is_sorted()


def load_data(path: str) -> pl.LazyFrame:
"""Load and validate energy usage data from NDJSON file"""
def load_energy_data(path: str) -> pl.LazyFrame:
"""
Load and validate energy usage data from NDJSON file.
Args:
path: Path to the NDJSON file containing energy usage data
Returns:
A validated LazyFrame containing energy measurements
"""
lf = pl.scan_ndjson(
path,
schema={"timestamp": pl.Datetime(time_zone="Europe/Brussels"), "total": pl.Float64},
Expand All @@ -75,13 +84,22 @@ def load_data(path: str) -> pl.LazyFrame:
return pl.LazyFrame(validated_df)


def calculate_base_load(
lf: pl.LazyFrame, granularity: Granularity = Granularity.P1D
def analyze_base_load(
data: pl.LazyFrame, granularity: Granularity = Granularity.P1D
) -> pl.DataFrame:
"""Calculate base load metrics aggregated by specified granularity"""
"""
Calculate base load metrics from energy usage data.
Args:
data: LazyFrame containing validated energy measurements
granularity: Time period for aggregating results (default: daily)
Returns:
DataFrame containing base load metrics for each time period
"""
polars_interval = GRANULARITY_TO_POLARS[granularity]
return (
lf.filter(pl.col("total") >= 0)
data.filter(pl.col("total") >= 0)
.sort("timestamp")
.group_by_dynamic("timestamp", every=polars_interval)
.agg(
Expand All @@ -103,8 +121,18 @@ def calculate_base_load(


def main(file_path: str, granularity: Granularity) -> pl.DataFrame:
"""Process energy data and return base load metrics for specified granularity"""
return calculate_base_load(load_data(file_path), granularity)
"""
Process energy data and return base load metrics.
Args:
file_path: Path to the input NDJSON file
granularity: Time period for aggregating results
Returns:
DataFrame containing base load metrics for each time period
"""
data = load_energy_data(file_path)
return analyze_base_load(data, granularity)


# Example usage:
Expand Down

0 comments on commit fb9631d

Please sign in to comment.