Skip to content

Commit

Permalink
Write preliminary transform class and function for XBRL and CSV core_…
Browse files Browse the repository at this point in the history
…ferc714__yearly_planning_area_demand_forecast table
  • Loading branch information
aesharpe committed Sep 19, 2024
1 parent 6936a38 commit 780ebb7
Showing 1 changed file with 147 additions and 55 deletions.
202 changes: 147 additions & 55 deletions src/pudl/transform/ferc714.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,17 @@
"summer_forecast": "summer_peak_demand_mw",
"winter_forecast": "winter_peak_demand_mw",
"net_energy_forecast": "net_demand_mwh",
}
},
"xbrl": {
"entity_id": "respondent_id_ferc714_xbrl",
"start_date": "start_date",
"end_date": "end_date",
"report_year": "report_year",
"planning_area_hourly_demand_and_forecast_year": "forecast_year",
"planning_area_hourly_demand_and_forecast_summer_forecast": "summer_peak_demand_mw",
"planning_area_hourly_demand_and_forecast_winter_forecast": "winter_peak_demand_mw",
"planning_area_hourly_demand_and_forecast_forecast_of_annual_net_energy_for_load": "net_demand_mwh",
},
},
}

Expand Down Expand Up @@ -771,70 +781,152 @@ def out_ferc714__hourly_planning_area_demand(
return HourlyPlanningAreaDemand.run(raw_csv, raw_xbrl_duration, raw_xbrl_instant)


@asset(
io_manager_key="pudl_io_manager",
compute_kind="pandas",
)
def core_ferc714__yearly_planning_area_demand_forecast(
raw_ferc714_csv__yearly_planning_area_demand_forecast: pd.DataFrame,
) -> pd.DataFrame:
"""Transform the yearly planning area forecast data per Planning Area.
class YearlyPlanningAreaDemandForecast:
"""Class for building the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.
Transformations include:
The :ref:`core_ferc714__yearly_planning_area_demand_forecast` table is an annual, forecasted
time series of demand by Planning Area.
- Drop/rename columns.
- Remove duplicate rows and average out the metrics.
Most of the methods in this class are staticmethods. The purpose of using a class
in this instance is mostly for organizing the table specific transforms under the
same name-space.
"""

Args:
raw_ferc714_csv__yearly_planning_area_demand_forecast: Raw table containing,
for each year and each planning area, the forecasted summer and winter peak demand,
in megawatts, and annual net energy for load, in megawatthours, for the next
ten years.
@classmethod
def run(
cls,
raw_csv: pd.DataFrame,
raw_xbrl_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset.
Returns:
Clean(er) version of the yearly forecasted demand by Planning Area.
"""
# Clean up columns
df = _pre_process_csv(
raw_ferc714_csv__yearly_planning_area_demand_forecast,
table_name="core_ferc714__yearly_planning_area_demand_forecast",
)
To transform this table we have to process the CSV data and the XBRL duration data
(this data has no instant table), merge together the XBRL and CSV data, and
process the combined datasets.
# For any rows with non-unique respondent_id_ferc714/report_year/forecast_year,
# group and take the mean measures
# For the 2006-2020 data, there were only 20 such rows. In most cases, demand metrics were identical.
# But for some, demand metrics were different - thus the need to take the average.
logger.info(
"Removing non-unique report rows and taking the average of non-equal metrics."
)
"""
table_name = "core_ferc714__yearly_planning_area_demand_forecast"
# XBRL STUFF
xbrl = (
rename_columns(
df=raw_xbrl_duration,
params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]),
)
.pipe(_assign_respondent_id_ferc714, "xbrl")
.pipe(cls.spot_fix_forcast_years_xbrl)
)
# CSV STUFF
csv = (
_pre_process_csv(raw_csv, table_name=table_name)
.pipe(_assign_respondent_id_ferc714, "csv")
.pipe(cls.average_duplicate_pks_csv)
.pipe(_post_process, table_name=table_name)
)
# CONCATED STUFF
return # Need to add concated value

# Grab the number of rows before duplicate cleanup
num_rows_before = len(df)
@staticmethod
def spot_fix_forcast_years_xbrl(df):
    """Spot fix forecast year errors in the XBRL data.

    This function fixes the following errors:

    - There's one record with an NA forecast_year value. This row
      also has no demand forecast values. Because forecast_year is a primary
      key we can't have any NA values. Because there are no substantive
      forecasts in this row, we can safely remove this row.
    - respondent_id_ferc714 number 107 reported their forecast_year
      as YY instead of YYYY values.
    - Respondent 17 reported a forecast year of 3033 in report year 2023
      that should be 2033.

    This function also checks that the values for forecast year are within an
    expected range.

    Args:
        df: XBRL forecast records with ``respondent_id_ferc714``,
            ``report_year`` and ``forecast_year`` columns. The input frame is
            not modified.

    Returns:
        A new dataframe with the NA forecast_year row removed and the
        forecast_year values corrected, stored as nullable ``Int64``.

    Raises:
        AssertionError: If there is not exactly one NA forecast_year, or if any
            corrected forecast_year falls outside the expected range.
    """
    df = df.astype({"forecast_year": "Int64"})
    # Make sure there's only one NA forecast_year value and remove it.
    # Raised explicitly (rather than via ``assert``) so the check still runs
    # when Python is started with -O.
    na_years = df["forecast_year"].isna()
    if na_years.sum() != 1:
        raise AssertionError("Only expected one NA forecast year")
    # Copy so the .loc assignments below write to an independent frame rather
    # than to a filtered slice of the previous one.
    df = df[~na_years].copy()
    # Convert YY to YYYY for respondent 107 (the culprit).
    # The earliest forecast year reported as YY is 22. Any numbers
    # lower than that would signify a transition into 2100. The upper bound
    # keeps values that are already four digits from being shifted a second
    # time.
    two_digit_mask = (
        (df["respondent_id_ferc714"] == 107)
        & (df["forecast_year"] > 21)
        & (df["forecast_year"] < 100)
    )
    df.loc[two_digit_mask, "forecast_year"] += 2000
    # Fix extraneous 3033 value from respondent 17
    typo_mask = (
        (df["respondent_id_ferc714"] == 17)
        & (df["report_year"] == 2023)
        & (df["forecast_year"] == 3033)
    )
    df.loc[typo_mask, "forecast_year"] = 2033
    # Make sure forecast_year values are expected.
    # NOTE(review): the upper bound of 3000 is very permissive for forecast
    # years -- confirm whether a tighter bound (e.g. 2100) is intended.
    if not df["forecast_year"].isin(range(2021, 3001)).all():
        raise AssertionError("Forecast year values not in expected range")
    return df

df = (
df.groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[
["summer_peak_demand_mw", "winter_peak_demand_mw", "net_demand_mwh"]
]
.mean()
.reset_index()
)
@staticmethod
def average_duplicate_pks_csv(df):
    """Average forecast values for duplicate primary keys.

    The XBRL data had duplicate primary keys, but it was easy to parse
    them by keeping rows with the most recent publication_time value.
    The CSVs have no such distinguishing column, despite having some
    duplicate primary keys.

    This function takes the average of the forecast values for rows
    with duplicate primary keys. There are only 5 respondent/report_year/
    forecast year rows where the forecast values differ. One of those is a
    pair where one forecast value is 0. We'll take the non-zero value here
    and average out the rest.

    Args:
        df: CSV forecast records with ``respondent_id_ferc714``,
            ``report_year``, ``forecast_year`` and the three forecast value
            columns.

    Returns:
        A dataframe with one row per respondent_id_ferc714 / report_year /
        forecast_year containing the mean of each forecast value column.

    Raises:
        AssertionError: If the number of rows removed differs from the
            expected number of duplicates.
    """
    pk_cols = ["respondent_id_ferc714", "report_year", "forecast_year"]
    forecast_cols = [
        "summer_peak_demand_mw",
        "winter_peak_demand_mw",
        "net_demand_mwh",
    ]
    # One zero-valued duplicate row is removed outright and the remaining
    # duplicate rows collapse into their primary key during the groupby.
    expected_dropped = 20
    # Record original length of dataframe
    original_len = len(df)
    # Remove only the duplicate row with 0 forecast values. Without the
    # zero-value condition the non-zero row of the pair is discarded as well
    # and the primary key vanishes from the output.
    # NOTE(review): assumes the bad row is identifiable by a 0 in any of the
    # forecast columns -- confirm against the raw data.
    zero_dupe_mask = (
        (df["respondent_id_ferc714"] == 100)
        & (df["report_year"] == 2013)
        & (df["forecast_year"] == 2014)
        & df[forecast_cols].eq(0).any(axis=1)
    )
    df = df[~zero_dupe_mask]
    # Take the average of duplicate PK forecast values.
    df = df.groupby(pk_cols)[forecast_cols].mean().reset_index()
    # Make sure no more rows were dropped than expected. Raised explicitly
    # (rather than via ``assert``) so the check still runs under -O, and the
    # message is built from the same constant that is checked.
    dropped = original_len - len(df)
    if dropped != expected_dropped:
        raise AssertionError(
            f"dropped {dropped} rows, expected {expected_dropped}"
        )
    return df

# Capture the number of rows after grouping
num_rows_after = len(df)

# Add the number of duplicates removed as metadata
num_duplicates_removed = num_rows_before - num_rows_after
logger.info(f"Number of duplicate rows removed: {num_duplicates_removed}")
# Assert that number of removed rows meets expectation
assert (
num_duplicates_removed <= 20
), f"Expected no more than 20 duplicates removed, but found {num_duplicates_removed}"
@asset(
ins={
"raw_csv": AssetIn(key="raw_ferc714_csv__yearly_planning_area_demand_forecast"),
"raw_xbrl_duration": AssetIn(
key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2_duration"
),
},
io_manager_key="pudl_io_manager",
compute_kind="pandas",
)
def core_ferc714__yearly_planning_area_demand_forecast(
raw_csv: pd.DataFrame,
raw_xbrl_duration: pd.DataFrame,
) -> pd.DataFrame:
"""Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast`.
# Check all data types and columns to ensure consistency with defined schema
df = _post_process(
df, table_name="core_ferc714__yearly_planning_area_demand_forecast"
)
return df
This is a light wrapper around :class:`YearlyPlanningAreaDemandForecast` because
it seems you need to build an asset from a function - not a staticmethod of
a class.
"""
return YearlyPlanningAreaDemandForecast.run(raw_csv, raw_xbrl_duration)


@dataclass
Expand Down

0 comments on commit 780ebb7

Please sign in to comment.