Skip to content

Commit

Permalink
Eia176 wide table (#3590)
Browse files Browse the repository at this point in the history
* Function for getting a column per variable for EIA-176 data

* Transpose rows to columns for entity variables, initial test/check ideas

* Dagster job for eia176 core data

* Use unstack instead of looping to create wide table

* Extract testable function for wide table, validation function

* Test for wide table function

* Test for validating totals

* Information-hide drop columns

* Normalize string values in raw dataframe

* Mutate data types

* Reusable df in tests, label index
  • Loading branch information
davidmudrauskas authored Nov 27, 2024
1 parent 9dd1ab9 commit 7163863
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/pudl/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@


core_module_groups = {
"_core_eia176": [pudl.transform.eia176],
"core_assn": [glue_assets],
"core_censusdp1tract": [
pudl.convert.censusdp1tract_to_sqlite,
Expand Down
9 changes: 9 additions & 0 deletions src/pudl/extract/eia176.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ def process_raw(
selection = self._metadata._get_partition_selection(partition)
return df.assign(report_year=selection)

def process_renamed(
    self, df: pd.DataFrame, page: str, **partition: PartitionSelection
) -> pd.DataFrame:
    """Normalize raw text columns by stripping whitespace and lowercasing.

    The ``id`` column is deliberately left untouched so company IDs keep
    their original casing.
    """
    for column in ("area", "atype", "company", "item"):
        df[column] = df[column].str.strip().str.lower()
    return df


raw_eia176__all_dfs = raw_df_factory(Extractor, name="eia176")

Expand Down
1 change: 1 addition & 0 deletions src/pudl/transform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
from . import (
classes,
eia,
eia176,
eia860,
eia860m,
eia861,
Expand Down
113 changes: 113 additions & 0 deletions src/pudl/transform/eia176.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Module to perform data cleaning functions on EIA176 data tables."""

import pandas as pd
from dagster import AssetCheckResult, AssetIn, AssetOut, asset_check, multi_asset

from pudl.logging_helpers import get_logger

logger = get_logger(__name__)


@multi_asset(
    outs={
        "core_eia176__yearly_company_data": AssetOut(),
        # NOTE(review): this output is EIA-176 data but is named eia861 —
        # looks like a typo; confirm before renaming, since the totals
        # asset check depends on this exact asset key.
        "core_eia861__yearly_aggregate_data": AssetOut(),
    },
)
def _core_eia176__data(
    raw_eia176__data: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Pivot the raw long-format records into two wide tables, one column per variable.

    Returns a (company, aggregate) pair: per-year/company granular data, and
    per-year state- and US-level reported totals.
    """
    # astype returns a copy, so the caller's frame is never mutated.
    data = raw_eia176__data.astype({"report_year": int, "value": float})
    data["variable_name"] = data["line"] + "_" + data["atype"]

    aggregate_key = ["report_year", "area"]
    company_key = [*aggregate_key, "id"]
    shared_drops = ["itemsort", "item", "atype", "line", "company"]
    # 'id' is arbitrary/duplicated on aggregate records, so it cannot serve as
    # part of their primary key and is dropped; it is only reliable for
    # granular company data.
    aggregate_drops = [*shared_drops, "id"]

    # Aggregate rows are flagged by this sentinel company name in the raw data.
    is_aggregate = data.company == "total of all companies"

    wide_company = get_wide_table(
        long_table=data.loc[~is_aggregate].drop(columns=shared_drops),
        primary_key=company_key,
    )
    wide_aggregate = get_wide_table(
        long_table=data.loc[is_aggregate].drop(columns=aggregate_drops),
        primary_key=aggregate_key,
    )

    return wide_company, wide_aggregate


def get_wide_table(long_table: pd.DataFrame, primary_key: list[str]) -> pd.DataFrame:
    """Pivot an entity-attribute-value table to one column per attribute/variable.

    Missing variable/entity combinations are filled with 0.
    """
    return (
        long_table.set_index([*primary_key, "variable_name"])
        .unstack(level="variable_name")
        # Drop the residual "value" level so columns are plain variable names.
        .droplevel(0, axis="columns")
        # Clear the "variable_name" label left on the columns index.
        .rename_axis(columns=None)
        .reset_index()
        .fillna(0)
    )


@asset_check(
    asset="core_eia176__yearly_company_data",
    additional_ins={"core_eia861__yearly_aggregate_data": AssetIn()},
    blocking=True,
)
def validate_totals(
    core_eia176__yearly_company_data: pd.DataFrame,
    core_eia861__yearly_aggregate_data: pd.DataFrame,
) -> AssetCheckResult:
    """Check that reported state- and US-level totals match sums of company data."""
    # Normalize the reported aggregates so they line up with grouped sums below.
    reported = core_eia861__yearly_aggregate_data.sort_values(
        ["report_year", "area"]
    ).fillna(0)

    # Roll granular company rows up to state level and diff against reported totals.
    calculated_state = (
        core_eia176__yearly_company_data.drop(columns="id")
        .groupby(["report_year", "area"])
        .sum()
        .reset_index()
    )
    reported_state = reported[reported.area != "u.s. total"].reset_index(drop=True)
    # Select the same columns so the frames are identically labeled for compare().
    state_diff = reported_state[calculated_state.columns].compare(calculated_state)

    # Roll the state sums up to a single US row per year and diff those too.
    calculated_us = (
        calculated_state.drop(columns="area")
        .groupby("report_year")
        .sum()
        .sort_values("report_year")
        .reset_index()
    )
    reported_us = (
        reported[reported.area == "u.s. total"]
        .drop(columns="area")
        .sort_values("report_year")
        .reset_index(drop=True)
    )
    us_diff = reported_us[calculated_us.columns].compare(calculated_us)

    return AssetCheckResult(passed=bool(state_diff.empty and us_diff.empty))


# TODO: Reasonable boundaries -- in a script/notebook in the 'validate' directory? How are those executed?
207 changes: 207 additions & 0 deletions test/unit/transform/eia176_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import pandas as pd
from pytest import fixture

from pudl.transform.eia176 import _core_eia176__data, get_wide_table, validate_totals

# Raw EIA-176 column order; every fixture row below follows this ordering.
COLUMN_NAMES = [
    "area",
    "atype",
    "company",
    "id",
    "line",
    "report_year",
    "value",
    "itemsort",
    "item",
]

# Two granular New Mexico company rows reporting the same variable (line 1010, atype vl).
ID_1 = "17673850NM"
VOLUME_1 = 30980426.0
COMPANY_1 = [
    "new mexico",
    "vl",
    "new mexico gas company",
    ID_1,
    "1010",
    "2022",
    VOLUME_1,
    "[10.1]",
    "residential sales volume",
]

ID_2 = "17635017NM"
VOLUME_2 = 532842.0
COMPANY_2 = [
    "new mexico",
    "vl",
    "west texas gas inc",
    ID_2,
    "1010",
    "2022",
    VOLUME_2,
    "[10.1]",
    "residential sales volume",
]

# State-level aggregate row: the sum of the two NM company volumes above.
NM_VOLUME = VOLUME_1 + VOLUME_2
NM_AGGREGATE = [
    "new mexico",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_1,
    "1010",
    "2022",
    NM_VOLUME,
    "[10.1]",
    "residential sales volume",
]

ID_3 = "17635017TX"
VOLUME_3 = 1.0
COMPANY_3 = [
    "texas",
    "vl",
    "west texas gas inc",
    ID_3,
    "1010",
    "2022",
    VOLUME_3,
    "[10.1]",
    "residential sales volume",
]

# Texas has a single company, so its aggregate equals that company's volume.
TX_VOLUME = VOLUME_3
TX_AGGREGATE = [
    "texas",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_3,
    "1010",
    "2022",
    VOLUME_3,
    "[10.1]",
    "residential sales volume",
]

# National aggregate row: the sum of all state-level volumes.
US_VOLUME = NM_VOLUME + TX_VOLUME
US_AGGREGATE = [
    "u.s. total",
    "vl",
    "total of all companies",
    # Aggregates appear to reuse an arbitrary company ID
    ID_1,
    "1010",
    "2022",
    US_VOLUME,
    "[10.1]",
    "residential sales volume",
]

# A company reporting a different variable (line 1020), used to exercise NA-filling.
ID_4 = "4"
VOLUME_4 = 4.0
COMPANY_4 = [
    "alaska",
    "vl",
    "alaska gas inc",
    ID_4,
    "1020",
    "2022",
    VOLUME_4,
    "[10.2]",
    "some other volume",
]

# Columns the transform drops before pivoting to the wide layout.
DROP_COLS = ["itemsort", "item", "atype", "line", "company"]


@fixture
def df():
    """Build the raw-shaped EIA-176 test frame, indexed by (area, company)."""
    frame = pd.DataFrame(columns=COLUMN_NAMES)
    rows = [
        COMPANY_1,
        COMPANY_2,
        NM_AGGREGATE,
        COMPANY_3,
        TX_AGGREGATE,
        US_AGGREGATE,
        COMPANY_4,
    ]
    # Assign via .loc to keep the original object-dtype columns intact.
    for position, row in enumerate(rows):
        frame.loc[position] = row
    return frame.set_index(["area", "company"])


def test_core_eia176__data(df):
    """One company row plus its matching state aggregate should split into two wide tables."""
    eav_model = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "total of all companies"),
        ]
    ].reset_index()

    wide_company, wide_aggregate = _core_eia176__data(eav_model)

    assert wide_company.shape == (1, 4)
    company_row = wide_company.loc[0]
    assert company_row.index.tolist() == ["report_year", "area", "id", "1010_vl"]
    assert company_row.tolist() == [2022, "new mexico", ID_1, VOLUME_1]

    assert wide_aggregate.shape == (1, 3)
    aggregate_row = wide_aggregate.loc[0]
    assert aggregate_row.index.tolist() == ["report_year", "area", "1010_vl"]
    assert aggregate_row.tolist() == [2022, "new mexico", NM_VOLUME]


def test_get_wide_table(df):
    """Pivoting should yield one column per variable, with absent combinations filled as 0."""
    long_table = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "west texas gas inc"),
            # Row measuring a different variable to test filling NAs
            ("alaska", "alaska gas inc"),
        ]
    ].reset_index()
    long_table["variable_name"] = long_table["line"] + "_" + long_table["atype"]

    wide_table = get_wide_table(
        long_table.drop(columns=DROP_COLS),
        ["report_year", "area", "id"],
    )

    assert wide_table.shape == (3, 5)
    assert wide_table.columns.tolist() == [
        "report_year",
        "area",
        "id",
        "1010_vl",
        "1020_vl",
    ]
    expected_rows = [
        ["2022", "alaska", ID_4, 0, VOLUME_4],
        ["2022", "new mexico", ID_2, VOLUME_2, 0],
        ["2022", "new mexico", ID_1, VOLUME_1, 0],
    ]
    for position, expected in enumerate(expected_rows):
        assert wide_table.loc[position].tolist() == expected


def test_validate__totals(df):
    """The totals check should pass when aggregates equal sums of company data.

    Fixes two defects in the original test: it discarded the
    ``AssetCheckResult`` without asserting anything, and it assigned
    ``str()``-converted volumes, which ``groupby(...).sum()`` would
    concatenate rather than add — so the reported aggregates could never
    match and the check was never meaningfully exercised.
    """
    # Our test data will have only measurements for this 1010_vl variable
    company_data = df.loc[
        [
            ("new mexico", "new mexico gas company"),
            ("new mexico", "west texas gas inc"),
            ("texas", "west texas gas inc"),
        ]
    ].reset_index()
    # Add the value for the 1010_vl variable, as floats so sums are numeric
    company_data["1010_vl"] = [VOLUME_1, VOLUME_2, VOLUME_3]
    company_data = company_data.drop(columns=DROP_COLS)

    aggregate_data = df.loc[
        [
            ("new mexico", "total of all companies"),
            ("texas", "total of all companies"),
            ("u.s. total", "total of all companies"),
        ]
    ].reset_index()
    # Add the value for the 1010_vl variable, as floats so sums are numeric
    aggregate_data["1010_vl"] = [NM_VOLUME, TX_VOLUME, US_VOLUME]
    aggregate_data = aggregate_data.drop(columns=DROP_COLS + ["id"])

    # Assert on the check result instead of silently discarding it.
    assert validate_totals(company_data, aggregate_data).passed

0 comments on commit 7163863

Please sign in to comment.