From 7163863ad923ee54dc91642b57ea87c3ae0f3e1d Mon Sep 17 00:00:00 2001
From: David Mudrauskas
Date: Wed, 27 Nov 2024 11:52:00 -0500
Subject: [PATCH] Eia176 wide table (#3590)

* Function for getting a column per variable for EIA-176 data

* Transpose rows to columns for entity variables, initial test/check ideas

* Dagster job for eia176 core data

* Use unstack instead of looping to create wide table

* Extract testable function for wide table, validation function

* Test for wide table function

* Test for validating totals

* Information-hide drop columns

* Normalize string values in raw dataframe

* Mutate data types

* Reusable df in tests, label index
---
 src/pudl/etl/__init__.py           |   1 +
 src/pudl/extract/eia176.py         |   9 ++
 src/pudl/transform/__init__.py     |   1 +
 src/pudl/transform/eia176.py       | 113 ++++++++++++++++
 test/unit/transform/eia176_test.py | 207 +++++++++++++++++++++++++++++
 5 files changed, 331 insertions(+)
 create mode 100644 src/pudl/transform/eia176.py
 create mode 100644 test/unit/transform/eia176_test.py

diff --git a/src/pudl/etl/__init__.py b/src/pudl/etl/__init__.py
index 9b859c6448..4406825934 100644
--- a/src/pudl/etl/__init__.py
+++ b/src/pudl/etl/__init__.py
@@ -64,6 +64,7 @@ core_module_groups = {
+    "_core_eia176": [pudl.transform.eia176],
     "core_assn": [glue_assets],
     "core_censusdp1tract": [
         pudl.convert.censusdp1tract_to_sqlite,

diff --git a/src/pudl/extract/eia176.py b/src/pudl/extract/eia176.py
index 484fedaf83..4ad9e59ef2 100644
--- a/src/pudl/extract/eia176.py
+++ b/src/pudl/extract/eia176.py
@@ -35,6 +35,15 @@ def process_raw(
         selection = self._metadata._get_partition_selection(partition)
         return df.assign(report_year=selection)
 
+    def process_renamed(
+        self, df: pd.DataFrame, page: str, **partition: PartitionSelection
+    ) -> pd.DataFrame:
+        """Strip and lowercase raw text fields (except ID)."""
+        text_fields = ["area", "atype", "company", "item"]
+        for tf in text_fields:
+            df[tf] = df[tf].str.strip().str.lower()
+        return df
+
 
 raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")

diff --git a/src/pudl/transform/__init__.py b/src/pudl/transform/__init__.py
index a9faccbd6d..68451cc9b9 100644
--- a/src/pudl/transform/__init__.py
+++ b/src/pudl/transform/__init__.py
@@ -63,6 +63,7 @@ from . import (
     classes,
     eia,
+    eia176,
     eia860,
     eia860m,
     eia861,

diff --git a/src/pudl/transform/eia176.py b/src/pudl/transform/eia176.py
new file mode 100644
index 0000000000..ff6fa2c6ef
--- /dev/null
+++ b/src/pudl/transform/eia176.py
@@ -0,0 +1,113 @@
+"""Module to perform data cleaning functions on EIA176 data tables."""
+
+import pandas as pd
+from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset
+
+from pudl.logging_helpers import get_logger
+
+logger = get_logger(__name__)
+
+
+@multi_asset(
+    outs={
+        "core_eia176__yearly_company_data": AssetOut(),
+        "core_eia176__yearly_aggregate_data": AssetOut(),
+    },
+)
+def _core_eia176__data(
+    raw_eia176__data: pd.DataFrame,
+) -> tuple[pd.DataFrame, pd.DataFrame]:
+    """Take the raw long table and return two wide tables with primary keys and one column per variable.
+
+    One table holds data for each year and company; the other holds state- and US-level aggregates per year.
+    """
+    raw_eia176__data = raw_eia176__data.astype({"report_year": int, "value": float})
+    raw_eia176__data["variable_name"] = (
+        raw_eia176__data["line"] + "_" + raw_eia176__data["atype"]
+    )
+
+    aggregate_primary_key = ["report_year", "area"]
+    company_primary_key = aggregate_primary_key + ["id"]
+    company_drop_columns = ["itemsort", "item", "atype", "line", "company"]
+    # We must drop 'id' here and cannot use it as a primary key because it's arbitrary/duplicated in aggregate records
+    # 'id' is a reliable ID only in the context of granular company data
+    aggregate_drop_columns = company_drop_columns + ["id"]
+
+    long_company = raw_eia176__data.loc[
+        raw_eia176__data.company != "total of all companies"
+    ]
+    wide_company = get_wide_table(
+        long_table=long_company.drop(columns=company_drop_columns),
+        primary_key=company_primary_key,
+    )
+
+    long_aggregate = raw_eia176__data.loc[
+        raw_eia176__data.company == "total of all companies"
+    ]
+    wide_aggregate = get_wide_table(
+        long_table=long_aggregate.drop(columns=aggregate_drop_columns),
+        primary_key=aggregate_primary_key,
+    )
+
+    return wide_company, wide_aggregate
+
+
+def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
+    """Take a 'long' or entity-attribute-value table and return a wide table with one column per attribute/variable."""
+    unstacked = long_table.set_index(primary_key + ["variable_name"]).unstack(
+        level="variable_name"
+    )
+    unstacked.columns = unstacked.columns.droplevel(0)
+    unstacked.columns.name = None  # drop the "variable_name" label from the columns index
+    return unstacked.reset_index().fillna(0)
+
+
+@asset_check(
+    asset="core_eia176__yearly_company_data",
+    additional_ins={"core_eia176__yearly_aggregate_data": AssetIn()},
+    blocking=True,
+)
+def validate_totals(
+    core_eia176__yearly_company_data: pd.DataFrame,
+    core_eia176__yearly_aggregate_data: pd.DataFrame,
+) -> AssetCheckResult:
+    """Compare reported and calculated totals for different geographical aggregates; report any differences."""
+    # First make it so we can directly compare reported aggregates to groupings of granular data
+    comparable_aggregates = core_eia176__yearly_aggregate_data.sort_values(
+        ["report_year", "area"]
+    ).fillna(0)
+
+    # Group company data into state-level data and compare to reported totals
+    state_data = (
+        core_eia176__yearly_company_data.drop(columns="id")
+        .groupby(["report_year", "area"])
+        .sum()
+        .reset_index()
+    )
+    aggregate_state = comparable_aggregates[
+        comparable_aggregates.area != "u.s. total"
+    ].reset_index(drop=True)
+    # Compare using the same columns
+    state_diff = aggregate_state[state_data.columns].compare(state_data)
+
+    # Group calculated state-level data into US-level data and compare to reported totals
+    us_data = (
+        state_data.drop(columns="area")
+        .groupby("report_year")
+        .sum()
+        .sort_values("report_year")
+        .reset_index()
+    )
+    aggregate_us = (
+        comparable_aggregates[comparable_aggregates.area == "u.s. total"]
+        .drop(columns="area")
+        .sort_values("report_year")
+        .reset_index(drop=True)
+    )
+    # Compare using the same columns
+    us_diff = aggregate_us[us_data.columns].compare(us_data)
+
+    return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty))
+
+
+# TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
diff --git a/test/unit/transform/eia176_test.py b/test/unit/transform/eia176_test.py
new file mode 100644
index 0000000000..a0f5ced30c
--- /dev/null
+++ b/test/unit/transform/eia176_test.py
@@ -0,0 +1,207 @@
+import pandas as pd
+from pytest import fixture
+
+from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals
+
+COLUMN_NAMES = [
+    "area",
+    "atype",
+    "company",
+    "id",
+    "line",
+    "report_year",
+    "value",
+    "itemsort",
+    "item",
+]
+
+ID_1 = "17673850NM"
+VOLUME_1 = 30980426.0
+COMPANY_1 = [
+    "new mexico",
+    "vl",
+    "new mexico gas company",
+    ID_1,
+    "1010",
+    "2022",
+    VOLUME_1,
+    "[10.1]",
+    "residential sales volume",
+]
+
+ID_2 = "17635017NM"
+VOLUME_2 = 532842.0
+COMPANY_2 = [
+    "new mexico",
+    "vl",
+    "west texas gas inc",
+    ID_2,
+    "1010",
+    "2022",
+    VOLUME_2,
+    "[10.1]",
+    "residential sales volume",
+]
+
+NM_VOLUME = VOLUME_1 + VOLUME_2
+NM_AGGREGATE = [
+    "new mexico",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_1,
+    "1010",
+    "2022",
+    NM_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+ID_3 = "17635017TX"
+VOLUME_3 = 1.0
+COMPANY_3 = [
+    "texas",
+    "vl",
+    "west texas gas inc",
+    ID_3,
+    "1010",
+    "2022",
+    VOLUME_3,
+    "[10.1]",
+    "residential sales volume",
+]
+
+TX_VOLUME = VOLUME_3
+TX_AGGREGATE = [
+    "texas",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_3,
+    "1010",
+    "2022",
+    TX_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+US_VOLUME = NM_VOLUME + TX_VOLUME
+US_AGGREGATE = [
+    "u.s. total",
+    "vl",
+    "total of all companies",
+    # Aggregates appear to reuse an arbitrary company ID
+    ID_1,
+    "1010",
+    "2022",
+    US_VOLUME,
+    "[10.1]",
+    "residential sales volume",
+]
+
+ID_4 = "4"
+VOLUME_4 = 4.0
+COMPANY_4 = [
+    "alaska",
+    "vl",
+    "alaska gas inc",
+    ID_4,
+    "1020",
+    "2022",
+    VOLUME_4,
+    "[10.2]",
+    "some other volume",
+]
+
+DROP_COLS = ["itemsort", "item", "atype", "line", "company"]
+
+
+@fixture
+def df():
+    df = pd.DataFrame(columns=COLUMN_NAMES)
+    df.loc[0] = COMPANY_1
+    df.loc[1] = COMPANY_2
+    df.loc[2] = NM_AGGREGATE
+    df.loc[3] = COMPANY_3
+    df.loc[4] = TX_AGGREGATE
+    df.loc[5] = US_AGGREGATE
+    df.loc[6] = COMPANY_4
+    df = df.set_index(["area", "company"])
+    return df
+
+
+def test_core_eia176__data(df):
+    eav_model = df.loc[
+        [
+            ("new mexico", "new mexico gas company"),
+            ("new mexico", "total of all companies"),
+        ]
+    ].reset_index()
+
+    wide_company, wide_aggregate = _core_eia176__data(eav_model)
+    assert wide_company.shape == (1, 4)
+
+    company_row = wide_company.loc[0]
+    assert list(company_row.index) == ["report_year", "area", "id", "1010_vl"]
+    assert list(company_row.values) == [2022, "new mexico", ID_1, VOLUME_1]
+
+    assert wide_aggregate.shape == (1, 3)
+    aggregate_row = wide_aggregate.loc[0]
+    assert list(aggregate_row.index) == ["report_year", "area", "1010_vl"]
+    assert list(aggregate_row.values) == [2022, "new mexico", NM_VOLUME]
+
+
+def test_get_wide_table(df):
+    long_table = df.loc[
+        [
+            ("new mexico", "new mexico gas company"),
+            ("new mexico", "west texas gas inc"),
+            # Row measuring a different variable to test filling NAs
+            ("alaska", "alaska gas inc"),
+        ]
+    ].reset_index()
+
+    long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]
+    long_table = long_table.drop(columns=DROP_COLS)
+
+    primary_key = ["report_year", "area", "id"]
+    wide_table = get_wide_table(long_table, primary_key)
+
+    assert wide_table.shape == (3, 5)
+    assert list(wide_table.loc[0].index) == [
+        "report_year",
+        "area",
+        "id",
+        "1010_vl",
+        "1020_vl",
+    ]
+    assert list(wide_table.loc[0].values) == ["2022", "alaska", ID_4, 0, VOLUME_4]
+    assert list(wide_table.loc[1].values) == ["2022", "new mexico", ID_2, VOLUME_2, 0]
+    assert list(wide_table.loc[2].values) == ["2022", "new mexico", ID_1, VOLUME_1, 0]
+
+
+def test_validate_totals(df):
+    # Our test data has measurements only for the 1010_vl variable
+    company_data = df.loc[
+        [
+            ("new mexico", "new mexico gas company"),
+            ("new mexico", "west texas gas inc"),
+            ("texas", "west texas gas inc"),
+        ]
+    ].reset_index()
+    # Add the 1010_vl variable, numeric so groupby().sum() adds rather than concatenates
+    company_data["1010_vl"] = [VOLUME_1, VOLUME_2, VOLUME_3]
+    company_data = company_data.drop(columns=DROP_COLS)
+
+    aggregate_data = df.loc[
+        [
+            ("new mexico", "total of all companies"),
+            ("texas", "total of all companies"),
+            ("u.s. total", "total of all companies"),
+        ]
+    ].reset_index()
+    # Add the value for the 1010_vl variable
+    aggregate_data["1010_vl"] = [NM_VOLUME, TX_VOLUME, US_VOLUME]
+    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])
+
+    assert validate_totals(company_data, aggregate_data).passed
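The validation in validate_totals leans on pandas' DataFrame.compare, which
diffs two identically labeled frames and returns only the cells that disagree
(empty means a match). A small standalone sketch of the state-level half of the
check (not part of the patch; the numbers echo the test fixtures above):

    import pandas as pd

    company = pd.DataFrame(
        {
            "report_year": [2022, 2022],
            "area": ["new mexico", "new mexico"],
            "id": ["A", "B"],
            "1010_vl": [30980426.0, 532842.0],
        }
    )
    reported = pd.DataFrame(
        {"report_year": [2022], "area": ["new mexico"], "1010_vl": [31513268.0]}
    )

    # Roll company rows up to state level, then diff against the reported total.
    calculated = (
        company.drop(columns="id").groupby(["report_year", "area"]).sum().reset_index()
    )
    diff = reported[calculated.columns].compare(calculated)
    assert diff.empty  # the reported total matches the calculated one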