diff --git a/docs/release_notes.rst b/docs/release_notes.rst index c24b3d3d55..ed28675ab8 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -6,6 +6,16 @@ PUDL Release Notes v2024.X.x (2024-XX-XX) --------------------------------------------------------------------------------------- +New Data Coverage +^^^^^^^^^^^^^^^^^ + +FERC Form 714 +~~~~~~~~~~~~~ +* Integrate 2021-2023 years of the FERC Form 714 data. FERC updated its reporting + format for 2021 from a CSV files to XBRL files. This update integrates the two + raw data sources and extends the data coverage through 2023. See :issue:`3809` + and :pr:`3842`. + Schema Changes ^^^^^^^^^^^^^^ * Added :ref:`out_eia__yearly_assn_plant_parts_plant_gen` table. This table associates diff --git a/docs/templates/ferc714_child.rst.jinja b/docs/templates/ferc714_child.rst.jinja index 8b5c0e7ada..96969fed53 100644 --- a/docs/templates/ferc714_child.rst.jinja +++ b/docs/templates/ferc714_child.rst.jinja @@ -1,6 +1,9 @@ {% extends "data_source_parent.rst.jinja" %} {% block background %} +FERC Form 714, otherwise known as the Annual Electric Balancing Authority Area and +Planning Area Report, collects data and provides insights about balancing authority +area and planning area operations. {% endblock %} @@ -13,28 +16,21 @@ {% block availability %} The data we've integrated from FERC Form 714 includes: -* hourly electricity demand by utility or balancing authority from 2006-2020 -* a table identifying the form respondents including their EIA utility or balancing +* Hourly electricity demand by utility or balancing authority. +* Annual demand forecast. +* A table identifying the form respondents including their EIA utility or balancing authority ID, which allows us to link the FERC-714 data to other information reported in :doc:`eia860` and :doc:`eia861`. -We have not yet had the opportunity to work with the most recent FERC-714 data (2021 and -later), which is now being published using the new XBRL format. - -The hourly demand data for 2006-2020 is about 15 million records. There are about 200 -respondents that show up in the respondents table. - -WIth the EIA IDs, we link the hourly electricity demand to a particular georgraphic -region at the county level, because utilities and balancing authorities report their -service territories in :ref:`core_eia861__yearly_service_territory`, and from that -information we can estimate historical hourly electricity demand by state. +With the EIA IDs we can link the hourly electricity demand to a particular geographic +region at the county level because utilities and balancing authorities report their +service territories in :ref:`core_eia861__yearly_service_territory`. From that +information we estimate historical hourly electricity demand by state. Plant operators reported in :ref:`core_eia860__scd_plants` and generator ownership information reported in :ref:`core_eia860__scd_ownership` are linked to :ref:`core_eia860__scd_utilities` and :ref:`core_eia861__yearly_balancing_authority` and -so can also be linked to the :ref:`core_ferc714__respondent_id` table, as well as the -:ref:`core_epacems__hourly_emissions` unit-level emissions and generation data reported -in :doc:`epacems`. +can therefore be linked to the :ref:`core_ferc714__respondent_id` table. {% endblock %} @@ -56,32 +52,44 @@ formats: * **2021-present**: Standardized electronic filing using the XBRL (eXtensible Business Reporting Language) dialect of XML. -We only have plans to integrate the data from the standardized electronic reporting era -since the format of the earlier data varies for each reporting balancing authority and -utility, and would be very labor intensive to parse and reconcile. +We only plan to integrate the data from the standardized electronic reporting era +(2006+) since the format of the earlier data varies for each reporting balancing authority +and utility, and would be very labor intensive to parse and reconcile. {% endblock %} {% block notable_irregularities %} +Timezone errors +--------------- + The original hourly electricity demand time series is plagued with timezone and daylight savings vs. standard time irregularities, which we have done our best to clean up. The timestamps in the clean data are all in UTC, with a timezone code stored in a separate column, so that the times can be easily localized or converted. It's certainly not perfect, but its much better than the original data and it's easy to work with! +Sign errors +----------- + Not all respondents use the same sign convention for reporting "demand." The vast majority consider demand / load that they serve to be a positive number, and so we've standardized the data to use that convention. +Reporting gaps +-------------- + There are a lot of reporting gaps, especially for smaller respondents. Sometimes these are brief, and sometimes they are entire years. There are also a number of outliers and suspicious values (e.g. a long series of identical consecutive values). We have some tools that we've built to clean up these outliers in :mod:`pudl.analysis.timeseries_cleaning`. +Respondent-to-balancing-authority inconsistencies +------------------------------------------------- + Because utilities and balancing authorities occasionally change their service -territories or merge, the demand reproted by any individual "respondent" may correspond +territories or merge, the demand reported by any individual "respondent" may correspond to wildly different consumers in different years. To make it at least somewhat possible to compare the reported data across time, we've also compiled historical service territory maps for the respondents based on data reported in :doc:`eia861`. However, @@ -93,4 +101,34 @@ be found in :mod:`pudl.analysis.service_territory` and :mod:`pudl.analysis.spati The :mod:`pudl.analysis.state_demand` script brings together all of the above to estimate historical hourly electricity demand by state for 2006-2020. +Combining XBRL and CSV data +--------------------------- + +The format of the company identifiers (CIDs) used in the CSV data (2006-2020) and the +XBRL data (2021+) differs. To link respondents between both data formats, we manually +map the IDs from both datasets and create a ``respondent_id_ferc714`` in +:mod:`pudl.package_data.glue.respondent_id_ferc714.csv`. + +This CSV builds on the `migrated data +`__ provided +by FERC during the transition from CSV to XBRL data, which notes that: + + Companies that did not have a CID prior to the migration have been assigned a CID that + begins with R, i.e., a temporary RID. These RIDs will be replaced in future with the + accurate CIDs and new datasets will be published. + +The file names of the migrated data (which correspond to CSV IDs) and the respondent +CIDs in the migrated files provide the basis for ID mapping. Though CIDs are intended to +be static, some of the CIDs in the migrated data weren't found in the actual XBRL data, +and the same respondents were reporting data using different CIDs. To ensure accurate +record matching, we manually reviewed the CIDs for each respondent, matching based on +name and location. Some quirks to note: + +* All respondents are matched 1:1 from CSV to XBRL data. Unmatched respondents mostly + occur due to mergers, splits, acquisitions, and companies that no longer exist. +* Some CIDs assigned during the migration process do not appear in the data. Given the + intention by FERC to make these CIDs permanent, they are still included in the mapping + CSV in case these respondents re-appear. All temporary IDs (beginning with R) were + removed. + {% endblock %} diff --git a/migrations/versions/8fffc1d0399a_add_my_cool_lil_respondent_id_glue_.py b/migrations/versions/8fffc1d0399a_add_my_cool_lil_respondent_id_glue_.py new file mode 100644 index 0000000000..7deb3e15b0 --- /dev/null +++ b/migrations/versions/8fffc1d0399a_add_my_cool_lil_respondent_id_glue_.py @@ -0,0 +1,91 @@ +"""Add my cool lil respondent id glue tables and other 714 xbrl updates + +Revision ID: 8fffc1d0399a +Revises: a93bdb8d4fbd +Create Date: 2024-09-24 09:28:45.862748 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '8fffc1d0399a' +down_revision = 'a93bdb8d4fbd' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('core_pudl__assn_ferc714_pudl_respondents', + sa.Column('respondent_id_ferc714', sa.Integer(), nullable=False, comment='PUDL-assigned identifying a respondent to FERC Form 714. This ID associates natively reported respondent IDs from the orignal CSV and XBRL data sources.'), + sa.PrimaryKeyConstraint('respondent_id_ferc714', name=op.f('pk_core_pudl__assn_ferc714_pudl_respondents')) + ) + op.create_table('core_pudl__assn_ferc714_csv_pudl_respondents', + sa.Column('respondent_id_ferc714', sa.Integer(), nullable=False, comment='PUDL-assigned identifying a respondent to FERC Form 714. This ID associates natively reported respondent IDs from the orignal CSV and XBRL data sources.'), + sa.Column('respondent_id_ferc714_csv', sa.Integer(), nullable=False, comment='FERC Form 714 respondent ID from CSV reported data - published from years: 2006-2020. This ID is linked to the newer years of reported XBRL data through the PUDL-assigned respondent_id_ferc714 ID. This ID was originally reported as respondent_id. Note that this ID does not correspond to FERC respondent IDs from other forms.'), + sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['core_pudl__assn_ferc714_pudl_respondents.respondent_id_ferc714'], name=op.f('fk_core_pudl__assn_ferc714_csv_pudl_respondents_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents')), + sa.PrimaryKeyConstraint('respondent_id_ferc714', 'respondent_id_ferc714_csv', name=op.f('pk_core_pudl__assn_ferc714_csv_pudl_respondents')) + ) + op.create_table('core_pudl__assn_ferc714_xbrl_pudl_respondents', + sa.Column('respondent_id_ferc714', sa.Integer(), nullable=False, comment='PUDL-assigned identifying a respondent to FERC Form 714. This ID associates natively reported respondent IDs from the orignal CSV and XBRL data sources.'), + sa.Column('respondent_id_ferc714_xbrl', sa.Text(), nullable=False, comment='FERC Form 714 respondent ID from XBRL reported data - published from years: 2021-present. This ID is linked to the older years of reported CSV data through the PUDL-assigned respondent_id_ferc714 ID. This ID was originally reported as entity_id. Note that this ID does not correspond to FERC respondent IDs from other forms.'), + sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['core_pudl__assn_ferc714_pudl_respondents.respondent_id_ferc714'], name=op.f('fk_core_pudl__assn_ferc714_xbrl_pudl_respondents_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents')), + sa.PrimaryKeyConstraint('respondent_id_ferc714', 'respondent_id_ferc714_xbrl', name=op.f('pk_core_pudl__assn_ferc714_xbrl_pudl_respondents')) + ) + with op.batch_alter_table('core_ferc714__respondent_id', schema=None) as batch_op: + batch_op.add_column(sa.Column('respondent_id_ferc714_csv', sa.Integer(), nullable=True, comment='FERC Form 714 respondent ID from CSV reported data - published from years: 2006-2020. This ID is linked to the newer years of reported XBRL data through the PUDL-assigned respondent_id_ferc714 ID. This ID was originally reported as respondent_id. Note that this ID does not correspond to FERC respondent IDs from other forms.')) + batch_op.add_column(sa.Column('respondent_id_ferc714_xbrl', sa.Text(), nullable=True, comment='FERC Form 714 respondent ID from XBRL reported data - published from years: 2021-present. This ID is linked to the older years of reported CSV data through the PUDL-assigned respondent_id_ferc714 ID. This ID was originally reported as entity_id. Note that this ID does not correspond to FERC respondent IDs from other forms.')) + batch_op.create_foreign_key(batch_op.f('fk_core_ferc714__respondent_id_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), 'core_pudl__assn_ferc714_pudl_respondents', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + + with op.batch_alter_table('core_ferc714__yearly_planning_area_demand_forecast', schema=None) as batch_op: + batch_op.add_column(sa.Column('summer_peak_demand_forecast_mw', sa.Float(), nullable=True, comment='The maximum forecasted hourly sumemr load (for the months of June through September).')) + batch_op.add_column(sa.Column('winter_peak_demand_forecast_mw', sa.Float(), nullable=True, comment='The maximum forecasted hourly winter load (for the months of January through March).')) + batch_op.add_column(sa.Column('net_demand_forecast_mwh', sa.Float(), nullable=True, comment='Net forecasted electricity demand for the specific period in megawatt-hours (MWh).')) + batch_op.drop_constraint('fk_core_ferc714__yearly_planning_area_demand_forecast_respondent_id_ferc714_core_ferc714__respondent_id', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_core_ferc714__yearly_planning_area_demand_forecast_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), 'core_pudl__assn_ferc714_pudl_respondents', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + batch_op.drop_column('summer_peak_demand_mw') + batch_op.drop_column('net_demand_mwh') + batch_op.drop_column('winter_peak_demand_mw') + + with op.batch_alter_table('out_ferc714__respondents_with_fips', schema=None) as batch_op: + batch_op.drop_constraint('fk_out_ferc714__respondents_with_fips_respondent_id_ferc714_core_ferc714__respondent_id', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_out_ferc714__respondents_with_fips_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), 'core_pudl__assn_ferc714_pudl_respondents', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + + with op.batch_alter_table('out_ferc714__summarized_demand', schema=None) as batch_op: + batch_op.drop_constraint('fk_out_ferc714__summarized_demand_respondent_id_ferc714_core_ferc714__respondent_id', type_='foreignkey') + batch_op.create_foreign_key(batch_op.f('fk_out_ferc714__summarized_demand_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), 'core_pudl__assn_ferc714_pudl_respondents', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('out_ferc714__summarized_demand', schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f('fk_out_ferc714__summarized_demand_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), type_='foreignkey') + batch_op.create_foreign_key('fk_out_ferc714__summarized_demand_respondent_id_ferc714_core_ferc714__respondent_id', 'core_ferc714__respondent_id', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + + with op.batch_alter_table('out_ferc714__respondents_with_fips', schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f('fk_out_ferc714__respondents_with_fips_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), type_='foreignkey') + batch_op.create_foreign_key('fk_out_ferc714__respondents_with_fips_respondent_id_ferc714_core_ferc714__respondent_id', 'core_ferc714__respondent_id', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + + with op.batch_alter_table('core_ferc714__yearly_planning_area_demand_forecast', schema=None) as batch_op: + batch_op.add_column(sa.Column('winter_peak_demand_mw', sa.FLOAT(), nullable=True)) + batch_op.add_column(sa.Column('net_demand_mwh', sa.FLOAT(), nullable=True)) + batch_op.add_column(sa.Column('summer_peak_demand_mw', sa.FLOAT(), nullable=True)) + batch_op.drop_constraint(batch_op.f('fk_core_ferc714__yearly_planning_area_demand_forecast_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), type_='foreignkey') + batch_op.create_foreign_key('fk_core_ferc714__yearly_planning_area_demand_forecast_respondent_id_ferc714_core_ferc714__respondent_id', 'core_ferc714__respondent_id', ['respondent_id_ferc714'], ['respondent_id_ferc714']) + batch_op.drop_column('net_demand_forecast_mwh') + batch_op.drop_column('winter_peak_demand_forecast_mw') + batch_op.drop_column('summer_peak_demand_forecast_mw') + + with op.batch_alter_table('core_ferc714__respondent_id', schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f('fk_core_ferc714__respondent_id_respondent_id_ferc714_core_pudl__assn_ferc714_pudl_respondents'), type_='foreignkey') + batch_op.drop_column('respondent_id_ferc714_xbrl') + batch_op.drop_column('respondent_id_ferc714_csv') + + op.drop_table('core_pudl__assn_ferc714_xbrl_pudl_respondents') + op.drop_table('core_pudl__assn_ferc714_csv_pudl_respondents') + op.drop_table('core_pudl__assn_ferc714_pudl_respondents') + # ### end Alembic commands ### diff --git a/src/pudl/analysis/state_demand.py b/src/pudl/analysis/state_demand.py index 3063b9c90f..7c4b7c28ae 100644 --- a/src/pudl/analysis/state_demand.py +++ b/src/pudl/analysis/state_demand.py @@ -293,9 +293,9 @@ def load_hourly_demand_matrix_ferc714( matrix = out_ferc714__hourly_planning_area_demand.pivot( index="datetime", columns="respondent_id_ferc714", values="demand_mwh" ) - # List timezone by year for each respondent + # List timezone by year for each respondent by the datetime out_ferc714__hourly_planning_area_demand["year"] = ( - out_ferc714__hourly_planning_area_demand["report_date"].dt.year + out_ferc714__hourly_planning_area_demand["datetime"].dt.year ) utc_offset = out_ferc714__hourly_planning_area_demand.groupby( ["respondent_id_ferc714", "year"], as_index=False @@ -378,7 +378,9 @@ def filter_ferc714_hourly_demand_matrix( return df -def impute_ferc714_hourly_demand_matrix(df: pd.DataFrame) -> pd.DataFrame: +def impute_ferc714_hourly_demand_matrix( + df: pd.DataFrame, years: list[int] +) -> pd.DataFrame: """Impute null values in FERC 714 hourly demand matrix. Imputation is performed separately for each year, @@ -390,17 +392,28 @@ def impute_ferc714_hourly_demand_matrix(df: pd.DataFrame) -> pd.DataFrame: Args: df: FERC 714 hourly demand matrix, as described in :func:`load_ferc714_hourly_demand_matrix`. + years: list of years to input Returns: Copy of `df` with imputed values. """ results = [] - for year, gdf in df.groupby(df.index.year): - logger.info(f"Imputing year {year}") - keep = df.columns[~gdf.isnull().all()] - tsi = pudl.analysis.timeseries_cleaning.Timeseries(gdf[keep]) - result = tsi.to_dataframe(tsi.impute(method="tnn"), copy=False) - results.append(result) + # sort here and then don't sort in the groupby so we can process + # the newer years of data first. This is so we can see early if + # new data causes any failures. + df = df.sort_index(ascending=False) + for year, gdf in df.groupby(df.index.year, sort=False): + # remove the records o/s of the working years because some + # respondents report one record of midnight of January first + # of the next year (report_date.dt.year + 1). and + # impute_ferc714_hourly_demand_matrix chunks over years at a time + # and having only one record + if year in years: + logger.info(f"Imputing year {year}") + keep = df.columns[~gdf.isnull().all()] + tsi = pudl.analysis.timeseries_cleaning.Timeseries(gdf[keep]) + result = tsi.to_dataframe(tsi.impute(method="tnn"), copy=False) + results.append(result) return pd.concat(results) @@ -474,8 +487,12 @@ def _out_ferc714__hourly_demand_matrix( return df -@asset(compute_kind="NumPy") +@asset( + compute_kind="NumPy", + required_resource_keys={"dataset_settings"}, +) def _out_ferc714__hourly_imputed_demand( + context, _out_ferc714__hourly_demand_matrix: pd.DataFrame, _out_ferc714__utc_offset: pd.DataFrame, ) -> pd.DataFrame: @@ -492,7 +509,8 @@ def _out_ferc714__hourly_imputed_demand( Returns: df: DataFrame with imputed FERC714 hourly demand. """ - df = impute_ferc714_hourly_demand_matrix(_out_ferc714__hourly_demand_matrix) + years = context.resources.dataset_settings.ferc714.years + df = impute_ferc714_hourly_demand_matrix(_out_ferc714__hourly_demand_matrix, years) df = melt_ferc714_hourly_demand_matrix(df, _out_ferc714__utc_offset) return df diff --git a/src/pudl/etl/glue_assets.py b/src/pudl/etl/glue_assets.py index 99f37526ae..01956bf7d7 100644 --- a/src/pudl/etl/glue_assets.py +++ b/src/pudl/etl/glue_assets.py @@ -37,12 +37,17 @@ def create_glue_tables(context): A dictionary of DataFrames whose keys are the names of the corresponding database table. """ + # TODO 2024-09-23: double check if these settings are actually + # doing anything for the FERC-EIA glue... doesn't look like it. dataset_settings = context.resources.dataset_settings # grab the glue tables for ferc1 & eia glue_dfs = pudl.glue.ferc1_eia.glue( ferc1=dataset_settings.glue.ferc1, eia=dataset_settings.glue.eia, ) + # these 714 glue tables are so easy to build, it doesn't seem worth it + # to not build/load them if we are not etl-ing 714 + glue_dfs = glue_dfs | pudl.glue.ferc714.glue() # Ensure they are sorted so they match up with the asset outs glue_dfs = dict(sorted(glue_dfs.items())) diff --git a/src/pudl/extract/ferc714.py b/src/pudl/extract/ferc714.py index 69bc616c71..9847b64e69 100644 --- a/src/pudl/extract/ferc714.py +++ b/src/pudl/extract/ferc714.py @@ -78,7 +78,7 @@ "csv": "Part 3 Schedule 2 - Planning Area Hourly Demand.csv", "xbrl": "planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2", }, - "core_ferc714_respondent_id": { + "core_ferc714__respondent_id": { "csv": "Respondent IDs.csv", "xbrl": "identification_and_certification_01_1", }, diff --git a/src/pudl/glue/__init__.py b/src/pudl/glue/__init__.py index 045edaac34..b27bab5010 100644 --- a/src/pudl/glue/__init__.py +++ b/src/pudl/glue/__init__.py @@ -14,4 +14,4 @@ which glue exists are being processed together. """ -from . import ferc1_eia +from . import ferc1_eia, ferc714 diff --git a/src/pudl/glue/ferc714.py b/src/pudl/glue/ferc714.py new file mode 100644 index 0000000000..be1d482fd4 --- /dev/null +++ b/src/pudl/glue/ferc714.py @@ -0,0 +1,52 @@ +"""Extract and transform glue tables between FERC Form 714's CSV and XBRL raw sources.""" + +import importlib.resources + +import pandas as pd + +import pudl + +logger = pudl.logging_helpers.get_logger(__name__) + +RESP_ID_FERC_MAP_CSV = ( + importlib.resources.files("pudl.package_data.glue") / "respondent_id_ferc714.csv" +) +"""Path to the PUDL ID mapping sheet with the plant map.""" + + +def get_respondent_map_ferc714() -> pd.DataFrame: + """Read in the manual CSV to XBRL FERC714 respondent mapping data.""" + return pd.read_csv(RESP_ID_FERC_MAP_CSV).convert_dtypes() + + +def glue() -> dict[str : pd.DataFrame]: + """Make the FERC 714 glue tables out of stored CSVs of association tables. + + This function was mirrored off of ferc1_eia.glue, but is much more + paired down. + """ + respondent_map = get_respondent_map_ferc714() + + respondents_pudl_ids = ( + respondent_map.loc[:, ["respondent_id_ferc714"]] + .drop_duplicates("respondent_id_ferc714") + .dropna(subset=["respondent_id_ferc714"]) + ) + respondents_csv_ids = ( + respondent_map.loc[:, ["respondent_id_ferc714", "respondent_id_ferc714_csv"]] + .drop_duplicates("respondent_id_ferc714_csv") + .dropna(subset=["respondent_id_ferc714_csv"]) + ) + respondents_xbrl_ids = ( + respondent_map.loc[:, ["respondent_id_ferc714", "respondent_id_ferc714_xbrl"]] + .drop_duplicates("respondent_id_ferc714_xbrl") + .dropna(subset=["respondent_id_ferc714_xbrl"]) + ) + + glue_dfs = { + "core_pudl__assn_ferc714_pudl_respondents": respondents_pudl_ids, + "core_pudl__assn_ferc714_csv_pudl_respondents": respondents_csv_ids, + "core_pudl__assn_ferc714_xbrl_pudl_respondents": respondents_xbrl_ids, + } + + return glue_dfs diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 12e1069a91..1438aa2759 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -781,10 +781,11 @@ def __compare_dedupe_methodologies( ) never_duped = original.loc[~dupe_mask] apply_diffs = __apply_diffs(duped_groups) - best_snapshot = __best_snapshot(duped_groups) - __compare_dedupe_methodologies( - apply_diffs=apply_diffs, best_snapshot=best_snapshot - ) + # TODO: MAKE THIS FASTER AND TURN IT BACK ON!!! + # best_snapshot = __best_snapshot(duped_groups) + # __compare_dedupe_methodologies( + # apply_diffs=apply_diffs, best_snapshot=best_snapshot + # ) deduped = pd.concat([never_duped, apply_diffs], ignore_index=True) return deduped @@ -805,7 +806,7 @@ def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame: is_instant = "date" in df.columns def get_year(df: pd.DataFrame, col: str) -> pd.Series: - datetimes = pd.to_datetime(df.loc[:, col]) + datetimes = pd.to_datetime(df.loc[:, col], format="%Y-%m-%d", exact=False) if datetimes.isna().any(): raise ValueError(f"{col} has null values!") return datetimes.apply(lambda x: x.year) diff --git a/src/pudl/metadata/fields.py b/src/pudl/metadata/fields.py index 826add40fb..a2dbc8d3d5 100644 --- a/src/pudl/metadata/fields.py +++ b/src/pudl/metadata/fields.py @@ -2680,9 +2680,9 @@ ), "unit": "MW", }, - "net_demand_mwh": { + "net_demand_forecast_mwh": { "type": "number", - "description": "Net electricity demand for the specified period in megawatt-hours (MWh).", + "description": "Net forecasted electricity demand for the specific period in megawatt-hours (MWh).", "unit": "MWh", }, "net_generation_adjusted_mwh": { @@ -3660,7 +3660,30 @@ }, "respondent_id_ferc714": { "type": "integer", - "description": "FERC Form 714 respondent ID. Note that this ID does not correspond to FERC respondent IDs from other forms.", + "description": ( + "PUDL-assigned identifying a respondent to FERC Form 714. This ID associates " + "natively reported respondent IDs from the orignal CSV and XBRL data sources." + ), + }, + "respondent_id_ferc714_csv": { + "type": "integer", + "description": ( + "FERC Form 714 respondent ID from CSV reported data - published from years: 2006-2020. " + "This ID is linked to the newer years of reported XBRL data through the PUDL-assigned " + "respondent_id_ferc714 ID. " + "This ID was originally reported as respondent_id. " + "Note that this ID does not correspond to FERC respondent IDs from other forms." + ), + }, + "respondent_id_ferc714_xbrl": { + "type": "string", + "description": ( + "FERC Form 714 respondent ID from XBRL reported data - published from years: 2021-present. " + "This ID is linked to the older years of reported CSV data through the PUDL-assigned " + "respondent_id_ferc714 ID. " + "This ID was originally reported as entity_id. " + "Note that this ID does not correspond to FERC respondent IDs from other forms." + ), }, "respondent_name_ferc714": { "type": "string", @@ -4268,6 +4291,14 @@ "description": "EIA estimated summer capacity (in MWh).", "unit": "MWh", }, + "summer_peak_demand_forecast_mw": { + "type": "number", + "description": ( + "The maximum forecasted hourly sumemr load (for the months of June through " + "September)." + ), + "unit": "MW", + }, "summer_peak_demand_mw": { "type": "number", "description": ( @@ -4814,6 +4845,14 @@ "description": "EIA estimated winter capacity (in MWh).", "unit": "MWh", }, + "winter_peak_demand_forecast_mw": { + "type": "number", + "description": ( + "The maximum forecasted hourly winter load (for the months of January " + "through March)." + ), + "unit": "MW", + }, "winter_peak_demand_mw": { "type": "number", "description": ( diff --git a/src/pudl/metadata/resources/ferc714.py b/src/pudl/metadata/resources/ferc714.py index 583bfce9b1..059fcf3ee9 100644 --- a/src/pudl/metadata/resources/ferc714.py +++ b/src/pudl/metadata/resources/ferc714.py @@ -8,11 +8,12 @@ "schema": { "fields": [ "respondent_id_ferc714", + "respondent_id_ferc714_csv", + "respondent_id_ferc714_xbrl", "respondent_name_ferc714", "eia_code", ], "primary_key": ["respondent_id_ferc714"], - "foreign_key_rules": {"fields": [["respondent_id_ferc714"]]}, }, "sources": ["ferc714"], "field_namespace": "ferc714", @@ -21,11 +22,31 @@ "out_ferc714__hourly_planning_area_demand": { "description": ( "Hourly electricity demand by planning area. FERC Form 714, Part III, " - "Schedule 2a." + "Schedule 2a. This table includes data from the pre-2021 CSV raw source " + "as well as the newer 2021 through present XBRL raw source.\n\nAn important " + "caveat to note is that there was some cleaning done to the datetime_utc " + "timestamps. The Form 714 includes sparse documentation for respondents " + "for how to interpret timestamps - the form asks respondents to provide " + "24 instances of hourly demand for each day. The form is labeled with hour " + "1-24. There is no indication if hour 1 begins at midnight.\n\nThe XBRL data " + "contained several formats of timestamps. Most records corresponding to hour " + "1 of the Form have a timestamp with hour 1 as T1. About two thirds of the records " + "in the hour 24 location of the form have a timestamp with an hour reported as " + "T24 while the remaining third report this as T00 of the next day. T24 is not a " + "valid format for the hour of a datetime, so we convert these T24 hours into " + "T00 of the next day. A smaller subset of the respondents reports the 24th hour " + "as the last second of the day - we also convert these records to the T00 of the " + "next day.\n\nThis table includes three respondent ID columns: one from the " + "CSV raw source, one from the XBRL raw source and another that is PUDL-derived " + "that links those two source ID's together. This table has filled in source IDs " + "for all records so you can select the full timeseries for a given respondent from " + "any of these three IDs." ), "schema": { "fields": [ "respondent_id_ferc714", + "respondent_id_ferc714_csv", + "respondent_id_ferc714_xbrl", "report_date", "datetime_utc", "timezone", @@ -99,17 +120,25 @@ }, "core_ferc714__yearly_planning_area_demand_forecast": { "description": ( - "10-year forecasted summer and winter peak demand and annual net energy per planning area. FERC Form 714, Part III, " - "Schedule 2b." + "10-year forecasted summer and winter peak demand and annual net energy " + "per planning area. FERC Form 714, Part III, Schedule 2b. This table " + "includes data from the pre-2021 CSV raw source as well as the newer 2021 " + "through present XBRL raw source. We created the respondent_id_ferc714 " + "field to blend disparate IDs from the CSV and XBRL data over time. See " + "the core_ferc714_respondent_id table for links to the original source IDs.\n\n" + "This table contains forecasted net demand (MWh) as well as summer and winter " + "peak demand (MW) for the next ten years after after the report_date. " + "There is a small handful of respondents (~11) that report more than 10 " + "years and an even smaller handful that report less than 10 (~9)." ), "schema": { "fields": [ "respondent_id_ferc714", "report_year", "forecast_year", - "summer_peak_demand_mw", - "winter_peak_demand_mw", - "net_demand_mwh", + "summer_peak_demand_forecast_mw", + "winter_peak_demand_forecast_mw", + "net_demand_forecast_mwh", ], "primary_key": ["respondent_id_ferc714", "report_year", "forecast_year"], }, @@ -118,6 +147,51 @@ "etl_group": "ferc714", "create_database_schema": True, }, + "core_pudl__assn_ferc714_pudl_respondents": { + "description": ( + "Home table for PUDL derived FERC 714 respondent IDs. These ID's are used to connect " + "older CSV data which uses different respondent IDs than the newer XBRL entity IDs. " + "These IDs are manually assigned when new FERC 714 data is is integrated, and any " + "newly found utilities are added to " + "the list with a new ID. " + "This table is read in from a CSV stored in the PUDL " + "repository: src/pudl/package_data/glue/respondent_id_ferc714.xlsx" + ), + "schema": { + "fields": ["respondent_id_ferc714"], + "primary_key": ["respondent_id_ferc714"], + "foreign_key_rules": {"fields": [["respondent_id_ferc714"]]}, + }, + "etl_group": "glue", + "field_namespace": "pudl", + "sources": ["pudl", "ferc714"], + }, + "core_pudl__assn_ferc714_csv_pudl_respondents": { + "description": ( + "This table maps the PUDL-assigned respondent ID FERC714 to the native " + "respondent ID from the FERC714 CSV inputs - originally reported as respondent_id." + ), + "schema": { + "fields": ["respondent_id_ferc714", "respondent_id_ferc714_csv"], + "primary_key": ["respondent_id_ferc714", "respondent_id_ferc714_csv"], + }, + "etl_group": "glue", + "field_namespace": "pudl", + "sources": ["pudl", "ferc714"], + }, + "core_pudl__assn_ferc714_xbrl_pudl_respondents": { + "description": ( + "This table maps the PUDL-assigned respondent ID FERC714 to the native " + "respondent ID from the FERC714 XBRL inputs - originally reported as entity_id." + ), + "schema": { + "fields": ["respondent_id_ferc714", "respondent_id_ferc714_xbrl"], + "primary_key": ["respondent_id_ferc714", "respondent_id_ferc714_xbrl"], + }, + "etl_group": "glue", + "field_namespace": "pudl", + "sources": ["pudl", "ferc714"], + }, } """FERC Form 714 resource attributes by PUDL identifier (``resource.name``). diff --git a/src/pudl/package_data/glue/respondent_id_ferc714.csv b/src/pudl/package_data/glue/respondent_id_ferc714.csv new file mode 100644 index 0000000000..3316bed182 --- /dev/null +++ b/src/pudl/package_data/glue/respondent_id_ferc714.csv @@ -0,0 +1,219 @@ +respondent_id_ferc714,respondent_id_ferc714_xbrl,respondent_id_ferc714_csv,Source,Notes +1,C004339,101,Manually confirmed, +2,C001552,102,Migration files and manually confirmed, +3,C011389,103,Manually confirmed, +4,,104,, +5,,105,, +6,,106,, +7,,107,, +8,,108,, +9,,109,, +10,C002671,110,Migration files and manually confirmed, +11,,111,, +12,,112,, +13,,113,, +14,,114,, +15,C011501,115,Manually confirmed, +16,C001436,116,Migration files and manually confirmed, +17,C003472,118,Manually confirmed, +18,C000379,119,Migration files and manually confirmed, +19,,120,, +20,C011432,121,Manually confirmed, +21,C002357,122,Migration files and manually confirmed, +22,,123,, +23,C002447,124,Migration files and manually confirmed, +24,C001183,125,Migration files and manually confirmed, +25,,126,, +26,C011536,128,Manually confirmed, +27,C011100,133,Manually confirmed, +28,,134,, +29,C011431,135,Manually confirmed, +30,C011428,136,Manually confirmed, +31,,137,, +32,,138,, +33,C011405,139,Manually confirmed, +34,C011474,140,Manually confirmed, +35,C011524,141,Manually confirmed, +36,,142,, +37,C011400,143,Manually confirmed, +38,,144,, +39,,145,, +40,,146,, +41,,147,, +42,,148,, +43,,149,, +44,,150,, +45,,151,, +46,,152,, +47,C011377,153,, +48,,154,, +49,,155,, +50,,156,, +51,C000290,157,Migration files and manually confirmed, +52,,159,, +53,C000465,160,Migration files and manually confirmed, +54,C002079,161,Migration files and manually confirmed, +55,C011367,162,Manually confirmed, +56,,163,, +57,,164,, +58,C011371,165,Manually confirmed, +59,C011491,166,Manually confirmed, +60,,167,, +61,C003749,169,Migration files and manually confirmed, +62,C011373,170,Manually confirmed, +63,C001030,171,Migration files and manually confirmed, +64,C011518,172,Manually confirmed, +65,C001298,173,Migration files and manually confirmed, +66,C003527,174,Migration files and manually confirmed, +67,,175,, +68,,176,, +69,,177,, +70,C011397,178,Manually confirmed, +71,,179,, +72,C000620,180,Migration files and manually confirmed, +73,C004519,182,Migration files and manually confirmed, +74,C003554,183,Migration files and manually confirmed, +75,,184,, +76,C000029,185,Manually confirmed, +77,C011421,186,Manually confirmed, +78,,187,, +79,C001181,188,Migration files and manually confirmed, +80,,189,, +81,,190,, +82,C011347,191,Manually confirmed, +83,,193,, +84,C011510,194,Manually confirmed, +85,,195,, +86,,196,, +87,C003661,197,Migration files and manually confirmed, +88,,198,, +89,,199,, +90,C011472,200,Manually confirmed, +91,C011508,201,Manually confirmed, +92,,202,, +93,,203,, +94,,204,, +95,C011568,206,Manually confirmed, +96,,207,, +97,,208,, +98,C003746,209,Migration files and manually confirmed, +99,C001610,210,Migration files and manually confirmed, +100,C000038,211,Migration files and manually confirmed, +101,C000618,212,Migration files and manually confirmed, +102,C011543,213,Manually confirmed, +103,,214,, +104,,215,, +105,,216,, +106,C001789,217,Manually confirmed, +107,C003561,218,Migration files and manually confirmed, +108,C000622,219,Migration files and manually confirmed, +109,C004202,220,Migration files and manually confirmed, +110,C011446,221,Manually confirmed, +111,,222,, +112,C004410,223,Manually confirmed,In the migration file this is noted as “D004410” which is a clear typo. +113,,224,, +114,C002422,225,Migration files and manually confirmed, +115,,226,, +116,,227,, +117,C011560,228,Manually confirmed, +118,C011559,229,Manually confirmed, +119,C000030,230,Migration files and manually confirmed, +120,C003851,231,Migration files and manually confirmed, +121,C001132,232,Migration files and manually confirmed, +122,,233,, +123,,234,, +124,C000822,235,Manually confirmed, +125,C001218,236,Migration files and manually confirmed, +126,C003680,237,Migration files and manually confirmed, +127,C011552,238,Manually confirmed, +128,C003677,239,Migration files and manually confirmed, +129,C000171,240,Migration files and manually confirmed, +130,C000617,241,Migration files and manually confirmed, +131,,242,, +132,C011399,243,Manually confirmed, +133,C004245,244,Migration files and manually confirmed, +134,,245,, +135,C000685,246,Migration files and manually confirmed, +136,C003474,247,Migration files and manually confirmed, +137,C003529,248,Migration files and manually confirmed, +138,C001609,249,Manually confirmed, +139,C000241,250,Migration files and manually confirmed, +140,C003669,251,Migration files and manually confirmed, +141,,252,, +142,C003610,253,Manually confirmed, +143,,254,, +144,,255,, +145,,256,, +146,C000771,257,Manually confirmed, +147,C000350,258,Migration files and manually confirmed, +148,,259,, +149,C011562,260,Manually confirmed, +150,C004000,261,Migration files and manually confirmed, +151,C000116,262,Migration files and manually confirmed, +152,C004480,263,Migration files and manually confirmed, +153,,264,, +154,C003836,265,Migration files and manually confirmed, +155,C001184,266,Manually confirmed, +156,C011509,267,Manually confirmed, +157,,268,, +158,,269,, +159,C000045,271,Migration files and manually confirmed, +160,,272,, +161,C011370,273,Manually confirmed, +162,C011390,274,Manually confirmed, +163,,275,, +164,C003701,277,Manually confirmed, +165,,278,, +166,,279,, +167,C001188,280,Migration files and manually confirmed, +168,,281,, +169,C003635,282,Manually confirmed, +170,C011544,283,Manually confirmed, +171,,284,, +172,C004098,285,Migration files and manually confirmed, +173,,286,, +174,C003849,287,Migration files and manually confirmed, +175,,288,, +176,,289,, +177,,290,, +178,C002022,291,Migration files and manually confirmed, +179,,292,, +180,C011380,293,Manually confirmed, +181,,294,, +182,,295,, +183,C001553,296,Migration files and manually confirmed, +184,C001555,297,Migration files and manually confirmed, +185,C001554,298,Migration files and manually confirmed, +186,C001556,299,Migration files and manually confirmed, +187,,300,, +188,,301,, +189,,302,, +190,,303,, +191,,304,, +192,,305,, +193,,306,, +194,C001646,307,Manually confirmed, +195,C011374,308,Manually confirmed, +196,,309,, +197,,310,, +198,,311,, +199,,312,, +200,,313,, +201,C011381,315,Manually confirmed, +202,,320,, +203,C001344,321,Migration files and manually confirmed, +204,,322,, +205,C011378,323,Manually confirmed, +206,C002869,324,Migration files and manually confirmed, +207,,325,, +208,,326,, +209,,327,, +210,C003850,328,Migration files and manually confirmed, +211,C011542,329,Manually confirmed, +212,C001526,330,Migration files and manually confirmed, +213,C009068,331,Manually confirmed, +214,C000135,,, +215,C000136,,, +216,C011420,,, +217,C011454,,, +218,C002732,,, diff --git a/src/pudl/package_data/settings/etl_fast.yml b/src/pudl/package_data/settings/etl_fast.yml index eb2308baf6..110708d87c 100644 --- a/src/pudl/package_data/settings/etl_fast.yml +++ b/src/pudl/package_data/settings/etl_fast.yml @@ -21,7 +21,7 @@ ferc_to_sqlite_settings: ferc60_xbrl_to_sqlite_settings: years: [2021, 2022] ferc714_xbrl_to_sqlite_settings: - years: [2021, 2022] + years: [2021, 2023] ########################################################################### # Settings for pudl_etl script @@ -37,7 +37,7 @@ datasets: ferc1: years: [2020, 2021, 2023] ferc714: - years: [2019, 2020] + years: [2020, 2023] eia: eia176: years: [2020, 2022] diff --git a/src/pudl/package_data/settings/etl_full.yml b/src/pudl/package_data/settings/etl_full.yml index a1bfcd11fc..80eec1d625 100644 --- a/src/pudl/package_data/settings/etl_full.yml +++ b/src/pudl/package_data/settings/etl_full.yml @@ -185,6 +185,9 @@ datasets: 2018, 2019, 2020, + 2021, + 2022, + 2023, ] eia: eia176: diff --git a/src/pudl/settings.py b/src/pudl/settings.py index 2f6752c63d..7a8eda2f06 100644 --- a/src/pudl/settings.py +++ b/src/pudl/settings.py @@ -810,7 +810,9 @@ class Ferc714XbrlToSqliteSettings(FercGenericXbrlToSqliteSettings): """ data_source: ClassVar[DataSource] = DataSource.from_id("ferc714") - years: list[int] = [2021, 2022, 2023] + years: list[int] = [ + year for year in data_source.working_partitions["years"] if year >= 2021 + ] class FercToSqliteSettings(BaseSettings): @@ -902,6 +904,30 @@ def from_yaml(cls, path: str) -> "EtlSettings": yaml_file = yaml.safe_load(f) return cls.model_validate(yaml_file) + @model_validator(mode="after") + def validate_xbrl_years(self): + """Ensure the XBRL years in DatasetsSettings align with FercToSqliteSettings. + + For each of the FERC forms that we are processing in PUDL, check to ensure + that the years we are trying to process in the PUDL ETL are included in the + XBRL to SQLite settings. + """ + for which_ferc in ["ferc1", "ferc714"]: + if ( + (pudl_ferc := getattr(self.datasets, which_ferc)) + and ( + sqlite_ferc := getattr( + self.ferc_to_sqlite_settings, + f"{which_ferc}_xbrl_to_sqlite_settings", + ) + ) + ) and not set(pudl_ferc.xbrl_years).issubset(set(sqlite_ferc.years)): + raise AssertionError( + "You are trying to build a PUDL database with different XBRL years " + f"than the ferc_to_sqlite_settings years for {which_ferc}." + ) + return self + def _convert_settings_to_dagster_config(settings_dict: dict[str, Any]) -> None: """Recursively convert a dictionary of dataset settings to dagster config in place. diff --git a/src/pudl/transform/classes.py b/src/pudl/transform/classes.py index fbff8c15df..4d1b487c7d 100644 --- a/src/pudl/transform/classes.py +++ b/src/pudl/transform/classes.py @@ -263,6 +263,26 @@ class RenameColumns(TransformParams): """A dictionary of columns to be renamed.""" +def rename_columns( + df: pd.DataFrame, params: RenameColumns | None = None, **kwargs +) -> pd.DataFrame: + """Rename the whole collection of dataframe columns using input params. + + Raise an error if there's any mismatch between the columns in the dataframe, and + the columns that have been defined in the mapping for renaming. + """ + # If we are attempting to rename columns that do *not* appear in the dataframe, + # raise an error. + if len(params.columns) > 0: + missing_cols = set(params.columns).difference(set(df.columns)) + if missing_cols: + raise ValueError( + f"Attempting to rename columns which are not present in the dataframe.\n" + f"Missing columns: {sorted(missing_cols)}\nExisting Columns: {df.columns}" + ) + return df.rename(columns=params.columns) + + ################################################################################ # Normalize Strings ################################################################################ @@ -1230,18 +1250,7 @@ def rename_columns( f"{self.table_id.value}: Attempting to rename {len(params.columns)} " "columns." ) - - # If we are attempting to rename columns that do *not* appear in the dataframe, - # raise an error. - if len(params.columns) > 0: - missing_cols = set(params.columns).difference(set(df.columns)) - if missing_cols: - raise ValueError( - f"{self.table_id.value}: Attempting to rename columns which are not " - "present in the dataframe.\n" - f"Missing columns: {sorted(missing_cols)}\nExisting Columns: {df.columns}" - ) - return df.rename(columns=params.columns) + return rename_columns(df, params) def normalize_strings( self, diff --git a/src/pudl/transform/ferc1.py b/src/pudl/transform/ferc1.py index 189e916d14..096fc0bfc6 100644 --- a/src/pudl/transform/ferc1.py +++ b/src/pudl/transform/ferc1.py @@ -1669,6 +1669,30 @@ def dimension_columns(self) -> list[str]: return list(dims) +def select_current_year_annual_records_duration_xbrl(df: pd.DataFrame, table_name: str): + """Select for annual records within their report_year. + + Select only records that have a start_date at beginning of the report_year and + have an end_date at the end of the report_year. + """ + len_og = len(df) + df = df.astype({"start_date": "datetime64[s]", "end_date": "datetime64[s]"}) + df = df[ + (df.start_date.dt.year == df.report_year) + & (df.start_date.dt.month == 1) + & (df.start_date.dt.day == 1) + & (df.end_date.dt.year == df.report_year) + & (df.end_date.dt.month == 12) + & (df.end_date.dt.day == 31) + ] + len_out = len(df) + logger.info( + f"{table_name}: After selection of dates based on the report year," + f" we have {len_out/len_og:.1%} of the original table." + ) + return df + + ################################################################################ # FERC 1 transform helper functions. Probably to be integrated into a class # below as methods or moved to a different module once it's clear where they belong. @@ -2714,7 +2738,7 @@ def merge_instant_and_duration_tables_xbrl( information from header and note rows. Outer merging messes up the order, so we need to use a one-sided merge. So far, it seems like the duration df contains all the index values in the instant df. To be sure, there's a check that makes - sure there are no unique intant df index values. If that passes, we merge the + sure there are no unique instant df index values. If that passes, we merge the instant table into the duration table, and the row order is preserved. Note: This should always be applied before :meth:``rename_columns`` @@ -2731,8 +2755,13 @@ def merge_instant_and_duration_tables_xbrl( """ drop_cols = ["filing_name", "index"] # Ignore errors in case not all drop_cols are present. - instant = raw_xbrl_instant.drop(columns=drop_cols, errors="ignore") - duration = raw_xbrl_duration.drop(columns=drop_cols, errors="ignore") + # Do any table-specific preprocessing of the instant and duration tables + instant = raw_xbrl_instant.drop(columns=drop_cols, errors="ignore").pipe( + self.process_instant_xbrl + ) + duration = raw_xbrl_duration.drop(columns=drop_cols, errors="ignore").pipe( + self.process_duration_xbrl + ) instant_axes = [ col for col in raw_xbrl_instant.columns if col.endswith("_axis") @@ -2751,10 +2780,6 @@ def merge_instant_and_duration_tables_xbrl( f" duration: {duration_axes}" ) - # Do any table-specific preprocessing of the instant and duration tables - instant = self.process_instant_xbrl(instant) - duration = self.process_duration_xbrl(duration) - if instant.empty: logger.info(f"{self.table_id.value}: No XBRL instant table found.") out_df = duration @@ -2838,33 +2863,10 @@ def process_duration_xbrl(self, df: pd.DataFrame) -> pd.DataFrame: """ if not df.empty: df = self.rename_columns(df, rename_stage="duration_xbrl").pipe( - self.select_current_year_annual_records_duration_xbrl + select_current_year_annual_records_duration_xbrl, self.table_id.name ) return df - def select_current_year_annual_records_duration_xbrl(self, df): - """Select for annual records within their report_year. - - Select only records that have a start_date at begining of the report_year and - have an end_date at the end of the report_year. - """ - len_og = len(df) - df = df.astype({"start_date": "datetime64[s]", "end_date": "datetime64[s]"}) - df = df[ - (df.start_date.dt.year == df.report_year) - & (df.start_date.dt.month == 1) - & (df.start_date.dt.day == 1) - & (df.end_date.dt.year == df.report_year) - & (df.end_date.dt.month == 12) - & (df.end_date.dt.day == 31) - ] - len_out = len(df) - logger.info( - f"{self.table_id.value}: After selection of dates based on the report year," - f" we have {len_out/len_og:.1%} of the original table." - ) - return df - @cache_df(key="dbf") def drop_footnote_columns_dbf(self, df: pd.DataFrame) -> pd.DataFrame: """Drop DBF footnote reference columns, which all end with _f.""" @@ -6045,10 +6047,10 @@ def process_xbrl( ) -> pd.DataFrame: """Rename columns before running wide_to_tidy.""" logger.info(f"{self.table_id.value}: Processing XBRL data pre-concatenation.") + instant_xbrl = self.process_instant_xbrl(raw_xbrl_instant) + duration_xbrl = self.process_duration_xbrl(raw_xbrl_duration) return ( - self.merge_instant_and_duration_tables_xbrl( - raw_xbrl_instant, raw_xbrl_duration - ) + self.merge_instant_and_duration_tables_xbrl(instant_xbrl, duration_xbrl) .pipe(self.rename_columns, rename_stage="xbrl") .pipe(self.combine_axis_columns_xbrl) .pipe(self.add_axis_to_total_table_rows) diff --git a/src/pudl/transform/ferc714.py b/src/pudl/transform/ferc714.py index 655b27c32e..e59753d2ef 100644 --- a/src/pudl/transform/ferc714.py +++ b/src/pudl/transform/ferc714.py @@ -1,14 +1,32 @@ -"""Transformation of the FERC Form 714 data.""" +"""Transformation of the FERC Form 714 data. + +FERC Form 714 has two separate raw data sources - CSV and XBRL. For both sources +there is usually some specific processing that needs to happen before the two +data sources get concatenated together to create the full timeseries. We are +currently processing three tables from 714. Each one is processed using a similar +pattern: we've defined a class with a run classmethod as a coordinating method, +any table-specific transforms are defined as staticmethod's within the table +class and any generic 714 transforms are defined as internal module functions. +The table assets are created through a small function that calls the run method. +Any of the methods or functions that only apply to either of the raw data sources +should include a raw datasource suffix. +""" +import importlib import re from dataclasses import dataclass +from typing import Literal import numpy as np import pandas as pd -from dagster import AssetCheckResult, AssetChecksDefinition, asset, asset_check +from dagster import AssetCheckResult, AssetChecksDefinition, AssetIn, asset, asset_check import pudl.logging_helpers -from pudl.metadata import PUDL_PACKAGE +from pudl.settings import Ferc714Settings +from pudl.transform.classes import ( + RenameColumns, + rename_columns, +) logger = pudl.logging_helpers.get_logger(__name__) @@ -16,146 +34,104 @@ # Constants required for transforming FERC 714 ############################################################################## + # More detailed fixes on a per respondent basis -OFFSET_CODE_FIXES = { - 102: {"CPT": "CST"}, - 110: {"CPT": "EST"}, - 115: {"MS": "MST"}, - 118: { - "CS": "CST", - "CD": "CDT", - }, - 120: { - "CTR": "CST", - "CSR": "CST", - "CPT": "CST", - "DST": "CST", - np.nan: "CST", - }, - 133: { +TIMEZONE_OFFSET_CODE_FIXES = { + 2: {"CPT": "CST"}, + 10: {"CPT": "EST"}, + 15: {"MS": "MST"}, + 17: {"CS": "CST", "CD": "CDT"}, + 19: {"CTR": "CST", "CSR": "CST", "CPT": "CST", "DST": "CST", np.nan: "CST"}, + 27: { "AKS": "AKST", "AST": "AKST", "AKD": "AKDT", "ADT": "AKDT", + "AKT": "AKST", + "1": "AKST", # they swap from 1 - 2 in 2023 + "2": "AKDT", }, - 134: {np.nan: "EST"}, - 137: {np.nan: "CST"}, - 140: { - "1": "EST", - "2": "EDT", - np.nan: "EST", - }, - 141: {np.nan: "CST"}, - 143: {"MS": "MST"}, - 146: {"DST": "EST"}, - 148: {np.nan: "CST"}, - 151: { - "DST": "CDT", - np.nan: "CST", - }, - 153: {np.nan: "MST"}, - 154: {np.nan: "MST"}, - 156: {np.nan: "CST"}, - 157: { - "DST": "EDT", - "EPT": "EST", - }, - 161: {"CPT": "CST"}, - 163: {"CPT": "CST"}, - 164: {np.nan: "CST"}, - 165: {"CS": "CST"}, # Uniform across the year. - 173: { - "CPT": "CST", - np.nan: "CST", - }, - 174: { + 28: {np.nan: "EST"}, + 31: {np.nan: "CST"}, + 34: {"1": "EST", "2": "EDT", np.nan: "EST", "UTC": "EST"}, # city of Tallahassee + 35: {np.nan: "CST"}, + 37: {"MS": "MST"}, + 40: {"DST": "EST"}, + 42: {np.nan: "CST"}, + 45: {"DST": "CDT", np.nan: "CST"}, + 47: {np.nan: "MST"}, + 48: {np.nan: "MST"}, + 50: {np.nan: "CST"}, + 51: {"DST": "EDT", "EPT": "EST"}, + 54: {"CPT": "CST"}, + 56: {"CPT": "CST"}, + 57: {np.nan: "CST"}, + 58: {"CS": "CST"}, # Uniform across the year. + 65: {"CPT": "CST", np.nan: "CST"}, + 66: { "CS": "CDT", # Only shows up in summer! Seems backwards. "CD": "CST", # Only shows up in winter! Seems backwards. "433": "CDT", }, - 176: { - "E": "EST", - np.nan: "EST", - }, - 182: {"PPT": "PDT"}, # Imperial Irrigation District P looks like D - 186: {"EAS": "EST"}, - 189: {"CPT": "CST"}, - 190: {"CPT": "CST"}, - 193: { - "CS": "CST", - "CD": "CDT", - }, - 194: {"PPT": "PST"}, # LADWP, constant across all years. - 195: {"CPT": "CST"}, - 208: {np.nan: "CST"}, - 211: { - "206": "EST", - "DST": "EDT", - np.nan: "EST", - }, - 213: {"CDS": "CDT"}, - 216: {np.nan: "CDT"}, - 217: { - "MPP": "MST", - "MPT": "MST", - }, - 224: {"DST": "EST"}, - 225: { - "EDS": "EDT", - "DST": "EDT", - "EPT": "EST", - }, - 226: {"DST": "CDT"}, - 230: {"EPT": "EST"}, - 233: { - "DST": "EDT", - "EPT": "EST", - }, - 234: { - "1": "EST", - "2": "EDT", - "DST": "EDT", - }, - # Constant across the year. Never another timezone seen. - 239: {"PPT": "PST"}, - 243: {"DST": "PST"}, - 245: {"CDS": "CDT"}, - 248: {"DST": "EDT"}, - 253: {"CPT": "CST"}, - 254: {"DST": "CDT"}, - 257: {"CPT": "CST"}, - 259: {"DST": "CDT"}, - 264: {"CDS": "CDT"}, - 271: {"EDS": "EDT"}, - 275: {"CPT": "CST"}, - 277: { - "CPT": "CST", - np.nan: "CST", - }, - 281: {"CEN": "CST"}, - 288: {np.nan: "EST"}, - 293: {np.nan: "MST"}, - 294: {np.nan: "EST"}, - 296: {"CPT": "CST"}, - 297: {"CPT": "CST"}, - 298: {"CPT": "CST"}, - 299: {"CPT": "CST"}, - 307: {"PPT": "PST"}, # Pacificorp, constant across the whole year. - 308: { - "DST": "EDT", - "EDS": "EDT", - "EPT": "EST", - }, - 328: { - "EPT": "EST", + 68: {"E": "EST", np.nan: "EST"}, + 73: {"PPT": "PDT"}, # Imperial Irrigation District P looks like D + 77: {"EAS": "EST"}, + 80: {"CPT": "CST"}, + 81: {"CPT": "CST"}, + 83: {"CS": "CST", "CD": "CDT"}, + 84: {"PPT": "PST"}, # LADWP, constant across all years. + 85: {"CPT": "CST"}, + 97: {np.nan: "CST"}, + 100: {"206": "EST", "DST": "EDT", np.nan: "EST"}, + 102: {"CDS": "CDT", "CDST": "CDT"}, + 105: {np.nan: "CDT"}, + 106: {"MPP": "MST", "MPT": "MST"}, + 113: {"DST": "EST"}, + 114: {"EDS": "EDT", "DST": "EDT", "EPT": "EST"}, + 115: {"DST": "CDT"}, + 119: {"EPT": "EST"}, + 122: {"DST": "EDT", "EPT": "EST"}, + 123: {"1": "EST", "2": "EDT", "DST": "EDT"}, + 128: {"PPT": "PST"}, # Constant across the year. Never another timezone seen. + 132: {"DST": "PST", np.nan: "PST"}, + 134: {"CDS": "CDT"}, + 137: {"DST": "EDT"}, + 142: {"CPT": "CST"}, + 143: {"DST": "CDT"}, + 146: {"CPT": "CST"}, + 148: {"DST": "CDT"}, + 153: {"CDS": "CDT"}, + 159: {"EDS": "EDT"}, + 163: {"CPT": "CST"}, + 164: {"CPT": "CST", np.nan: "CST"}, + 168: {"CEN": "CST"}, + 175: {np.nan: "EST"}, + 180: {np.nan: "MST"}, + 181: {np.nan: "EST"}, + 183: {"CPT": "CST"}, + 184: {"CPT": "CST"}, + 185: {"CPT": "CST"}, + 186: {"CPT": "CST"}, + 194: {"PPT": "PST"}, # Pacificorp, constant across the whole year. + 195: {"DST": "EDT", "EDS": "EDT", "EPT": "EST"}, + 210: {"EPT": "EST"}, + 217: {"CPT": "CST"}, + 214: {"EPT": "EST"}, + 215: {"EDT/EST": "EST", "EST/EDT": "EST"}, # this is duke. + 211: { # more recent years have CST & CDT. CDST correspond to DST months + "CDST": "CDT" }, + 20: {"3": "MST"}, # black hills (CO). in year after this 3 its all MST + 95: {np.nan: "PST"}, # just empty in 2021, other years is PST + 29: {np.nan: "PST"}, # just empty in 2022, other years is PST + 101: {np.nan: "EST"}, # this was just one lil empty guy } -OFFSET_CODE_FIXES_BY_YEAR = [ - {"respondent_id_ferc714": 139, "report_year": 2006, "utc_offset_code": "PST"}, - {"respondent_id_ferc714": 235, "report_year": 2015, "utc_offset_code": "MST"}, - {"respondent_id_ferc714": 289, "report_year": 2011, "utc_offset_code": "CST"}, - {"respondent_id_ferc714": 292, "report_year": 2011, "utc_offset_code": "CST"}, +TIMEZONE_OFFSET_CODE_FIXES_BY_YEAR = [ + {"respondent_id_ferc714": 33, "report_year": 2006, "utc_offset_code": "PST"}, + {"respondent_id_ferc714": 124, "report_year": 2015, "utc_offset_code": "MST"}, + {"respondent_id_ferc714": 176, "report_year": 2011, "utc_offset_code": "CST"}, + {"respondent_id_ferc714": 179, "report_year": 2011, "utc_offset_code": "CST"}, ] BAD_RESPONDENTS = [ @@ -169,7 +145,7 @@ ] """Fake respondent IDs for database test entities.""" -OFFSET_CODES = { +TIMEZONE_OFFSET_CODES = { "EST": pd.Timedelta(-5, unit="hours"), # Eastern Standard "EDT": pd.Timedelta(-5, unit="hours"), # Eastern Daylight "CST": pd.Timedelta(-6, unit="hours"), # Central Standard @@ -184,15 +160,14 @@ } """A mapping of timezone offset codes to Timedelta offsets from UTC. -from one year to the next, and these result in duplicate records, which are Note that -the FERC 714 instructions state that all hourly demand is to be reported in STANDARD -time for whatever timezone is being used. Even though many respondents use daylight -savings / standard time abbreviations, a large majority do appear to conform to using a -single UTC offset throughout the year. There are 6 instances in which the timezone -associated with reporting changed dropped. +Note that the FERC 714 instructions state that all hourly demand is to be reported +in STANDARD time for whatever timezone is being used. Even though many respondents +use daylight savings / standard time abbreviations, a large majority do appear to +conform to using a single UTC offset throughout the year. There are 6 instances in +which the timezone associated with reporting changed dropped. """ -TZ_CODES = { +TIMEZONE_CODES = { "EST": "America/New_York", "EDT": "America/New_York", "CST": "America/Chicago", @@ -207,92 +182,110 @@ } """Mapping between standardized time offset codes and canonical timezones.""" -EIA_CODE_FIXES = { - # FERC 714 Respondent ID: EIA BA or Utility ID - 125: 2775, # EIA BA CAISO (fixing bad EIA Code of 229) - 134: 5416, # Duke Energy Corp. (bad id was non-existent 3260) - 203: 12341, # MidAmerican Energy Co. (fixes typo, from 12431) - 257: 59504, # Southwest Power Pool (Fixing bad EIA Coding) - 292: 20382, # City of West Memphis -- (fixes a typo, from 20383) - 295: 40229, # Old Dominion Electric Cooperative (missing) - 301: 14725, # PJM Interconnection Eastern Hub (missing) - 302: 14725, # PJM Interconnection Western Hub (missing) - 303: 14725, # PJM Interconnection Illinois Hub (missing) - 304: 14725, # PJM Interconnection Northern Illinois Hub (missing) - 305: 14725, # PJM Interconnection Dominion Hub (missing) - 306: 14725, # PJM Interconnection AEP-Dayton Hub (missing) - # PacifiCorp Utility ID is 14354. It ALSO has 2 BA IDs: (14378, 14379) - # See https://github.com/catalyst-cooperative/pudl/issues/616 - 307: 14379, # Using this ID for now only b/c it's in the HIFLD geometry - 309: 12427, # Michigan Power Pool / Power Coordination Center (missing) - 315: 56090, # Griffith Energy (bad id was 55124) - 323: 58790, # Gridforce Energy Management (missing) - 324: 58791, # NaturEner Wind Watch LLC (Fixes bad ID 57995) - 329: 39347, # East Texas Electricity Cooperative (missing) +EIA_CODE_FIXES: dict[Literal["combined", "csv", "xbrl"], dict[int | str], int] = { + "combined": { + # FERC 714 Respondent ID: EIA BA or Utility ID + 125: 2775, # EIA BA CAISO (fixing bad EIA Code of 229) + 47: 56812, # Duke Energy Control Area Services, LLC (Arlington Valley WECC AZ) + 146: 59504, # Southwest Power Pool (Fixing bad EIA Coding) + 180: 32790, # New Harquahala. + # PacifiCorp Utility ID is 14354. It ALSO has 2 BA IDs: (14378, 14379) + # See https://github.com/catalyst-cooperative/pudl/issues/616 + 194: 14379, # Using this ID for now only b/c it's in the HIFLD geometry + 206: 58791, # NaturEner Wind Watch LLC (Fixes bad ID 57995) + 201: 56090, # Griffith Energy (bad id was 55124) + 205: 58790, # Gridforce Energy Management (missing or 11378 in xbrl) + 213: 64898, # GridLiance (missing) + }, + "xbrl": { + # FERC 714 Respondent ID XBRL: EIA BA or Utility ID + "C011373": 14610, # Florida Municipal Power Pool (lines up with CSV code & is FL util) + "C011421": 9617, # JEA - lines up w/ CSV code and is EIA util + "C002732": 56365, # NaturEner Power Watch LLC: Fixes bad ID "57049, 57050" + "C002447": 7004, # Buckeye Power: was null or the entity_id + "C001526": 14369, # Avangrid Renewables: was null or the entity_id + "C001132": 15248, # PGE. Bad id was 43. New one lines up w/ CSV and is EIA util + }, + "csv": { + # FERC 714 Respondent ID CSV: EIA BA or Utility ID + 134: 5416, # Duke Energy Corp. (bad id was non-existent 3260) + 203: 12341, # MidAmerican Energy Co. (fixes typo, from 12431) + 292: 20382, # City of West Memphis -- (fixes a typo, from 20383) + 295: 40229, # Old Dominion Electric Cooperative (missing) + 301: 14725, # PJM Interconnection Eastern Hub (missing) + 302: 14725, # PJM Interconnection Western Hub (missing) + 303: 14725, # PJM Interconnection Illinois Hub (missing) + 304: 14725, # PJM Interconnection Northern Illinois Hub (missing) + 305: 14725, # PJM Interconnection Dominion Hub (missing) + 306: 14725, # PJM Interconnection AEP-Dayton Hub (missing) + 309: 12427, # Michigan Power Pool / Power Coordination Center (missing) + 312: 59435, # NaturEner Glacier Wind (missing) + 329: 39347, # East Texas Electricity Cooperative (missing) + }, } -"""Overrides of FERC 714 respondent IDs with wrong or missing EIA Codes.""" +"""Overrides of FERC 714 respondent IDs with wrong or missing EIA Codes. + +This is used in :meth:`RespondentId.spot_fix_eia_codes`. The dictionary +is organized by "source" keys ("combined", "csv", or "xbrl"). Each source's +value is a secondary dictionary which contains source respondent ID's as keys +and fixes for EIA codes as values. + +We separated these fixes by either coming directly from the CSV data, the XBRL +data, or the combined data. We use the corresponding source or PUDL-derived +respondent ID to identify the EIA code to overwrite. We could have combined +these fixes all into one set of combined fixes identified by the PUDL-derived +``respondent_id_ferc714``, but this way we can do more targeted source-based +cleaning and test each source's EIA codes before the sources are concatenated +together. +""" RENAME_COLS = { "core_ferc714__respondent_id": { - "respondent_id": "respondent_id_ferc714", - "respondent_name": "respondent_name_ferc714", + "csv": { + "respondent_id": "respondent_id_ferc714_csv", + "respondent_name": "respondent_name_ferc714", + "eia_code": "eia_code", + }, + "xbrl": { + "entity_id": "respondent_id_ferc714_xbrl", + "respondent_legal_name": "respondent_name_ferc714", + "respondent_identification_code": "eia_code", + }, }, "out_ferc714__hourly_planning_area_demand": { - "report_yr": "report_year", - "plan_date": "report_date", - "respondent_id": "respondent_id_ferc714", - "timezone": "utc_offset_code", - }, - "description_pa_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - "elec_util_name": "respondent_name_ferc714", - "peak_summer": "peak_demand_summer_mw", - "peak_winter": "peak_demand_winter_mw", - }, - "id_certification_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "gen_plants_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "demand_monthly_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "net_energy_load_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "adjacency_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "interchange_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "lambda_hourly_ba_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "lambda_description_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", - }, - "demand_forecast_pa_ferc714": { - "report_yr": "report_year", - "respondent_id": "respondent_id_ferc714", + "csv": { + "report_yr": "report_year", + "plan_date": "report_date", + "respondent_id": "respondent_id_ferc714_csv", + "timezone": "utc_offset_code", + }, + "xbrl": { + "entity_id": "respondent_id_ferc714_xbrl", + "date": "report_date", + "report_year": "report_year", + "time_zone": "utc_offset_code", + "planning_area_hourly_demand_megawatts": "demand_mwh", + }, }, "core_ferc714__yearly_planning_area_demand_forecast": { - "respondent_id": "respondent_id_ferc714", - "report_yr": "report_year", - "plan_year": "forecast_year", - "summer_forecast": "summer_peak_demand_mw", - "winter_forecast": "winter_peak_demand_mw", - "net_energy_forecast": "net_demand_mwh", + "csv": { + "respondent_id": "respondent_id_ferc714_csv", + "report_yr": "report_year", + "plan_year": "forecast_year", + "summer_forecast": "summer_peak_demand_forecast_mw", + "winter_forecast": "winter_peak_demand_forecast_mw", + "net_energy_forecast": "net_demand_forecast_mwh", + }, + "xbrl": { + "entity_id": "respondent_id_ferc714_xbrl", + "start_date": "start_date", + "end_date": "end_date", + "report_year": "report_year", + "planning_area_hourly_demand_and_forecast_year": "forecast_year", + "planning_area_hourly_demand_and_forecast_summer_forecast": "summer_peak_demand_forecast_mw", + "planning_area_hourly_demand_and_forecast_winter_forecast": "winter_peak_demand_forecast_mw", + "planning_area_hourly_demand_and_forecast_forecast_of_annual_net_energy_for_load": "net_demand_forecast_mwh", + }, }, } @@ -300,8 +293,8 @@ ############################################################################## # Internal helper functions. ############################################################################## -def _pre_process(df: pd.DataFrame, table_name: str) -> pd.DataFrame: - """A simple transform function for until the real ones are written. +def _pre_process_csv(df: pd.DataFrame, table_name: str) -> pd.DataFrame: + """A simple transform function for processing the CSV raw data. * Removes footnotes columns ending with _f * Drops report_prd, spplmnt_num, and row_num columns @@ -310,304 +303,911 @@ def _pre_process(df: pd.DataFrame, table_name: str) -> pd.DataFrame: logger.info("Removing unneeded columns and dropping bad respondents.") out_df = ( - df.rename(columns=RENAME_COLS[table_name]) + rename_columns( + df=df, params=RenameColumns(columns=RENAME_COLS[table_name]["csv"]) + ) .filter(regex=r"^(?!.*_f$).*") .drop(["report_prd", "spplmnt_num", "row_num"], axis="columns", errors="ignore") ) # Exclude fake Test IDs -- not real respondents - out_df = out_df[~out_df.respondent_id_ferc714.isin(BAD_RESPONDENTS)] + out_df = out_df[~out_df.respondent_id_ferc714_csv.isin(BAD_RESPONDENTS)] return out_df -def _post_process(df: pd.DataFrame, table_name: str) -> pd.DataFrame: - """Uniform post-processing of FERC 714 tables. +def _assign_respondent_id_ferc714( + df: pd.DataFrame, source: Literal["csv", "xbrl"] +) -> pd.DataFrame: + """Assign the PUDL-assigned respondent_id_ferc714 based on the native respondent ID. - Applies standard data types and ensures that the tables generally conform to the - schemas we have defined for them. + We need to replace the natively reported respondent ID from each of the two FERC714 + sources with a PUDL-assigned respondent ID. The mapping between the native ID's and + these PUDL-assigned ID's can be accessed in the database tables + ``respondents_csv_ferc714`` and ``respondents_xbrl_ferc714``. Args: - df: A dataframe to be post-processed. + df: the input table with the native respondent ID column. + source: the lower-case string name of the source of the FERC714 data. Either csv + or xbrl. Returns: - The post-processed dataframe. + an augmented version of the input ``df`` with a new column that replaces + the natively reported respondent ID with the PUDL-assigned respondent ID. """ - return PUDL_PACKAGE.get_resource(table_name).enforce_schema(df) + respondent_map_ferc714 = pd.read_csv( + importlib.resources.files("pudl.package_data.glue") + / "respondent_id_ferc714.csv" + ).convert_dtypes() + # use the source utility ID column to get a unique map and for merging + resp_id_col = f"respondent_id_ferc714_{source}" + resp_map_series = ( + respondent_map_ferc714.dropna(subset=[resp_id_col]) + .set_index(resp_id_col) + .respondent_id_ferc714 + ) + df["respondent_id_ferc714"] = df[resp_id_col].map(resp_map_series) + return df -def _standardize_offset_codes(df: pd.DataFrame, offset_fixes) -> pd.DataFrame: - """Convert to standardized UTC offset abbreviations. - This function ensures that all of the 3-4 letter abbreviations used to indicate a - timestamp's localized offset from UTC are standardized, so that they can be used to - make the timestamps timezone aware. The standard abbreviations we're using are: +def _fillna_respondent_id_ferc714_source( + df: pd.DataFrame, source: Literal["csv", "xbrl"] +) -> pd.DataFrame: + """Fill missing CSV or XBRL respondent id. + + The source (CSV or XBRL) tables get assigned a PUDL-derived + ``respondent_id_ferc714`` ID column (via :func:`_assign_respondent_id_ferc714`). + After we concatenate the source tables, we sometimes backfill and + forward-fill the source IDs (``respondent_id_ferc714_csv`` and + ``respondent_id_ferc714_xbrl``). This way the older records from the CSV years + will also have the XBRL ID's and vice versa. This will enable users to find + the full timeseries of a respondent that given either source ID (instead of + using the source ID to find the PUDL-derived ID and then finding the records). + """ + respondent_map_ferc714 = pd.read_csv( + importlib.resources.files("pudl.package_data.glue") + / "respondent_id_ferc714.csv" + ).convert_dtypes() + # use the source utility ID column to get a unique map and for merging + resp_id_col = f"respondent_id_ferc714_{source}" + resp_map_series = respondent_map_ferc714.dropna(subset=[resp_id_col]).set_index( + "respondent_id_ferc714" + )[resp_id_col] + + df[resp_id_col] = df[resp_id_col].fillna( + df["respondent_id_ferc714"].map(resp_map_series) + ) + return df - "HST": Hawaii Standard Time - "AKST": Alaska Standard Time - "AKDT": Alaska Daylight Time - "PST": Pacific Standard Time - "PDT": Pacific Daylight Time - "MST": Mountain Standard Time - "MDT": Mountain Daylight Time - "CST": Central Standard Time - "CDT": Central Daylight Time - "EST": Eastern Standard Time - "EDT": Eastern Daylight Time - In some cases different respondents use the same non-standard abbreviations to - indicate different offsets, and so the fixes are applied on a per-respondent basis, - as defined by offset_fixes. +def assign_report_day(df: pd.DataFrame, date_col: str) -> pd.DataFrame: + """Add a report_day column.""" + return df.assign( + report_day=pd.to_datetime(df[date_col], format="%Y-%m-%d", exact=False) + ) - Args: - df: DataFrame containing a utc_offset_code column that needs to be standardized. - offset_fixes: A dictionary with respondent_id_ferc714 values as the keys, and a - dictionary mapping non-standard UTC offset codes to the standardized UTC - offset codes as the value. - Returns: - Standardized UTC offset codes. +class RespondentId: + """Class for building the :ref:`core_ferc714__respondent_id` asset. + + Most of the methods in this class as staticmethods. The purpose of using a class + in this instance is mostly for organizing the table specific transforms under the + same name-space. """ - logger.info("Standardizing UTC offset codes.") - # We only need a couple of columns here: - codes = df[["respondent_id_ferc714", "utc_offset_code"]].copy() - # Set all blank "" missing UTC codes to np.nan - codes["utc_offset_code"] = codes.utc_offset_code.mask(codes.utc_offset_code == "") - # Apply specific fixes on a per-respondent basis: - codes = codes.groupby("respondent_id_ferc714").transform( - lambda x: x.replace(offset_fixes[x.name]) if x.name in offset_fixes else x - ) - return codes + + @classmethod + def run( + cls, raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame + ) -> pd.DataFrame: + """Build the table for the :ref:`core_ferc714__respondent_id` asset. + + Process and combine the CSV and XBRL based data. + + There are two main threads of transforms happening here: + + * Table compatibility: The CSV raw table is static (does not even report years) + while the xbrl table is reported annually. A lot of the downstream analysis + expects this table to be static. So the first step was to check whether or not + the columns that we have in the CSV years had consistent data over the few XBRL + years that we have. There are a small number of eia_code's we needed to clean + up, but besides that it was static. We then convert the XBRL data into a static + table, then we concat-ed the tables and checked the static-ness again via + :meth:`ensure_eia_code_uniqueness`. + * eia_code cleaning: Clean up FERC-714 respondent names and manually assign EIA + utility IDs to a few FERC Form 714 respondents that report planning area demand, + but which don't have their corresponding EIA utility IDs provided by FERC for + some reason (including PacifiCorp). Done all via :meth:`spot_fix_eia_codes` & + EIA_CODE_FIXES. + + """ + table_name = "core_ferc714__respondent_id" + # CSV STUFF + csv = ( + _pre_process_csv(raw_csv, table_name) + .pipe(_assign_respondent_id_ferc714, source="csv") + .astype({"eia_code": pd.Int64Dtype()}) + .pipe(cls.spot_fix_eia_codes, "csv") + .pipe(cls.ensure_eia_code_uniqueness, "csv") + .assign(source="csv") + ) + # XBRL STUFF + xbrl = ( + rename_columns( + raw_xbrl_duration, + params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]), + ) + .pipe(_assign_respondent_id_ferc714, source="xbrl") + .pipe(cls.clean_eia_codes_xbrl) + .astype({"eia_code": pd.Int64Dtype()}) + .pipe(cls.spot_fix_eia_codes, "xbrl") + .pipe(cls.ensure_eia_code_uniqueness, "xbrl") + .pipe(cls.convert_into_static_table_xbrl) + .assign(source="xbrl") + ) + # CONCATED STUFF + df = ( + pd.concat([csv, xbrl]) + .reset_index(drop=True) + .convert_dtypes() + .pipe(cls.spot_fix_eia_codes, "combined") + .pipe(cls.ensure_eia_code_uniqueness, "combined") + .pipe(cls.condense_into_one_source_table) + .pipe(_fillna_respondent_id_ferc714_source, "csv") + # the xbrl version of this is fillna is not *strictly necessary* + # bc we are sorting the records grab the xbrl record if there is one + # for each respondent during condense_into_one_source_table. + .pipe(_fillna_respondent_id_ferc714_source, "xbrl") + ) + return df + + @staticmethod + def spot_fix_eia_codes( + df: pd.DataFrame, source: Literal["csv", "xbrl", "combined"] + ) -> pd.DataFrame: + """Spot fix the eia_codes. + + Using the manually compiled fixes to the ``eia_code`` column stored in + :py:const:`EIA_CODE_FIXES`, replace the reported values by respondent. + """ + df.loc[df.eia_code == 0, "eia_code"] = pd.NA + suffix = "" if source == "combined" else f"_{source}" + # There are a few utilities that seem mappable, but missing: + for rid, new_code in EIA_CODE_FIXES[source].items(): + df.loc[df[f"respondent_id_ferc714{suffix}"] == rid, "eia_code"] = new_code + return df + + @staticmethod + def ensure_eia_code_uniqueness( + df: pd.DataFrame, source: Literal["csv", "xbrl", "combined"] + ) -> pd.DataFrame: + """Ensure there is only one unique eia_code for each respondent.""" + df["eia_code_count"] = ( + df.dropna(subset=["eia_code"]) + .groupby(["respondent_id_ferc714"])[["eia_code"]] + .transform("nunique") + ) + if not ( + multiple_eia_codes := df[(df.eia_code_count != 1) & (df.eia_code.notnull())] + ).empty: + raise AssertionError( + "We expected 0 respondents with multiple different eia_code's " + f"reported for each respondent in {source} data, " + f"but we found {len(multiple_eia_codes)}" + ) + return df.drop(columns=["eia_code_count"]) + + @staticmethod + def clean_eia_codes_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame: + """Make eia_code's cleaner coming from the XBRL data. + + Desired outcomes here include all respondents have only one non-null + eia_code and all eia_codes that are actually the respondent_id_ferc714_xbrl + are nulled. + """ + # we expect all of these submissions to be from the last Q + assert all(xbrl.report_period == "Q4") + # first we are gonna null out all of the "EIA" codes that are really just the respondent id + code_is_respondent_id_mask = xbrl.eia_code.str.startswith("C") & ( + xbrl.respondent_id_ferc714_xbrl == xbrl.eia_code + ) + xbrl.loc[code_is_respondent_id_mask, "eia_code"] = pd.NA + + # lets null out some of the eia_code's from XBRL that we've manually culled + # because they are were determined to be wrong. These respondents + # had more than one value for their eia_code and one was always wrong + respondent_id_xbrl_to_bad_eia_code = { + "C002422": ["5776"], + "C011374": ["8376"], + "C002869": ["F720204"], + "C002732": ["F720204", "57049, 57050"], + "C011420": ["16606"], + } + for rid_xbrl, bad_eia_codes in respondent_id_xbrl_to_bad_eia_code.items(): + xbrl.loc[ + (xbrl.respondent_id_ferc714_xbrl == rid_xbrl) + & (xbrl.eia_code.isin(bad_eia_codes)), + "eia_code", + ] = pd.NA + return xbrl + + @staticmethod + def convert_into_static_table_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame: + """Convert this annually reported table into a skinnier, static table. + + The CSV table is entirely static - it doesn't have any reported + changes that vary over time. The XBRL table does have start and end + dates in it. In order to merge these two sources, we are checking + whether or not the shared variables change over time and then + converting this table into a non-time-varying table. + """ + # the CSV data does not vary by year, so we need to check if that is + # also going to be the case for the XBRL data. we check the eia_codes + # during ensure_eia_code_uniqueness. The name is less crucial but we + # should still check. + assert all( + xbrl.groupby(["respondent_id_ferc714_xbrl"])[ # noqa: PD101 + ["respondent_name_ferc714"] + ].nunique() + == 1 + ) + cols_to_keep = [ + "respondent_id_ferc714", + "respondent_id_ferc714_xbrl", + "respondent_name_ferc714", + "eia_code", + ] + # we are going to first sort by report year (descending) so the more recent + # name is the name we get - just in case - we are checking for consistency of + # the name above. + return ( + xbrl.sort_values(["report_year"], ascending=False)[cols_to_keep] + .sort_values(["respondent_id_ferc714", "eia_code"]) + .drop_duplicates(subset=["respondent_id_ferc714"], keep="first") + ) + + @staticmethod + def condense_into_one_source_table(df): + """Condense the CSV and XBRL records together into one record. + + We have two records coming from each of the two sources in this table. + This method simply drops duplicates based on the PKs of the table. + We know that the names are different in the CSV vs the XBRL source. + We are going to grab the XBRL names because they are more recent. + + NOTE: We could have merged the data in :meth:`run` instead of concatenating + along the index. We would have had to develop different methods for + :meth:`ensure_eia_code_uniqueness`. + """ + return df.sort_values(["source"], ascending=False).drop_duplicates( + subset=["respondent_id_ferc714", "eia_code"], keep="first" + ) @asset( io_manager_key="pudl_io_manager", + ins={ + "raw_csv": AssetIn(key="raw_ferc714_csv__respondent_id"), + "raw_xbrl_duration": AssetIn( + key="raw_ferc714_xbrl__identification_and_certification_01_1_duration" + ), + }, compute_kind="pandas", ) def core_ferc714__respondent_id( - raw_ferc714_csv__respondent_id: pd.DataFrame, + raw_csv: pd.DataFrame, raw_xbrl_duration: pd.DataFrame ) -> pd.DataFrame: """Transform the FERC 714 respondent IDs, names, and EIA utility IDs. - Clean up FERC-714 respondent names and manually assign EIA utility IDs to a few FERC - Form 714 respondents that report planning area demand, but which don't have their - corresponding EIA utility IDs provided by FERC for some reason (including - PacifiCorp). + This is a light wrapper around :class:`RespondentId` because you need to + build an asset from a function - not a staticmethod of a class. Args: - raw_ferc714_csv__respondent_id: Raw table describing the FERC 714 Respondents. + raw_csv: Raw table describing the FERC 714 Respondents from the CSV years. + raw_xbrl_duration: Raw table describing the FERC 714 Respondents from the + XBRL years. Returns: A clean(er) version of the FERC-714 respondents table. """ - df = _pre_process( - raw_ferc714_csv__respondent_id, table_name="core_ferc714__respondent_id" - ) - df["respondent_name_ferc714"] = df.respondent_name_ferc714.str.strip() - df.loc[df.eia_code == 0, "eia_code"] = pd.NA - # There are a few utilities that seem mappable, but missing: - for rid in EIA_CODE_FIXES: - df.loc[df.respondent_id_ferc714 == rid, "eia_code"] = EIA_CODE_FIXES[rid] - return _post_process(df, table_name="core_ferc714__respondent_id") + return RespondentId.run(raw_csv, raw_xbrl_duration) + + +class HourlyPlanningAreaDemand: + """Class for building the :ref:`out_ferc714__hourly_planning_area_demand` asset. + + The :ref:`out_ferc714__hourly_planning_area_demand` table is an hourly time + series of demand by Planning Area. + + Most of the methods in this class as staticmethods. The purpose of using a class + in this instance is mostly for organizing the table specific transforms under the + same name-space. + """ + + @classmethod + def run( + cls, + raw_csv: pd.DataFrame, + raw_xbrl_duration: pd.DataFrame, + raw_xbrl_instant: pd.DataFrame, + ) -> pd.DataFrame: + """Build the :ref:`out_ferc714__hourly_planning_area_demand` asset. + + To transform this table we have to process the instant and duration xbrl + tables so we can merge them together and process the XBRL data. We also + have to process the CSV data so we can concatenate it with the XBLR data. + Then we can process all of the data together. + + For both the CSV and XBRL data, the main transforms that are happening + have to do with cleaning the timestamps in the data, resulting in + timestamps that are in a datetime format and are nearly continuous + for every respondent. + + Once the CSV and XBRL data is merged together, the transforms are mostly + focused on cleaning the timezone codes reported to FERC + and then using those timezone codes to convert all of timestamps into + UTC datetime. + + The outcome here is nearly continuous and non-duplicative time series. + """ + table_name = "out_ferc714__hourly_planning_area_demand" + # XBRL STUFF + duration_xbrl = cls.remove_yearly_records_duration_xbrl(raw_xbrl_duration) + xbrl = ( + cls.merge_instant_and_duration_tables_xbrl( + raw_xbrl_instant, duration_xbrl, table_name=table_name + ) + .pipe( + rename_columns, + params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]), + ) + .pipe(_assign_respondent_id_ferc714, "xbrl") + .pipe(cls.convert_dates_to_zero_offset_hours_xbrl) + .astype({"report_date": "datetime64[ns]"}) + .pipe(cls.convert_dates_to_zero_seconds_xbrl) + .pipe(cls.spot_fix_records_xbrl) + .pipe(cls.ensure_dates_are_continuous, source="xbrl") + ) + # CSV STUFF + csv = ( + _pre_process_csv(raw_csv, table_name=table_name) + .pipe(_assign_respondent_id_ferc714, "csv") + .pipe(cls.melt_hourx_columns_csv) + .pipe(cls.parse_date_strings_csv) + .pipe(cls.ensure_dates_are_continuous, source="csv") + ) + # CONCATED STUFF + df = ( + pd.concat([csv, xbrl]) + .reset_index(drop=True) + .assign( + utc_offset_code=lambda x: cls.standardize_offset_codes( + x, TIMEZONE_OFFSET_CODE_FIXES + ) + ) + .pipe(cls.clean_utc_code_offsets_and_set_timezone) + .pipe(cls.drop_missing_utc_offset) + .pipe(cls.construct_utc_datetime) + .pipe(cls.ensure_non_duplicated_datetimes) + .pipe(cls.spot_fix_values) + # Convert report_date to first day of year + .assign( + report_date=lambda x: x.report_date.dt.to_period("Y").dt.to_timestamp() + ) + .pipe(_fillna_respondent_id_ferc714_source, "xbrl") + .pipe(_fillna_respondent_id_ferc714_source, "csv") + # sort so that the parquet files have all the repeating IDs are next + # to each other for smoller storage + .sort_values(by=["respondent_id_ferc714", "datetime_utc"]) + ) + return df + + @staticmethod + def melt_hourx_columns_csv(df): + """Melt hourX columns into hours. + + There are some instances of the CSVs with a 25th hour. We drop + those entirely because almost all of them are unusable (0.0 or + daily totals), and they shouldn't really exist at all based on + FERC instructions. + """ + df = df.drop(columns="hour25") + + # Melt daily rows with 24 demands to hourly rows with single demand + logger.info("Melting daily FERC 714 records into hourly records.") + df = df.rename( + columns=lambda x: int(re.sub(r"^hour", "", x)) - 1 if "hour" in x else x, + ) + df = df.melt( + id_vars=[ + "respondent_id_ferc714", + "respondent_id_ferc714_csv", + "report_year", + "report_date", + "utc_offset_code", + ], + value_vars=range(24), + var_name="hour", + value_name="demand_mwh", + ) + return df + + @staticmethod + def parse_date_strings_csv(csv): + """Convert report_date into pandas Datetime types. + + Make the report_date column from the daily string ``report_date`` and + the integer ``hour`` column. + """ + # Parse date strings + hour_timedeltas = {i: pd.to_timedelta(i, unit="h") for i in range(24)} + # NOTE: Faster to ignore trailing 00:00:00 and use exact=False + csv["report_date"] = pd.to_datetime( + csv["report_date"], format="%m/%d/%Y", exact=False + ) + csv["hour"].map(hour_timedeltas) + return csv.drop(columns=["hour"]) + + @staticmethod + def remove_yearly_records_duration_xbrl(duration_xbrl): + """Convert a table with mostly daily records with some annuals into fully daily. + + Almost all of the records have a start_date that == the end_date + which I'm assuming means the record spans the duration of one day + there are a small handful of records which seem to span a full year. + """ + duration_xbrl = duration_xbrl.astype( + {"start_date": "datetime64[ns]", "end_date": "datetime64[ns]"} + ) + one_day_mask = duration_xbrl.start_date == duration_xbrl.end_date + duration_xbrl_one_day = duration_xbrl[one_day_mask] + duration_xbrl_one_year = duration_xbrl[~one_day_mask] + # ensure there are really only a few of these multi-day records + assert len(duration_xbrl_one_year) / len(duration_xbrl_one_day) < 0.0005 + # ensure all of these records are one year records + assert all( + duration_xbrl_one_year.start_date + + pd.DateOffset(years=1) + - pd.DateOffset(days=1) + == duration_xbrl_one_year.end_date + ) + # these one-year records all show up as one-day records. + idx = ["entity_id", "report_year", "start_date"] + assert all( + duration_xbrl_one_year.merge( + duration_xbrl_one_day, on=idx, how="left", indicator=True + )._merge + == "both" + ) + # all but two of them have the same timezone as the hourly data. + # two of them have UTC instead of a local timezone reported in hourly data. + # this leads me to think these are okay to just drop + return duration_xbrl_one_day + + @staticmethod + def merge_instant_and_duration_tables_xbrl( + instant_xbrl: pd.DataFrame, duration_xbrl: pd.DataFrame, table_name: str + ) -> pd.DataFrame: + """Merge XBRL instant and duration tables, reshaping instant as needed. + + FERC714 XBRL instant period signifies that it is true as of the reported date, + while a duration fact pertains to the specified time period. The ``date`` column + for an instant fact corresponds to the ``end_date`` column of a duration fact. + + Args: + instant_xbrl: table representing XBRL instant facts. + raw_xbrl_duration: table representing XBRL duration facts. + + Returns: + A unified table combining the XBRL duration and instant facts, if both types + of facts were present. If either input dataframe is empty, the other + dataframe is returned unchanged, except that several unused columns are + dropped. If both input dataframes are empty, an empty dataframe is returned. + """ + drop_cols = ["filing_name", "index"] + # Ignore errors in case not all drop_cols are present. + instant = instant_xbrl.drop(columns=drop_cols, errors="ignore").pipe( + assign_report_day, "date" + ) + duration = duration_xbrl.drop(columns=drop_cols, errors="ignore").pipe( + assign_report_day, "start_date" + ) + + merge_keys = ["entity_id", "report_year", "report_day", "sched_table_name"] + # Merge instant into duration. + out_df = pd.merge( + instant, + duration, + how="left", + on=merge_keys, + validate="m:1", + ).drop(columns=["report_day", "start_date", "end_date"]) + return out_df + + @staticmethod + def convert_dates_to_zero_offset_hours_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame: + """Convert all hours to: Hour (24-hour clock) as a zero-padded decimal number. + + The FERC 714 form includes columns for the hours of each day. Those columns are + labeled with 1-24 to indicate the hours of the day. The XBRL filings themselves + have time-like string associated with each of the facts. They include both a the + year-month-day portion (formatted as %Y-%m-%d) as well as an hour-minute-second + component (semi-formatted as T%H:%M:%S). Attempting to simply convert this + timestamp information to a datetime using the format ``"%Y-%m-%dT%H:%M:%S"`` + fails because about a third of the records include hour 24 - which is not an + accepted hour in standard datetime formats. + + The respondents that report hour 24 do not report hour 00. We have done some spot + checking of values reported to FERC and have determined that hour 24 seems to + correspond with hour 00 (of the next day). We have not gotten complete + confirmation from FERC staff that this is always the case, but it seems like a + decent assumption. + + So, this step converts all of the hour 24 records to be hour 00 of the next day. + """ + bad_24_hour_mask = xbrl.report_date.str.contains("T24:") + + xbrl.loc[bad_24_hour_mask, "report_date"] = pd.to_datetime( + xbrl[bad_24_hour_mask].report_date.str.replace("T24:", "T23:"), + format="%Y-%m-%dT%H:%M:%S", + ) + np.timedelta64(1, "h") + return xbrl + + @staticmethod + def convert_dates_to_zero_seconds_xbrl(xbrl: pd.DataFrame) -> pd.DataFrame: + """Convert the last second of the day records to the first (0) second of the next day. + + There are a small amount of records which report the last "hour" of the day + as last second of the day, as opposed to T24 cleaned in + :meth:`convert_dates_to_zero_offset_hours_xbrl` or T00 which is standard for a + datetime. This function finds these records and adds one second to them and + then ensures all of the records has 0's for seconds. + """ + last_second_mask = xbrl.report_date.dt.second == 59 + + xbrl.loc[last_second_mask, "report_date"] = xbrl.loc[ + last_second_mask, "report_date" + ] + pd.Timedelta("1s") + assert xbrl[xbrl.report_date.dt.second != 0].empty + return xbrl + + @staticmethod + def spot_fix_records_xbrl(xbrl: pd.DataFrame): + """Spot fix some specific XBRL records.""" + xbrl_years_mask = xbrl.report_date.dt.year >= min(Ferc714Settings().xbrl_years) + if (len_csv_years := len(xbrl[~xbrl_years_mask])) > 25: + raise AssertionError( + "We expected less than 25 XBRL records that have timestamps " + f"with years before the XBRL years, but we found {len_csv_years}" + ) + return xbrl[xbrl_years_mask] + + @staticmethod + def ensure_dates_are_continuous(df: pd.DataFrame, source: Literal["csv", "xbrl"]): + """Assert that almost all respondents have continuous timestamps. + + In the xbrl data, we found 41 gaps in the timeseries! They are almost entirely + on the hour in which daylight savings times goes into effect. The csv data + had 10 gaps. Pretty good all in all! + """ + df["gap"] = df[["respondent_id_ferc714", "report_date"]].sort_values( + by=["respondent_id_ferc714", "report_date"] + ).groupby("respondent_id_ferc714").diff() > pd.to_timedelta("1h") + if len(gappy_dates := df[df.gap]) > (41 if source == "xbrl" else 10): + raise AssertionError( + "We expect there to be nearly no gaps in the time series." + f"but we found these gaps:\n{gappy_dates}" + ) + return df.drop(columns=["gap"]) + + @staticmethod + def standardize_offset_codes(df: pd.DataFrame, offset_fixes) -> pd.Series: + """Convert to standardized UTC offset abbreviations. + + This function ensures that all of the 3-4 letter abbreviations used to indicate a + timestamp's localized offset from UTC are standardized, so that they can be used to + make the timestamps timezone aware. The standard abbreviations we're using are: + + "HST": Hawaii Standard Time + "AKST": Alaska Standard Time + "AKDT": Alaska Daylight Time + "PST": Pacific Standard Time + "PDT": Pacific Daylight Time + "MST": Mountain Standard Time + "MDT": Mountain Daylight Time + "CST": Central Standard Time + "CDT": Central Daylight Time + "EST": Eastern Standard Time + "EDT": Eastern Daylight Time + + In some cases different respondents use the same non-standard abbreviations to + indicate different offsets, and so the fixes are applied on a per-respondent basis, + as defined by offset_fixes. + + Args: + df: DataFrame containing a utc_offset_code column that needs to be standardized. + offset_fixes: A dictionary with respondent_id_ferc714 values as the keys, and a + dictionary mapping non-standard UTC offset codes to the standardized UTC + offset codes as the value. + + Returns: + Standardized UTC offset codes. + """ + logger.info("Standardizing UTC offset codes.") + # Clean UTC offset codes + df["utc_offset_code"] = df["utc_offset_code"].str.strip().str.upper() + # We only need a couple of columns here: + codes = df[["respondent_id_ferc714", "utc_offset_code"]].copy() + # Set all blank "" missing UTC codes to np.nan + codes["utc_offset_code"] = codes.utc_offset_code.mask( + codes.utc_offset_code == "" + ) + # Apply specific fixes on a per-respondent basis: + codes = codes.groupby("respondent_id_ferc714").transform( + lambda x: x.replace(offset_fixes[x.name]) if x.name in offset_fixes else x + ) + return codes + + @staticmethod + def clean_utc_code_offsets_and_set_timezone(df): + """Clean UTC Codes and set timezone.""" + # NOTE: Assumes constant timezone for entire year + for fix in TIMEZONE_OFFSET_CODE_FIXES_BY_YEAR: + mask = (df["report_year"] == fix["report_year"]) & ( + df["respondent_id_ferc714"] == fix["respondent_id_ferc714"] + ) + df.loc[mask, "utc_offset_code"] = fix["utc_offset_code"] + + # Replace UTC offset codes with UTC offset and timezone + df["utc_offset"] = df["utc_offset_code"].map(TIMEZONE_OFFSET_CODES) + df["timezone"] = df["utc_offset_code"].map(TIMEZONE_CODES) + return df + + @staticmethod + def drop_missing_utc_offset(df): + """Drop records with missing UTC offsets and zero demand.""" + # Assert that all records missing UTC offset have zero demand + missing_offset = df["utc_offset"].isna() + bad_offset_and_demand = df.loc[missing_offset & (df.demand_mwh != 0)] + # there are 12 of these bad guys just in the 2023 fast test. + if len(bad_offset_and_demand) > 12: + raise AssertionError( + "We expect all but 12 of the records without a cleaned " + "utc_offset to not have any demand data, but we found " + f"{len(bad_offset_and_demand)} records.\nUncleaned Codes: " + f"{bad_offset_and_demand.utc_offset_code.unique()}\n{bad_offset_and_demand}" + ) + # Drop these records & then drop the original offset code + df = df.query("~@missing_offset").drop(columns="utc_offset_code") + return df + + @staticmethod + def construct_utc_datetime(df: pd.DataFrame) -> pd.DataFrame: + """Construct datetime_utc column.""" + # Construct UTC datetime + logger.info("Converting local time + offset code to UTC + timezone.") + df["datetime_utc"] = df["report_date"] - df["utc_offset"] + df = df.drop(columns=["utc_offset"]) + return df + + @staticmethod + def ensure_non_duplicated_datetimes(df): + """Report and drop duplicated UTC datetimes.""" + # There should be less than 10 of these, + # resulting from changes to a planning area's reporting timezone. + duplicated = df.duplicated(["respondent_id_ferc714", "datetime_utc"]) + if (num_dupes := np.count_nonzero(duplicated)) > 10: + raise AssertionError( + f"Found {num_dupes} duplicate UTC datetimes, but we expected 10 or less.\n{df[duplicated]}" + ) + df = df.query("~@duplicated") + return df + + @staticmethod + def spot_fix_values(df: pd.DataFrame) -> pd.DataFrame: + """Spot fix values.""" + # Flip the sign on sections of demand which were reported as negative + mask = ( + df["report_year"].isin([2006, 2007, 2008, 2009]) + & (df["respondent_id_ferc714"] == 156) + ) | ( + df["report_year"].isin([2006, 2007, 2008, 2009, 2010]) + & (df["respondent_id_ferc714"] == 289) + ) + df.loc[mask, "demand_mwh"] *= -1 + return df @asset( + ins={ + "raw_csv": AssetIn(key="raw_ferc714_csv__hourly_planning_area_demand"), + "raw_xbrl_duration": AssetIn( + key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2_duration" + ), + "raw_xbrl_instant": AssetIn( + key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_03_2_instant" + ), + }, io_manager_key="parquet_io_manager", op_tags={"memory-use": "high"}, compute_kind="pandas", ) def out_ferc714__hourly_planning_area_demand( - raw_ferc714_csv__hourly_planning_area_demand: pd.DataFrame, + raw_csv: pd.DataFrame, + raw_xbrl_duration: pd.DataFrame, + raw_xbrl_instant: pd.DataFrame, ) -> pd.DataFrame: - """Transform the hourly demand time series by Planning Area. - - Transformations include: - - - Clean UTC offset codes. - - Replace UTC offset codes with UTC offset and timezone. - - Drop 25th hour rows. - - Set records with 0 UTC code to 0 demand. - - Drop duplicate rows. - - Flip negative signs for reported demand. + """Build the :ref:`out_ferc714__hourly_planning_area_demand`. - Args: - raw_ferc714_csv__hourly_planning_area_demand: Raw table containing hourly demand - time series by Planning Area. - - Returns: - Clean(er) version of the hourly demand time series by Planning Area. + This is a light wrapper around :class:`HourlyPlanningAreaDemand` because + it seems you need to build an asset from a function - not a staticmethod of + a class. """ - logger.info("Converting dates into pandas Datetime types.") - df = _pre_process( - raw_ferc714_csv__hourly_planning_area_demand, - table_name="out_ferc714__hourly_planning_area_demand", - ) + return HourlyPlanningAreaDemand.run(raw_csv, raw_xbrl_duration, raw_xbrl_instant) - # Parse date strings - # NOTE: Faster to ignore trailing 00:00:00 and use exact=False - df["report_date"] = pd.to_datetime( - df["report_date"], format="%m/%d/%Y", exact=False - ) - - # Assert that all respondents and years have complete and unique dates - all_dates = { - year: set(pd.date_range(f"{year}-01-01", f"{year}-12-31", freq="1D")) - for year in range(df["report_year"].min(), df["report_year"].max() + 1) - } - assert ( # nosec B101 - df.groupby(["respondent_id_ferc714", "report_year"]) - .apply(lambda x: set(x["report_date"]) == all_dates[x.name[1]]) - .all() - ) - # Clean UTC offset codes - df["utc_offset_code"] = df["utc_offset_code"].str.strip().str.upper() - df["utc_offset_code"] = _standardize_offset_codes(df, OFFSET_CODE_FIXES) +class YearlyPlanningAreaDemandForecast: + """Class for building the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset. - # NOTE: Assumes constant timezone for entire year - for fix in OFFSET_CODE_FIXES_BY_YEAR: - mask = (df["report_year"] == fix["report_year"]) & ( - df["respondent_id_ferc714"] == fix["respondent_id_ferc714"] - ) - df.loc[mask, "utc_offset_code"] = fix["utc_offset_code"] + The :ref:`core_ferc714__yearly_planning_area_demand_forecast` table is an annual, forecasted + time series of demand by Planning Area. - # Replace UTC offset codes with UTC offset and timezone - df["utc_offset"] = df["utc_offset_code"].map(OFFSET_CODES) - df["timezone"] = df["utc_offset_code"].map(TZ_CODES) - df = df.drop(columns="utc_offset_code") - - # Almost all 25th hours are unusable (0.0 or daily totals), - # and they shouldn't really exist at all based on FERC instructions. - df = df.drop(columns="hour25") - - # Melt daily rows with 24 demands to hourly rows with single demand - logger.info("Melting daily FERC 714 records into hourly records.") - df = df.rename( - columns=lambda x: int(re.sub(r"^hour", "", x)) - 1 if "hour" in x else x, - ) - df = df.melt( - id_vars=[ - "respondent_id_ferc714", - "report_year", - "report_date", - "utc_offset", - "timezone", - ], - value_vars=range(24), - var_name="hour", - value_name="demand_mwh", - ) + Most of the methods in this class as staticmethods. The purpose of using a class + in this instance is mostly for organizing the table specific transforms under the + same name-space. + """ - # Assert that all records missing UTC offset have zero demand - missing_offset = df["utc_offset"].isna() - assert df.loc[missing_offset, "demand_mwh"].eq(0).all() # nosec B101 - # Drop these records - df = df.query("~@missing_offset") - - # Construct UTC datetime - logger.info("Converting local time + offset code to UTC + timezone.") - hour_timedeltas = {i: pd.to_timedelta(i, unit="h") for i in range(24)} - df["report_date"] += df["hour"].map(hour_timedeltas) - df["datetime_utc"] = df["report_date"] - df["utc_offset"] - df = df.drop(columns=["hour", "utc_offset"]) - - # Report and drop duplicated UTC datetimes - # There should be less than 10 of these, - # resulting from changes to a planning area's reporting timezone. - duplicated = df.duplicated(["respondent_id_ferc714", "datetime_utc"]) - logger.info(f"Found {np.count_nonzero(duplicated)} duplicate UTC datetimes.") - df = df.query("~@duplicated") - - # Flip the sign on sections of demand which were reported as negative - mask = ( - df["report_year"].isin([2006, 2007, 2008, 2009]) - & (df["respondent_id_ferc714"] == 156) - ) | ( - df["report_year"].isin([2006, 2007, 2008, 2009, 2010]) - & (df["respondent_id_ferc714"] == 289) - ) - df.loc[mask, "demand_mwh"] *= -1 - - # Convert report_date to first day of year - df["report_date"] = df.report_date.dt.to_period("Y").dt.to_timestamp() - - # Format result - columns = [ - "respondent_id_ferc714", - "report_date", - "datetime_utc", - "timezone", - "demand_mwh", - ] - df = df.drop(columns=set(df.columns) - set(columns)) - df = _post_process( - df[columns], table_name="out_ferc714__hourly_planning_area_demand" - ) - return df + @classmethod + def run( + cls, + raw_csv: pd.DataFrame, + raw_xbrl_duration: pd.DataFrame, + ) -> pd.DataFrame: + """Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast` asset. + + To transform this table we have to process the CSV data and the XBRL duration data + (this data has not instant table), merge together the XBRL and CSV data, and + process the combined datasets. + + The main transforms include spot-fixing forecast years with + :meth:`spot_fix_forecast_years_xbrl` and averaging out duplicate forecast values + for duplicate primary key rows in the CSV table. + + """ + table_name = "core_ferc714__yearly_planning_area_demand_forecast" + # XBRL STUFF + xbrl = ( + rename_columns( + df=raw_xbrl_duration, + params=RenameColumns(columns=RENAME_COLS[table_name]["xbrl"]), + ) + .pipe(_assign_respondent_id_ferc714, "xbrl") + .pipe(cls.spot_fix_forecast_years_xbrl) + ) + # CSV STUFF + csv = ( + _pre_process_csv(raw_csv, table_name=table_name) + .pipe(_assign_respondent_id_ferc714, "csv") + .pipe(cls.average_duplicate_pks_csv) + ) + # CONCATED STUFF + df = pd.concat([csv, xbrl]).reset_index(drop=True) + return df + + @staticmethod + def spot_fix_forecast_years_xbrl(df): + """Spot fix forecast year errors. + + This function fixes the following errors: + + - There's one record with an NA forecast_year value. This row + also has no demand forcast values. Because forcast_year is a primary key + we can't have any NA values. Because there are no substantive forcasts + in this row, we can safely remove this row. + - respondent_id_ferc714 number 107 reported their forecast_year + as YY instead of YYYY values. + - There's also at least one forecast year value reported as 3033 that should + be 2033. + + This function also checks that the values for forecast year are within an + expected range. + """ + df = df.astype({"forecast_year": "Int64"}) + # Make sure there's only one NA forecast_year value and remove it + if len(nulls := df[df["forecast_year"].isna()]) > 2: + raise AssertionError( + f"We expected one or 0 NA forecast year, but found:\n{nulls}" + ) + df = df[df["forecast_year"].notna()] + # Convert YY to YYYY for respondent 107 (the culprit). + # The earliest forecast year reported as YY is 22. Any numbers + # lower than that would signify a transition into 2100. + mask = (df["respondent_id_ferc714"] == 107) & (df["forecast_year"] > 21) + df.loc[mask, "forecast_year"] = df["forecast_year"] + 2000 + # Fix extraneus 3022 value from respondent 17 + mask = ( + (df["respondent_id_ferc714"] == 17) + & (df["report_year"] == 2023) + & (df["forecast_year"] == 3033) + ) + df.loc[mask, "forecast_year"] = 2033 + # Make sure forecast_year values are expected + assert ( + df["forecast_year"].isin(range(2021, 2100)).all() + ), "Forecast year values not in expected range" + return df + + @staticmethod + def average_duplicate_pks_csv(df): + """Average forecast values for duplicate primary keys. + + The XBRL data had duplicate primary keys, but it was easy to parse + them by keeping rows with the most recent publication_time value. + The CSVs have no such distinguishing column, dispite having some + duplicate primary keys. + + This function takes the average of the forecast values for rows + with duplicate primary keys. There are only 6 respondent/report_year/ + forecast year rows where the forecast values differ. One of those is a + pair where one forecast value is 0. We'll take the non-zero value here + and average out the rest. + """ + # Record original length of dataframe + original_len = len(df) + # Remove duplicate row with 0 forecast values + error_mask = ( + (df["respondent_id_ferc714"] == 100) + & (df["report_year"] == 2013) + & (df["forecast_year"] == 2014) + & (df["net_demand_forecast_mwh"] == 0) + ) + if (len_dupes := len(df[error_mask])) >= 1: + raise AssertionError( + f"We found {len_dupes} duplicate errors, but expected 1 or less:\n{df[error_mask]}" + ) + df = df[~error_mask] + # Take the average of duplicate PK forecast values. + dupe_mask = df[ + ["respondent_id_ferc714", "report_year", "forecast_year"] + ].duplicated(keep=False) + deduped_df = ( + df[dupe_mask] + .groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[ + [ + "summer_peak_demand_forecast_mw", + "winter_peak_demand_forecast_mw", + "net_demand_forecast_mwh", + ] + ] + .mean() + .reset_index() + ) + df = pd.concat([df[~dupe_mask], deduped_df]) + # Make sure no more rows were dropped than expected + assert ( + original_len - len(df) <= 20 + ), f"dropped {original_len - len(df)} rows, expected 20" + return df @asset( + ins={ + "raw_csv": AssetIn(key="raw_ferc714_csv__yearly_planning_area_demand_forecast"), + "raw_xbrl_duration": AssetIn( + key="raw_ferc714_xbrl__planning_area_hourly_demand_and_forecast_summer_and_winter_peak_demand_and_annual_net_energy_for_load_table_03_2_duration" + ), + }, io_manager_key="pudl_io_manager", compute_kind="pandas", ) def core_ferc714__yearly_planning_area_demand_forecast( - raw_ferc714_csv__yearly_planning_area_demand_forecast: pd.DataFrame, + raw_csv: pd.DataFrame, + raw_xbrl_duration: pd.DataFrame, ) -> pd.DataFrame: - """Transform the yearly planning area forecast data per Planning Area. - - Transformations include: + """Build the :ref:`core_ferc714__yearly_planning_area_demand_forecast`. - - Drop/rename columns. - - Remove duplicate rows and average out the metrics. - - Args: - raw_ferc714_csv__yearly_planning_area_demand_forecast: Raw table containing, - for each year and each planning area, the forecasted summer and winter peak demand, - in megawatts, and annual net energy for load, in megawatthours, for the next - ten years. - - Returns: - Clean(er) version of the yearly forecasted demand by Planning Area. + This is a light wrapper around :class:`YearlyPlanningAreaDemandForecast` because + it seems you need to build an asset from a function - not a staticmethod of + a class. """ - # Clean up columns - df = _pre_process( - raw_ferc714_csv__yearly_planning_area_demand_forecast, - table_name="core_ferc714__yearly_planning_area_demand_forecast", - ) - - # For any rows with non-unique respondent_id_ferc714/report_year/forecast_year, - # group and take the mean measures - # For the 2006-2020 data, there were only 20 such rows. In most cases, demand metrics were identical. - # But for some, demand metrics were different - thus the need to take the average. - logger.info( - "Removing non-unique report rows and taking the average of non-equal metrics." - ) - - # Grab the number of rows before duplicate cleanup - num_rows_before = len(df) - - df = ( - df.groupby(["respondent_id_ferc714", "report_year", "forecast_year"])[ - ["summer_peak_demand_mw", "winter_peak_demand_mw", "net_demand_mwh"] - ] - .mean() - .reset_index() - ) - - # Capture the number of rows after grouping - num_rows_after = len(df) - - # Add the number of duplicates removed as metadata - num_duplicates_removed = num_rows_before - num_rows_after - logger.info(f"Number of duplicate rows removed: {num_duplicates_removed}") - # Assert that number of removed rows meets expectation - assert ( - num_duplicates_removed <= 20 - ), f"Expected no more than 20 duplicates removed, but found {num_duplicates_removed}" - - # Check all data types and columns to ensure consistency with defined schema - df = _post_process( - df, table_name="core_ferc714__yearly_planning_area_demand_forecast" - ) - return df + return YearlyPlanningAreaDemandForecast.run(raw_csv, raw_xbrl_duration) @dataclass @@ -639,6 +1239,9 @@ class Ferc714CheckSpec: 2018: 961, 2019: 950, 2020: 950, + 2021: 905, + 2022: 904, + 2023: 904, }, ) ] @@ -655,6 +1258,7 @@ def _check(df): errors.append( f"Expected {expected_rows} for report year {year}, found {num_rows}" ) + logger.info(errors) if errors: return AssetCheckResult(passed=False, metadata={"errors": errors}) diff --git a/test/unit/settings_test.py b/test/unit/settings_test.py index b8ab2e5d2a..ba644ef298 100644 --- a/test/unit/settings_test.py +++ b/test/unit/settings_test.py @@ -17,8 +17,11 @@ Eia923Settings, EiaSettings, EpaCemsSettings, + EtlSettings, Ferc1DbfToSqliteSettings, Ferc1Settings, + Ferc1XbrlToSqliteSettings, + FercToSqliteSettings, GenericDatasetSettings, _convert_settings_to_dagster_config, ) @@ -269,6 +272,23 @@ def test_convert_settings_to_dagster_config(self: Self): assert isinstance(dct["eia"]["eia923"]["years"], Field) +class TestEtlSettings: + """Test pydantic model that validates all the full ETL Settings.""" + + @staticmethod + def test_validate_xbrl_years(): + """Test validation error is raised when FERC XBRL->SQLite years don't overlap with PUDL years.""" + with pytest.raises(ValidationError): + _ = EtlSettings( + datasets=DatasetsSettings(ferc1=Ferc1Settings(years=[2021])), + ferc_to_sqlite_settings=FercToSqliteSettings( + ferc1_xbrl_to_sqlite_settings=Ferc1XbrlToSqliteSettings( + years=[2023] + ) + ), + ) + + class TestGlobalConfig: """Test global pydantic model config works.""" diff --git a/test/unit/transform/ferc1_test.py b/test/unit/transform/ferc1_test.py index 6d130834d7..e520f03a19 100644 --- a/test/unit/transform/ferc1_test.py +++ b/test/unit/transform/ferc1_test.py @@ -32,6 +32,7 @@ make_xbrl_factoid_dimensions_explicit, read_dbf_to_xbrl_map, reconcile_one_type_of_table_calculations, + select_current_year_annual_records_duration_xbrl, unexpected_total_components, unstack_balances_to_report_year_instant_xbrl, wide_to_tidy, @@ -260,7 +261,7 @@ def test_wide_to_tidy_rename(): def test_select_current_year_annual_records_duration_xbrl(): - """Test :meth:`select_current_year_annual_records_duration_xbrl` date selection.""" + """Test :func:`select_current_year_annual_records_duration_xbrl` date selection.""" df = pd.read_csv( StringIO( """ @@ -275,12 +276,9 @@ def test_select_current_year_annual_records_duration_xbrl(): ) ) - class FakeTransformer(Ferc1AbstractTableTransformer): - # just need any table name here so that one method is callable - table_id = TableIdFerc1.STEAM_PLANTS_FUEL - - fake_transformer = FakeTransformer() - df_out = fake_transformer.select_current_year_annual_records_duration_xbrl(df=df) + df_out = select_current_year_annual_records_duration_xbrl( + df=df, table_name="fake_table" + ) df_expected = df[df.to_numpy() == "good"].astype( {"start_date": "datetime64[s]", "end_date": "datetime64[s]"} ) diff --git a/test/validate/service_territory_test.py b/test/validate/service_territory_test.py index 5172718666..a0f0d53446 100644 --- a/test/validate/service_territory_test.py +++ b/test/validate/service_territory_test.py @@ -15,8 +15,8 @@ @pytest.mark.parametrize( "df_name,expected_rows", [ - ("summarized_demand_ferc714", 3_195), - ("fipsified_respondents_ferc714", 136_011), + ("summarized_demand_ferc714", 3_924), + ("fipsified_respondents_ferc714", 156_392), ("compiled_geometry_balancing_authority_eia861", 113_142), ("compiled_geometry_utility_eia861", 256_949), ], @@ -44,7 +44,7 @@ def test_minmax_rows( @pytest.mark.parametrize( "resource_id,expected_rows", - [("out_ferc714__hourly_planning_area_demand", 15_608_154)], + [("out_ferc714__hourly_planning_area_demand", 17_968_411)], ) def test_minmax_rows_and_year_in_ferc714_hourly_planning_area_demand( live_dbs: bool, diff --git a/test/validate/state_demand_test.py b/test/validate/state_demand_test.py index 0d29b74566..d48cf4709d 100644 --- a/test/validate/state_demand_test.py +++ b/test/validate/state_demand_test.py @@ -12,7 +12,7 @@ @pytest.mark.parametrize( "resource_id,expected_rows", - [("out_ferc714__hourly_estimated_state_demand", 6_706_318)], + [("out_ferc714__hourly_estimated_state_demand", 7_599_842)], ) def test_minmax_rows(live_dbs: bool, resource_id: str, expected_rows: int): """Verify that output DataFrames don't have too many or too few rows.