Merge branch 'main' into update-conda-lockfile
zaneselvans authored Nov 30, 2024
2 parents 2a68502 + 7490fd9 commit 7c17024
Showing 5 changed files with 361 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
@@ -64,6 +64,7 @@


core_module_groups = {
    "core_eia176": [pudl.transform.eia176],
    "core_assn": [glue_assets],
    "core_censusdp1tract": [
        pudl.convert.censusdp1tract_to_sqlite,
9 changes: 9 additions & 0 deletions src/pudl/extract/eia176.py
@@ -35,6 +35,15 @@ def process_raw(
        selection = self._metadata._get_partition_selection(partition)
        return df.assign(report_year=selection)

    def process_renamed(
        self, df: pd.DataFrame, page: str, **partition: PartitionSelection
    ) -> pd.DataFrame:
        """Strip and lowercase raw text fields (except ID)."""
        text_fields = ["area", "atype", "company", "item"]
        for tf in text_fields:
            df[tf] = df[tf].str.strip().str.lower()
        return df


raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")

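As a quick illustration, here is a minimal sketch of what the new `process_renamed` hook does to the raw text columns (toy, hypothetically messy values modeled on the unit-test data further down; the `page` and `partition` plumbing is omitted):

import pandas as pd

# Toy frame mimicking the raw EIA-176 text columns.
df = pd.DataFrame(
    {
        "area": ["  New Mexico "],
        "atype": ["VL"],
        "company": ["NEW MEXICO GAS COMPANY  "],
        "item": [" Residential Sales Volume"],
        "id": ["17673850NM"],  # deliberately left untouched
    }
)

for tf in ["area", "atype", "company", "item"]:
    df[tf] = df[tf].str.strip().str.lower()

print(df.loc[0, "company"])  # -> "new mexico gas company"

Leaving `id` alone matters because the company IDs are used as primary-key material downstream.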
1 change: 1 addition & 0 deletions src/pudl/transform/__init__.py
@@ -63,6 +63,7 @@
from . import (
    classes,
    eia,
    eia176,
    eia860,
    eia860m,
    eia861,
143 changes: 143 additions & 0 deletions src/pudl/transform/eia176.py
@@ -0,0 +1,143 @@
"""Module to perform data cleaning functions on EIA176 data tables."""

import pandas as pd
from dagster import (
AssetCheckResult,
AssetIn,
AssetOut,
Output,
asset_check,
multi_asset,
)

from pudl.logging_helpers import get_logger

logger = get_logger(__name__)


@multi_asset(
outs={
"_core_eia176__yearly_company_data": AssetOut(),
"_core_eia176__yearly_aggregate_data": AssetOut(),
},
)
def _core_eia176__data(
raw_eia176__data: pd.DataFrame,
) -> tuple[Output, Output]:
"""Take raw list and return two wide tables with primary keys and one column per variable.
One table with data for each year and company, one with state- and US-level aggregates per year.
"""
raw_eia176__data = raw_eia176__data.astype({"report_year": int, "value": float})
raw_eia176__data["variable_name"] = (
raw_eia176__data["line"] + "_" + raw_eia176__data["atype"]
)

aggregate_primary_key = ["report_year", "area"]
company_primary_key = aggregate_primary_key + ["id"]
company_drop_columns = ["itemsort", "item", "atype", "line", "company"]
# We must drop 'id' here and cannot use as primary key because its arbitrary/duplicate in aggregate records
# 'id' is a reliable ID only in the context of granular company data
aggregate_drop_columns = company_drop_columns + ["id"]

long_company = raw_eia176__data.loc[
raw_eia176__data.company != "total of all companies"
]
wide_company = get_wide_table(
long_table=long_company.drop(columns=company_drop_columns),
primary_key=company_primary_key,
)

long_aggregate = raw_eia176__data.loc[
raw_eia176__data.company == "total of all companies"
]
wide_aggregate = get_wide_table(
long_table=long_aggregate.drop(columns=aggregate_drop_columns).dropna(
subset=aggregate_primary_key
),
primary_key=aggregate_primary_key,
)

return (
Output(output_name="_core_eia176__yearly_company_data", value=wide_company),
Output(output_name="_core_eia176__yearly_aggregate_data", value=wide_aggregate),
)


def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
"""Take a 'long' or entity-attribute-value table and return a wide table with one column per attribute/variable."""
unstacked = long_table.set_index(primary_key + ["variable_name"]).unstack(
level="variable_name"
)
unstacked.columns = unstacked.columns.droplevel(0)
unstacked.columns.name = None # gets rid of "variable_name" name of columns index
return unstacked.reset_index().fillna(0)


@asset_check(
asset="_core_eia176__yearly_company_data",
additional_ins={"_core_eia176__yearly_aggregate_data": AssetIn()},
blocking=True,
)
def validate_totals(
_core_eia176__yearly_company_data: pd.DataFrame,
_core_eia176__yearly_aggregate_data: pd.DataFrame,
) -> AssetCheckResult:
"""Compare reported and calculated totals for different geographical aggregates, report any differences."""
# First make it so we can directly compare reported aggregates to groupings of granular data
comparable_aggregates = _core_eia176__yearly_aggregate_data.sort_values(
["report_year", "area"]
).fillna(0)

# Group company data into state-level data and compare to reported totals
state_data = (
_core_eia176__yearly_company_data.drop(columns="id")
.groupby(["report_year", "area"])
.sum()
.round(4)
.reset_index()
)
aggregate_state = comparable_aggregates[
comparable_aggregates.area != "u.s. total"
].reset_index(drop=True)
# Compare using the same columns
state_diff = aggregate_state[state_data.columns].compare(state_data)

# Group calculated state-level data into US-level data and compare to reported totals
us_data = (
state_data.drop(columns="area")
.groupby("report_year")
.sum()
.sort_values("report_year")
.reset_index()
)
aggregate_us = (
comparable_aggregates[comparable_aggregates.area == "u.s. total"]
.drop(columns="area")
.sort_values("report_year")
.reset_index(drop=True)
)
# Compare using the same columns
us_diff = aggregate_us[us_data.columns].compare(us_data)

# 2024-11-28: "3014_ct" is reported as 1 in aggregate data from 2005
# through 2015. If we run into cases where the totals don't add up, check
# to see that they are all this specific case and then ignore them.
if not state_diff.empty:
assert (state_diff.columns.levels[0] == ["3014_ct"]).all()
assert (state_diff["3014_ct"]["self"] == 1.0).all()
assert (
aggregate_us.loc[us_diff.index].report_year.unique() == range(2005, 2016)
).all()

if not us_diff.empty:
assert (us_diff.columns.levels[0] == ["3014_ct"]).all()
assert (us_diff["3014_ct"]["self"] == 1.0).all()
assert (
aggregate_us.loc[us_diff.index].report_year.unique() == (range(2005, 2016))
).all()

return AssetCheckResult(passed=True)


# TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
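To make the long-to-wide pivot and the totals check concrete, here is a short self-contained sketch using the same unstack-based approach as `get_wide_table` (values taken from the unit tests below; the frame and variable names here are illustrative, not part of the commit):

import pandas as pd

# Long/EAV-style records: one row per (report_year, area, id, variable).
long_table = pd.DataFrame(
    {
        "report_year": [2022, 2022, 2022],
        "area": ["new mexico", "new mexico", "texas"],
        "id": ["17673850NM", "17635017NM", "17635017TX"],
        "variable_name": ["1010_vl", "1010_vl", "1010_vl"],
        "value": [30980426.0, 532842.0, 1.0],
    }
)

# One column per variable, as in get_wide_table.
primary_key = ["report_year", "area", "id"]
wide = long_table.set_index(primary_key + ["variable_name"]).unstack(
    level="variable_name"
)
wide.columns = wide.columns.droplevel(0)  # drop the "value" level
wide.columns.name = None
wide = wide.reset_index().fillna(0)

# The totals check in miniature: company rows summed by state should match
# the reported state aggregates.
state_sums = wide.drop(columns="id").groupby(["report_year", "area"]).sum()
assert state_sums.loc[(2022, "new mexico"), "1010_vl"] == 30980426.0 + 532842.0

This per-state grouping, followed by a national roll-up, is exactly what `validate_totals` compares against the reported "total of all companies" rows.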
207 changes: 207 additions & 0 deletions test/unit/transform/eia176_test.py
@@ -0,0 +1,207 @@
import pandas as pd
from pytest import fixture

from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals

COLUMN_NAMES = [
    "area",
    "atype",
    "company",
    "id",
    "line",
    "report_year",
    "value",
    "itemsort",
    "item",
]

ID_1 = "17673850NM"
VOLUME_1 = 30980426.0
COMPANY_1 = [
    "new mexico",
    "vl",
    "new mexico gas company",
    ID_1,
    "1010",
    "2022",
    VOLUME_1,
    "[10.1]",
    "residential sales volume",
]

ID_2 = "17635017NM"
VOLUME_2 = 532842.0
COMPANY_2 = [
    "new mexico",
    "vl",
    "west texas gas inc",
    ID_2,
    "1010",
    "2022",
    VOLUME_2,
    "[10.1]",
    "residential sales volume",
]

NM_VOLUME = VOLUME_1 + VOLUME_2
NM_AGGREGATE = [
    "new mexico",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_1,
    "1010",
    "2022",
    NM_VOLUME,
    "[10.1]",
    "residential sales volume",
]

ID_3 = "17635017TX"
VOLUME_3 = 1.0
COMPANY_3 = [
    "texas",
    "vl",
    "west texas gas inc",
    ID_3,
    "1010",
    "2022",
    VOLUME_3,
    "[10.1]",
    "residential sales volume",
]

TX_VOLUME = VOLUME_3
TX_AGGREGATE = [
    "texas",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_3,
    "1010",
    "2022",
    TX_VOLUME,
    "[10.1]",
    "residential sales volume",
]

US_VOLUME = NM_VOLUME + TX_VOLUME
US_AGGREGATE = [
    "u.s. total",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_1,
    "1010",
    "2022",
    US_VOLUME,
    "[10.1]",
    "residential sales volume",
]

ID_4 = "4"
VOLUME_4 = 4.0
COMPANY_4 = [
    "alaska",
    "vl",
    "alaska gas inc",
    ID_4,
    "1020",
    "2022",
    VOLUME_4,
    "[10.2]",
    "some other volume",
]

DROP_COLS = ["itemsort", "item", "atype", "line", "company"]


@fixture
def df():
    df = pd.DataFrame(columns=COLUMN_NAMES)
    df.loc[0] = COMPANY_1
    df.loc[1] = COMPANY_2
    df.loc[2] = NM_AGGREGATE
    df.loc[3] = COMPANY_3
    df.loc[4] = TX_AGGREGATE
    df.loc[5] = US_AGGREGATE
    df.loc[6] = COMPANY_4
    df = df.set_index(["area", "company"])
    return df


def test_core_eia176__data(df):
    eav_model = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "total of all companies"),
        ]
    ].reset_index()

    wide_company, wide_aggregate = (o.value for o in _core_eia176__data(eav_model))
    assert wide_company.shape == (1, 4)

    company_row = wide_company.loc[0]
    assert list(company_row.index) == ["report_year", "area", "id", "1010_vl"]
    assert list(company_row.values) == [2022, "new mexico", ID_1, VOLUME_1]

    assert wide_aggregate.shape == (1, 3)
    aggregate_row = wide_aggregate.loc[0]
    assert list(aggregate_row.index) == ["report_year", "area", "1010_vl"]
    assert list(aggregate_row.values) == [2022, "new mexico", NM_VOLUME]


def test_get_wide_table(df):
    long_table = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "west texas gas inc"),
            # Row measuring a different variable to test filling NAs
            ("alaska", "alaska gas inc"),
        ]
    ].reset_index()

    long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]
    long_table = long_table.drop(columns=DROP_COLS)

    primary_key = ["report_year", "area", "id"]
    wide_table = get_wide_table(long_table, primary_key)

    assert wide_table.shape == (3, 5)
    assert list(wide_table.loc[0].index) == [
        "report_year",
        "area",
        "id",
        "1010_vl",
        "1020_vl",
    ]
    assert list(wide_table.loc[0].values) == ["2022", "alaska", ID_4, 0, VOLUME_4]
    assert list(wide_table.loc[1].values) == ["2022", "new mexico", ID_2, VOLUME_2, 0]
    assert list(wide_table.loc[2].values) == ["2022", "new mexico", ID_1, VOLUME_1, 0]


def test_validate_totals(df):
    # Our test data will have only measurements for this 1010_vl variable
    company_data = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "west texas gas inc"),
            ("texas", "west texas gas inc"),
        ]
    ].reset_index()
    # Add the value for the 1010_vl variable
    company_data["1010_vl"] = [VOLUME_1, VOLUME_2, VOLUME_3]
    company_data = company_data.drop(columns=DROP_COLS + ["value"])

    aggregate_data = df.loc[
        [
            ("new mexico", "total of all companies"),
            ("texas", "total of all companies"),
            ("u.s. total", "total of all companies"),
        ]
    ].reset_index()
    # Add the value for the 1010_vl variable
    aggregate_data["1010_vl"] = [NM_VOLUME, TX_VOLUME, US_VOLUME]
    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id", "value"])

    validate_totals(company_data, aggregate_data)

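Assuming a standard pytest setup in this repository, the new unit tests above can presumably be run from the repo root with:

pytest test/unit/transform/eia176_test.py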