Update asset checks to work around surprising data
Also some lint changes + making these assets clearly not-persisted-yet.
jdangerx committed Nov 28, 2024
1 parent 65a4990 commit 8cd921b
Showing 2 changed files with 49 additions and 19 deletions.
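For context on the renaming below: the leading underscore marks these as interim assets that are not yet persisted to the database, and returning dagster Output objects lets a multi_asset map each return value to its output name explicitly. A minimal sketch of that pattern, with made-up asset names rather than the actual PUDL definitions:

import pandas as pd
from dagster import AssetOut, Output, multi_asset

@multi_asset(
    outs={
        "_first_half": AssetOut(),  # leading underscore: not persisted yet
        "_second_half": AssetOut(),
    },
)
def split_rows(upstream_table: pd.DataFrame) -> tuple[Output, Output]:
    """Split an upstream table and name each Output so Dagster can route it."""
    midpoint = len(upstream_table) // 2
    return (
        Output(output_name="_first_half", value=upstream_table.iloc[:midpoint]),
        Output(output_name="_second_half", value=upstream_table.iloc[midpoint:]),
    )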
src/pudl/transform/eia176.py: 44 additions & 14 deletions
@@ -1,7 +1,14 @@
 """Module to perform data cleaning functions on EIA176 data tables."""

 import pandas as pd
-from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset
+from dagster import (
+    AssetCheckResult,
+    AssetIn,
+    AssetOut,
+    Output,
+    asset_check,
+    multi_asset,
+)

 from pudl.logging_helpers import get_logger

@@ -10,13 +17,13 @@

 @multi_asset(
     outs={
-        "core_eia176__yearly_company_data": AssetOut(),
-        "core_eia861__yearly_aggregate_data": AssetOut(),
+        "_core_eia176__yearly_company_data": AssetOut(),
+        "_core_eia176__yearly_aggregate_data": AssetOut(),
     },
 )
 def _core_eia176__data(
     raw_eia176__data: pd.DataFrame,
-) -> tuple[pd.DataFrame, pd.DataFrame]:
+) -> tuple[Output, Output]:
     """Take raw list and return two wide tables with primary keys and one column per variable.

     One table with data for each year and company, one with state- and US-level aggregates per year.
@@ -45,11 +52,16 @@ def _core_eia176__data(
         raw_eia176__data.company == "total of all companies"
     ]
     wide_aggregate = get_wide_table(
-        long_table=long_aggregate.drop(columns=aggregate_drop_columns),
+        long_table=long_aggregate.drop(columns=aggregate_drop_columns).dropna(
+            subset=aggregate_primary_key
+        ),
         primary_key=aggregate_primary_key,
     )

-    return wide_company, wide_aggregate
+    return (
+        Output(output_name="_core_eia176__yearly_company_data", value=wide_company),
+        Output(output_name="_core_eia176__yearly_aggregate_data", value=wide_aggregate),
+    )


 def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
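The new dropna(subset=...) guards the pivot against the "surprising data" from the commit title: rows missing a primary-key field would otherwise create NaN index entries in the wide table. A tiny illustration, using column names from this module but invented data:

import pandas as pd

long_aggregate = pd.DataFrame({
    "report_year": [2015, 2015],
    "area": ["new mexico", None],  # a stray row with no area
    "variable_name": ["1010_vl", "1010_vl"],
    "value": [5.0, 9.0],
})
aggregate_primary_key = ["report_year", "area"]
# Dropping rows with incomplete keys keeps them out of the pivoted index.
clean = long_aggregate.dropna(subset=aggregate_primary_key)
assert len(clean) == 1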
@@ -59,29 +71,30 @@ def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
     )
     unstacked.columns = unstacked.columns.droplevel(0)
     unstacked.columns.name = None  # gets rid of "variable_name" name of columns index
-    return unstacked.reset_index().fillna(0)
+    return unstacked.fillna(0).reset_index()


 @asset_check(
-    asset="core_eia176__yearly_company_data",
-    additional_ins={"core_eia861__yearly_aggregate_data": AssetIn()},
+    asset="_core_eia176__yearly_company_data",
+    additional_ins={"_core_eia176__yearly_aggregate_data": AssetIn()},
     blocking=True,
 )
 def validate_totals(
-    core_eia176__yearly_company_data: pd.DataFrame,
-    core_eia861__yearly_aggregate_data: pd.DataFrame,
+    _core_eia176__yearly_company_data: pd.DataFrame,
+    _core_eia176__yearly_aggregate_data: pd.DataFrame,
 ) -> AssetCheckResult:
     """Compare reported and calculated totals for different geographical aggregates, report any differences."""
     # First make it so we can directly compare reported aggregates to groupings of granular data
-    comparable_aggregates = core_eia861__yearly_aggregate_data.sort_values(
+    comparable_aggregates = _core_eia176__yearly_aggregate_data.sort_values(
         ["report_year", "area"]
     ).fillna(0)

     # Group company data into state-level data and compare to reported totals
     state_data = (
-        core_eia176__yearly_company_data.drop(columns="id")
+        _core_eia176__yearly_company_data.drop(columns="id")
         .groupby(["report_year", "area"])
         .sum()
+        .round(4)
         .reset_index()
     )
     aggregate_state = comparable_aggregates[
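The reordering of fillna and reset_index in get_wide_table above is subtle but deliberate: filling before resetting the index confines the zero-fill to the value columns, while filling afterward would also overwrite missing primary-key values with 0. A small sketch of the difference, with variable names taken from this module and invented data:

import pandas as pd

long = pd.DataFrame({
    "area": ["NM", "TX", "TX"],
    "variable_name": ["1010_vl", "1010_vl", "3014_ct"],
    "value": [5.0, 7.0, 1.0],
})
unstacked = long.set_index(["area", "variable_name"]).unstack("variable_name")
# NM never reports 3014_ct, so that cell is NaN after unstacking.
# fillna(0) here only touches data cells; "area" is still the index.
wide = unstacked.fillna(0).reset_index()
# By contrast, reset_index().fillna(0) would also have replaced any NaN
# key values (had they survived the new dropna) with 0.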
@@ -107,7 +120,24 @@ def validate_totals(
     # Compare using the same columns
     us_diff = aggregate_us[us_data.columns].compare(us_data)

-    return AssetCheckResult(passed=bool(us_diff.empty and state_diff.empty))
+    # 2024-11-28: "3014_ct" is reported as 1 in aggregate data from 2005
+    # through 2015. If we run into cases where the totals don't add up, check
+    # that they are all this specific case and then ignore them.
+    if not state_diff.empty:
+        assert (state_diff.columns.levels[0] == ["3014_ct"]).all()
+        assert (state_diff["3014_ct"]["self"] == 1.0).all()
+        assert (
+            aggregate_state.loc[state_diff.index].report_year.unique()
+            == range(2005, 2016)
+        ).all()
+
+    if not us_diff.empty:
+        assert (us_diff.columns.levels[0] == ["3014_ct"]).all()
+        assert (us_diff["3014_ct"]["self"] == 1.0).all()
+        assert (
+            aggregate_us.loc[us_diff.index].report_year.unique() == range(2005, 2016)
+        ).all()
+
+    return AssetCheckResult(passed=True)


 # TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
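The new assertions lean on pandas DataFrame.compare, which returns only the differing cells under a two-level column index of (column, "self"/"other"). A self-contained sketch of the waive-known-discrepancies pattern used above, with invented values:

import pandas as pd

reported = pd.DataFrame({"1010_vl": [5.0, 7.0], "3014_ct": [1.0, 1.0]})
calculated = pd.DataFrame({"1010_vl": [5.0, 7.0], "3014_ct": [0.0, 0.0]})

diff = reported.compare(calculated)
# Every discrepancy must be the known "3014_ct reported as 1" case...
assert (diff.columns.levels[0] == ["3014_ct"]).all()
# ...and the reported ("self") side must always be exactly 1.
assert (diff["3014_ct"]["self"] == 1.0).all()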
test/unit/transform/eia176_test.py: 5 additions & 5 deletions
@@ -138,7 +138,7 @@ def test_core_eia176__data(df):
         ]
     ].reset_index()

-    wide_company, wide_aggregate = _core_eia176__data(eav_model)
+    wide_company, wide_aggregate = (o.value for o in _core_eia176__data(eav_model))
     assert wide_company.shape == (1, 4)

     company_row = wide_company.loc[0]
@@ -190,8 +190,8 @@ def test_validate__totals(df):
         ]
     ].reset_index()
     # Add the value for the 1010_vl variable
-    company_data["1010_vl"] = [str(v) for v in [VOLUME_1, VOLUME_2, VOLUME_3]]
-    company_data = company_data.drop(columns=DROP_COLS)
+    company_data["1010_vl"] = [VOLUME_1, VOLUME_2, VOLUME_3]
+    company_data = company_data.drop(columns=DROP_COLS + ["value"])

     aggregate_data = df.loc[
         [
@@ -201,7 +201,7 @@
         ]
     ].reset_index()
     # Add the value for the 1010_vl variable
-    aggregate_data["1010_vl"] = [str(v) for v in [NM_VOLUME, TX_VOLUME, US_VOLUME]]
-    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])
+    aggregate_data["1010_vl"] = [NM_VOLUME, TX_VOLUME, US_VOLUME]
+    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id", "value"])

     validate_totals(company_data, aggregate_data)
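Assuming a working PUDL development environment, these updated unit tests can be exercised directly with pytest:

pytest test/unit/transform/eia176_test.py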
