Transform XBRL core_ferc714__yearly_planning_area_demand_forecast table #3856

Merged · 10 commits · Sep 23, 2024
@@ -0,0 +1,42 @@
"""Add forecast to forecast columns

Revision ID: bbd84fd6320f
Revises: a93bdb8d4fbd
Create Date: 2024-09-19 14:59:49.108628

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bbd84fd6320f'
down_revision = 'a93bdb8d4fbd'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('core_ferc714__yearly_planning_area_demand_forecast', schema=None) as batch_op:
batch_op.add_column(sa.Column('summer_peak_demand_forecast_mw', sa.Float(), nullable=True, comment='The maximum forecasted hourly summer load (for the months of June through September).'))
batch_op.add_column(sa.Column('winter_peak_demand_forecast_mw', sa.Float(), nullable=True, comment='The maximum forecasted hourly winter load (for the months of January through March).'))
batch_op.add_column(sa.Column('net_demand_forecast_mwh', sa.Float(), nullable=True, comment='Net forecasted electricity demand for the specified period in megawatt-hours (MWh).'))
batch_op.drop_column('summer_peak_demand_mw')
batch_op.drop_column('winter_peak_demand_mw')
batch_op.drop_column('net_demand_mwh')

# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('core_ferc714__yearly_planning_area_demand_forecast', schema=None) as batch_op:
batch_op.add_column(sa.Column('net_demand_mwh', sa.FLOAT(), nullable=True))
batch_op.add_column(sa.Column('winter_peak_demand_mw', sa.FLOAT(), nullable=True))
batch_op.add_column(sa.Column('summer_peak_demand_mw', sa.FLOAT(), nullable=True))
batch_op.drop_column('net_demand_forecast_mwh')
batch_op.drop_column('winter_peak_demand_forecast_mw')
batch_op.drop_column('summer_peak_demand_forecast_mw')

# ### end Alembic commands ###
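
Since this core table is fully regenerated by the ETL, the autogenerated add/drop pattern above is harmless here. For reference, a minimal alternative sketch (not the merged migration) could rename the columns in place with Alembic's alter_column and new_column_name, which also carries any existing values along, assuming the column types stay unchanged:

from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    # Hypothetical alternative: rename in place instead of add/drop.
    with op.batch_alter_table(
        "core_ferc714__yearly_planning_area_demand_forecast", schema=None
    ) as batch_op:
        for old, new in [
            ("summer_peak_demand_mw", "summer_peak_demand_forecast_mw"),
            ("winter_peak_demand_mw", "winter_peak_demand_forecast_mw"),
            ("net_demand_mwh", "net_demand_forecast_mwh"),
        ]:
            batch_op.alter_column(
                old,
                new_column_name=new,
                existing_type=sa.Float(),
                existing_nullable=True,
            )
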
20 changes: 18 additions & 2 deletions src/pudl/metadata/fields.py
@@ -2680,9 +2680,9 @@
),
"unit": "MW",
},
"net_demand_mwh": {
"net_demand_forecast_mwh": {
"type": "number",
"description": "Net electricity demand for the specified period in megawatt-hours (MWh).",
"description": "Net forecasted electricity demand for the specific period in megawatt-hours (MWh).",
"unit": "MWh",
},
"net_generation_adjusted_mwh": {
@@ -4268,6 +4268,14 @@
"description": "EIA estimated summer capacity (in MWh).",
"unit": "MWh",
},
"summer_peak_demand_forecast_mw": {
"type": "number",
"description": (
"The maximum forecasted hourly sumemr load (for the months of June through "
"September)."
),
"unit": "MW",
},
"summer_peak_demand_mw": {
"type": "number",
"description": (
@@ -4814,6 +4822,14 @@
"description": "EIA estimated winter capacity (in MWh).",
"unit": "MWh",
},
"winter_peak_demand_forecast_mw": {
"type": "number",
"description": (
"The maximum forecasted hourly winter load (for the months of January "
"through March)."
),
"unit": "MW",
},
"winter_peak_demand_mw": {
"type": "number",
"description": (
6 changes: 3 additions & 3 deletions src/pudl/metadata/resources/ferc714.py
@@ -107,9 +107,9 @@
"respondent_id_ferc714",
"report_year",
"forecast_year",
"summer_peak_demand_mw",
"winter_peak_demand_mw",
"net_demand_mwh",
"summer_peak_demand_forecast_mw",
"winter_peak_demand_forecast_mw",
"net_demand_forecast_mwh",
],
"primary_key": ["respondent_id_ferc714", "report_year", "forecast_year"],
},
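
These three columns form the table's primary key, which is exactly what the CSV de-duplication step in the transform below has to enforce. A quick plain-pandas sketch (a hypothetical helper, not PUDL's own schema validation machinery) for spot-checking uniqueness:

import pandas as pd


def assert_unique_pk(df: pd.DataFrame) -> None:
    """Raise if any primary-key combination appears more than once."""
    pk = ["respondent_id_ferc714", "report_year", "forecast_year"]
    dupes = df[df.duplicated(subset=pk, keep=False)]
    if not dupes.empty:
        raise ValueError(f"{len(dupes)} rows share a primary key:\n{dupes}")
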
217 changes: 159 additions & 58 deletions src/pudl/transform/ferc714.py
@@ -226,10 +226,20 @@
"respondent_id": "respondent_id_ferc714_csv",
"report_yr": "report_year",
"plan_year": "forecast_year",
"summer_forecast": "summer_peak_demand_mw",
"winter_forecast": "winter_peak_demand_mw",
"net_energy_forecast": "net_demand_mwh",
}
"summer_forecast": "summer_peak_demand_forecast_mw",
"winter_forecast": "winter_peak_demand_forecast_mw",
"net_energy_forecast": "net_demand_forecast_mwh",
},
"xbrl": {
"entity_id": "respondent_id_ferc714_xbrl",
"start_date": "start_date",
"end_date": "end_date",
"report_year": "report_year",
"planning_area_hourly_demand_and_forecast_year": "forecast_year",
"planning_area_hourly_demand_and_forecast_summer_forecast": "summer_peak_demand_forecast_mw",
"planning_area_hourly_demand_and_forecast_winter_forecast": "winter_peak_demand_forecast_mw",
"planning_area_hourly_demand_and_forecast_forecast_of_annual_net_energy_for_load": "net_demand_forecast_mwh",
},
},
}

@@ -771,70 +781,161 @@ def out_ferc714__hourly_planning_area_demand(
return HourlyPlanningAreaDemand.run(raw_csv, raw_xbrl_duration, raw_xbrl_instant)


@asset(
io_manager_key="pudl_io_manager",
compute_kind="pandas",
)
def core_ferc714__yearly_planning_area_demand_forecast(
raw_ferc714_csv__yearly_planning_area_demand_forecast: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the yearly planning area forecast data per Planning Area.
class YearlyPlanningAreaDemandForecast:
"""Class for building the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.

Transformations include:
The :ref:`core_ferc714__yearly_planning_area_demand_forecast` table is an annual, forecasted
time series of demand by Planning Area.

- Drop/rename columns.
- Remove duplicate rows and average out the metrics.
Most of the methods in this class are staticmethods. The purpose of using a class
in this instance is mostly to organize the table-specific transforms under the
same namespace.
"""

Args:
raw_ferc714_csv__yearly_planning_area_demand_forecast: Raw table containing,
for each year and each planning area, the forecasted summer and winter peak demand,
in megawatts, and annual net energy for load, in megawatthours, for the next
ten years.
@classmethod
def run(
cls,
raw_csv: pd.DataFrame,
raw_xbrl_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.

Returns:
Clean(er) version of the yearly forecasted demand by Planning Area.
"""
# Clean up columns
df = _pre_process_csv(
raw_ferc714_csv__yearly_planning_area_demand_forecast,
table_name="core_ferc714__yearly_planning_area_demand_forecast",
)
To transform this table we have to process the CSV data and the XBRL duration
data (this table has no instant data), merge the XBRL and CSV data together, and
process the combined dataset.

# For any rows with non-unique respondent_id_ferc714/report_year/forecast_year,
# group and take the mean measures
# For the 2006-2020 data, there were only 20 such rows. In most cases, demand metrics were identical.
# But for some, demand metrics were different - thus the need to take the average.
logger.info(
"Removing non-unique report rows and taking the average of non-equal metrics."
)
The main transforms include spot-fixing forecast years with
:func:`spot_fix_forecast_years_xbrl` and averaging out duplicate forecast values
for duplicate primary key rows in the CSV table.

"""
table_name = "core_ferc714__yearly_planning_area_demand_forecast"
# Process the XBRL data
xbrl = (
rename_columns(
df=raw_xbrl_duration,
params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]),
)
.pipe(_assign_respondent_id_ferc714, "xbrl")
.pipe(cls.spot_fix_forecast_years_xbrl)
)
# Process the CSV data
csv = (
_pre_process_csv(raw_csv, table_name=table_name)
.pipe(_assign_respondent_id_ferc714, "csv")
.pipe(cls.average_duplicate_pks_csv)
.pipe(_post_process, table_name=table_name)
)
# Concatenate the processed CSV and XBRL data
df = pd.concat([csv, xbrl]).reset_index(drop=True)
return df

# Grab the number of rows before duplicate cleanup
num_rows_before = len(df)
@staticmethod
def spot_fix_forecast_years_xbrl(df):
"""Spot fix forecast year errors.

This function fixes the following errors:

- There's one record with an NA forecast_year value. This row
also has no demand forecast values. Because forecast_year is a primary key
we can't have any NA values, and because there are no substantive forecasts
in this row, we can safely remove it.
- respondent_id_ferc714 number 107 reported their forecast_year
as YY instead of YYYY values.
- There's also at least one forecast year value reported as 3033 that should
be 2033.

This function also checks that the values for forecast year are within an
expected range.
"""
df = df.astype({"forecast_year": "Int64"})
# Make sure there's only one NA forecast_year value and remove it
assert (
len(df[df["forecast_year"].isna()]) == 1
), "Only expected one NA forecast year"
df = df[df["forecast_year"].notna()]
# Convert YY to YYYY for respondent 107 (the culprit).
# The earliest forecast year reported as YY is 22. Any numbers
# lower than that would signify a transition into 2100.
mask = (df["respondent_id_ferc714"] == 107) & (df["forecast_year"] > 21)
df.loc[mask, "forecast_year"] = df["forecast_year"] + 2000
# Fix the extraneous 3033 value from respondent 17
mask = (
(df["respondent_id_ferc714"] == 17)
& (df["report_year"] == 2023)
& (df["forecast_year"] == 3033)
)
df.loc[mask, "forecast_year"] = 2033
# Make sure forecast_year values are expected
assert (
df["forecast_year"].isin(range(2021, 3001)).all()
), "Forecast year values not in expected range"
return df
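
For illustration, a toy invocation (hypothetical rows, not real FERC-714 data) showing the method's behavior end to end:

import pandas as pd
from pudl.transform.ferc714 import YearlyPlanningAreaDemandForecast

# Hypothetical rows: one YY report from respondent 107, one correctly
# reported YYYY row, and the single NA row that the method asserts on
# and then drops.
toy = pd.DataFrame(
    {
        "respondent_id_ferc714": [107, 200, 300],
        "report_year": [2022, 2022, 2022],
        "forecast_year": [23, 2023, pd.NA],
    }
)
fixed = YearlyPlanningAreaDemandForecast.spot_fix_forecast_years_xbrl(toy)
# The NA row is dropped and respondent 107's forecast_year becomes 2023.
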

df = (
df.groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[
["summer_peak_demand_mw", "winter_peak_demand_mw", "net_demand_mwh"]
]
.mean()
.reset_index()
)
@staticmethod
def average_duplicate_pks_csv(df):
"""Average forecast values for duplicate primary keys.

The XBRL data had duplicate primary keys, but it was easy to resolve
them by keeping the rows with the most recent publication_time value.
The CSVs have no such distinguishing column, despite having some
duplicate primary keys.

This function takes the average of the forecast values for rows
with duplicate primary keys. There are only 5 respondent_id_ferc714/
report_year/forecast_year rows where the forecast values differ. One of
those is a pair where one forecast value is 0. We keep the non-zero value
there and average out the rest.
"""
# Record original length of dataframe
original_len = len(df)
# Remove duplicate row with 0 forecast values
mask = (
(df["respondent_id_ferc714"] == 100)
& (df["report_year"] == 2013)
& (df["forecast_year"] == 2014)
)
df = df[~mask]
# Take the average of duplicate PK forecast values.
df = (
df.groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[
[
"summer_peak_demand_forecast_mw",
"winter_peak_demand_forecast_mw",
"net_demand_forecast_mwh",
]
]
.mean()
.reset_index()
)
# Make sure exactly the expected number of rows were dropped
assert (
original_len - len(df) == 21
), f"dropped {original_len - len(df)} rows, expected 21"
return df
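
The core of the method is a standard groupby-mean over the primary-key columns. A toy sketch of just that pattern (hypothetical values, without the table-specific row removal and row-count assertion):

import pandas as pd

# Hypothetical duplicate primary-key rows whose forecasts disagree.
toy = pd.DataFrame(
    {
        "respondent_id_ferc714": [5, 5],
        "report_year": [2010, 2010],
        "forecast_year": [2012, 2012],
        "summer_peak_demand_forecast_mw": [100.0, 110.0],
    }
)
deduped = (
    toy.groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[
        ["summer_peak_demand_forecast_mw"]
    ]
    .mean()
    .reset_index()
)
# A single row remains with summer_peak_demand_forecast_mw == 105.0.
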

# Capture the number of rows after grouping
num_rows_after = len(df)

# Add the number of duplicates removed as metadata
num_duplicates_removed = num_rows_before - num_rows_after
logger.info(f"Number of duplicate rows removed: {num_duplicates_removed}")
# Assert that number of removed rows meets expectation
assert (
num_duplicates_removed <= 20
), f"Expected no more than 20 duplicates removed, but found {num_duplicates_removed}"
@asset(
ins={
"raw_csv": AssetIn(key="raw_ferc714_csv__yearly_planning_area_demand_forecast"),
"raw_xbrl_duration": AssetIn(
key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2_duration"
),
},
io_manager_key="pudl_io_manager",
compute_kind="pandas",
)
def core_ferc714__yearly_planning_area_demand_forecast(
raw_csv: pd.DataFrame,
raw_xbrl_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast`.

# Check all data types and columns to ensure consistency with defined schema
df = _post_process(
df, table_name="core_ferc714__yearly_planning_area_demand_forecast"
)
return df
This is a light wrapper around :class:`YearlyPlanningAreaDemandForecast` because
it seems you need to build an asset from a function, not a staticmethod of
a class.
"""
return YearlyPlanningAreaDemandForecast.run(raw_csv, raw_xbrl_duration)
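
The constraint the docstring describes can be seen in a self-contained toy (hypothetical names, nothing from PUDL): Dagster discovers assets from decorated module-level functions, so the class's run() is delegated to from a plain function:

from dagster import asset, materialize
import pandas as pd


class Toy:
    """Stand-in for a transform class like YearlyPlanningAreaDemandForecast."""

    @staticmethod
    def run(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(doubled=df["x"] * 2)


@asset
def upstream() -> pd.DataFrame:
    return pd.DataFrame({"x": [1, 2]})


@asset
def toy_table(upstream: pd.DataFrame) -> pd.DataFrame:
    # The asset is a plain function that delegates to the class.
    return Toy.run(upstream)


if __name__ == "__main__":
    result = materialize([upstream, toy_table])
    assert result.success
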


@dataclass