diff --git a/src/pudl/transform/eia176.py b/src/pudl/transform/eia176.py index ff6fa2c6e..cdbfcd2d6 100644 --- a/src/pudl/transform/eia176.py +++ b/src/pudl/transform/eia176.py @@ -1,7 +1,14 @@ """Module to perform data cleaning functions on EIA176 data tables.""" import pandas as pd -from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset +from dagster import ( + AssetCheckResult, + AssetIn, + AssetOut, + Output, + asset_check, + multi_asset, +) from pudl.logging_helpers import get_logger @@ -10,13 +17,13 @@ @multi_asset( outs={ - "core_eia176__yearly_company_data": AssetOut(), - "core_eia861__yearly_aggregate_data": AssetOut(), + "_core_eia176__yearly_company_data": AssetOut(), + "_core_eia176__yearly_aggregate_data": AssetOut(), }, ) def _core_eia176__data( raw_eia176__data: pd.DataFrame, -) -> tuple[pd.DataFrame, pd.DataFrame]: +) -> tuple[Output, Output]: """Take raw list and return two wide tables with primary keys and one column per variable. One table with data for each year and company, one with state- and US-level aggregates per year. @@ -45,11 +52,16 @@ def _core_eia176__data( raw_eia176__data.company == "total of all companies" ] wide_aggregate = get_wide_table( - long_table=long_aggregate.drop(columns=aggregate_drop_columns), + long_table=long_aggregate.drop(columns=aggregate_drop_columns).dropna( + subset=aggregate_primary_key + ), primary_key=aggregate_primary_key, ) - return wide_company, wide_aggregate + return ( + Output(output_name="_core_eia176__yearly_company_data", value=wide_company), + Output(output_name="_core_eia176__yearly_aggregate_data", value=wide_aggregate), + ) def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame: @@ -59,29 +71,30 @@ def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataF ) unstacked.columns = unstacked.columns.droplevel(0) unstacked.columns.name = None # gets rid of "variable_name" name of columns index - return unstacked.reset_index().fillna(0) + return unstacked.fillna(0).reset_index() @asset_check( - asset="core_eia176__yearly_company_data", - additional_ins={"core_eia861__yearly_aggregate_data": AssetIn()}, + asset="_core_eia176__yearly_company_data", + additional_ins={"_core_eia176__yearly_aggregate_data": AssetIn()}, blocking=True, ) def validate_totals( - core_eia176__yearly_company_data: pd.DataFrame, - core_eia861__yearly_aggregate_data: pd.DataFrame, + _core_eia176__yearly_company_data: pd.DataFrame, + _core_eia176__yearly_aggregate_data: pd.DataFrame, ) -> AssetCheckResult: """Compare reported and calculated totals for different geographical aggregates, report any differences.""" # First make it so we can directly compare reported aggregates to groupings of granular data - comparable_aggregates = core_eia861__yearly_aggregate_data.sort_values( + comparable_aggregates = _core_eia176__yearly_aggregate_data.sort_values( ["report_year", "area"] ).fillna(0) # Group company data into state-level data and compare to reported totals state_data = ( - core_eia176__yearly_company_data.drop(columns="id") + _core_eia176__yearly_company_data.drop(columns="id") .groupby(["report_year", "area"]) .sum() + .round(4) .reset_index() ) aggregate_state = comparable_aggregates[ @@ -107,7 +120,24 @@ def validate_totals( # Compare using the same columns us_diff = aggregate_us[us_data.columns].compare(us_data) - return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty)) + # 2024-11-28: "3014_ct" is reported as 1 in aggregate data from 2005 + # through 2015. If we run into cases where the totals don't add up, check + # to see that they are all this specific case and then ignore them. + if not state_diff.empty: + assert (state_diff.columns.levels[0] == ["3014_ct"]).all() + assert (state_diff["3014_ct"]["self"] == 1.0).all() + assert ( + aggregate_us.loc[us_diff.index].report_year.unique() == range(2005, 2016) + ).all() + + if not us_diff.empty: + assert (us_diff.columns.levels[0] == ["3014_ct"]).all() + assert (us_diff["3014_ct"]["self"] == 1.0).all() + assert ( + aggregate_us.loc[us_diff.index].report_year.unique() == (range(2005, 2016)) + ).all() + + return AssetCheckResult(passed=True) # TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed? diff --git a/test/unit/transform/eia176_test.py b/test/unit/transform/eia176_test.py index a0f5ced30..c40d24158 100644 --- a/test/unit/transform/eia176_test.py +++ b/test/unit/transform/eia176_test.py @@ -138,7 +138,7 @@ def test_core_eia176__data(df): ] ].reset_index() - wide_company, wide_aggregate = _core_eia176__data(eav_model) + wide_company, wide_aggregate = (o.value for o in _core_eia176__data(eav_model)) assert wide_company.shape == (1, 4) company_row = wide_company.loc[0] @@ -190,8 +190,8 @@ def test_validate__totals(df): ] ].reset_index() # Add the value for the 1010_vl variable - company_data["1010_vl"] = [str(v) for v in [VOLUME_1, VOLUME_2, VOLUME_3]] - company_data = company_data.drop(columns=DROP_COLS) + company_data["1010_vl"] = [VOLUME_1, VOLUME_2, VOLUME_3] + company_data = company_data.drop(columns=DROP_COLS + ["value"]) aggregate_data = df.loc[ [ @@ -201,7 +201,7 @@ def test_validate__totals(df): ] ].reset_index() # Add the value for the 1010_vl variable - aggregate_data["1010_vl"] = [str(v) for v in [NM_VOLUME, TX_VOLUME, US_VOLUME]] - aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"]) + aggregate_data["1010_vl"] = [NM_VOLUME, TX_VOLUME, US_VOLUME] + aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id", "value"]) validate_totals(company_data, aggregate_data)