Migrate where/when we filter for the freshest XBRL data #3861

Merged: 10 commits, Oct 1, 2024
2 changes: 1 addition & 1 deletion src/pudl/extract/ferc1.py
@@ -197,7 +197,7 @@
"sales_of_electricity_by_rate_schedules_account_445_other_sales_to_public_authorities_304",
"sales_of_electricity_by_rate_schedules_account_446_sales_to_railroads_and_railways_304",
"sales_of_electricity_by_rate_schedules_account_448_interdepartmental_sales_304",
"sales_of_electricity_by_rate_schedules_account_4491_provision_for_rate_refunds_304",
"sales_of_electricity_by_rate_schedules_account_449_1_provision_for_rate_refunds_304",
"sales_of_electricity_by_rate_schedules_account_totals_304",
],
},
130 changes: 3 additions & 127 deletions src/pudl/io_managers.py
@@ -1,6 +1,5 @@
"""Dagster IO Managers."""

import json
import re
from pathlib import Path
from sqlite3 import sqlite_version
@@ -688,107 +687,6 @@ class FercXBRLSQLiteIOManager(FercSQLiteIOManager):
metadata.
"""

@staticmethod
def filter_for_freshest_data(
table: pd.DataFrame, primary_key: list[str]
) -> pd.DataFrame:
"""Get most updated values for each XBRL context.

An XBRL context includes an entity ID, the time period the data applies to, and
other dimensions such as utility type. Each context has its own ID, but they are
frequently redefined with the same contents but different IDs - so we identify
them by their actual content.

Each row in our SQLite database includes all the facts for one context/filing
pair.

If one context is represented in multiple filings, we take the most
recently-reported non-null value.

This means that if a utility reports a non-null value, then later
either reports a null value for it or simply omits it from the report,
we keep the old non-null value, which may be erroneous. This appears to
be fairly rare, affecting < 0.005% of reported values.
"""

def __apply_diffs(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the latest reported non-null value for each group."""
return duped_groups.last()

def __best_snapshot(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the row that has most non-null values out of each group."""
# Ignore errors when dropping the "count" column since empty
# groupby won't have this column.
return duped_groups.apply(
lambda df: df.assign(count=df.count(axis="columns"))
.sort_values(by="count", ascending=True)
.tail(1)
).drop(columns="count", errors="ignore")

def __compare_dedupe_methodologies(
apply_diffs: pd.DataFrame, best_snapshot: pd.DataFrame
):
"""Compare deduplication methodologies.

By cross-referencing these we can make sure that the apply-diff
methodology isn't doing something unexpected.

The main thing we want to keep tabs on is apply-diff adding new
non-null values compared to best-snapshot, because some of those
are instances of a value correctly being reported as `null`.

Instead of stacking the two datasets, merging by context, and then
looking for left_only or right_only values, we just count non-null
values. This is because we would want to use the report_year as a
merge key, but that isn't available until after we pipe the
dataframe through `refine_report_year`.
"""
n_diffs = apply_diffs.count().sum()
n_best = best_snapshot.count().sum()

if n_diffs < n_best:
raise ValueError(
f"Found {n_diffs} non-null values with apply-diffs"
f"methodology, and {n_best} with best-snapshot. "
"apply-diffs should be >= best-snapshot."
)

# 2024-04-10: this threshold set by looking at existing values for FERC
# <=2022. It was updated from .3 to .44 during the 2023 update.
threshold_ratio = 1.0044
if (found_ratio := n_diffs / n_best) > threshold_ratio:
raise ValueError(
"Found more than expected excess non-null values using the "
f"currently implemented apply_diffs methodology (#{n_diffs}) as "
f"compared to the best_snapshot methodology (#{n_best}). We expected"
" the apply_diffs methodology to result in no more than "
f"{threshold_ratio:.2%} non-null records but found {found_ratio:.2%}.\n\n"
"We are concerned about excess non-null values because apply-diffs "
"grabs the most recent non-null values. If this error is raised, "
"investigate filter_for_freshest_data."
)

filing_metadata_cols = {"publication_time", "filing_name"}
xbrl_context_cols = [c for c in primary_key if c not in filing_metadata_cols]
original = table.sort_values("publication_time")
dupe_mask = original.duplicated(subset=xbrl_context_cols, keep=False)
duped_groups = original.loc[dupe_mask].groupby(
xbrl_context_cols, as_index=False, dropna=True
)
never_duped = original.loc[~dupe_mask]
apply_diffs = __apply_diffs(duped_groups)
best_snapshot = __best_snapshot(duped_groups)
__compare_dedupe_methodologies(
apply_diffs=apply_diffs, best_snapshot=best_snapshot
)

deduped = pd.concat([never_duped, apply_diffs], ignore_index=True)
return deduped
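
For clarity, here is a toy illustration (with made-up utility data) of the two deduplication strategies that the `__compare_dedupe_methodologies` check in the method deleted above cross-referenced: "apply-diffs" keeps the latest reported non-null value per column, while "best-snapshot" keeps the single most complete row, so apply-diffs should never yield fewer non-null values.

```python
import pandas as pd

# Two filings for the same context: the resubmission adds one fact and omits another.
filings = pd.DataFrame(
    {
        "entity_id": ["util_1", "util_1"],
        "publication_time": pd.to_datetime(["2023-01-01", "2023-06-01"]),
        "asset_value": [100.0, None],      # omitted in the later filing
        "liability_value": [None, 50.0],   # only reported in the later filing
    }
).sort_values("publication_time")

groups = filings.groupby("entity_id", as_index=False, dropna=True)

# apply-diffs: latest non-null value per column -> asset_value=100.0, liability_value=50.0
apply_diffs = groups.last()

# best-snapshot: the single row with the most non-null values in each group
best_snapshot = groups.apply(
    lambda df: df.assign(count=df.count(axis="columns"))
    .sort_values(by="count")
    .tail(1)
).drop(columns="count")

# The removed check compared these totals and raised if apply-diffs produced
# noticeably more non-null values than best-snapshot.
print(apply_diffs.count().sum(), best_snapshot.count().sum())
```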

@staticmethod
def refine_report_year(df: pd.DataFrame, xbrl_years: list[int]) -> pd.DataFrame:
"""Set a fact's report year by its actual dates.
@@ -831,19 +729,6 @@ def get_year(df: pd.DataFrame, col: str) -> pd.Series:
.reset_index(drop=True)
)

def _get_primary_key(self, sched_table_name: str) -> list[str]:
# TODO (daz): as of 2023-10-13, our datapackage.json is merely
# "frictionless-like" so we manually parse it as JSON. once we make our
# datapackage.json conformant, we will need to at least update the
# "primary_key" to "primaryKey", but maybe there will be other changes
# as well.
with (self.base_dir / f"{self.db_name}_datapackage.json").open() as f:
datapackage = json.loads(f.read())
[table_resource] = [
tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
]
return table_resource["schema"]["primary_key"]

def handle_output(self, context: OutputContext, obj: pd.DataFrame | str):
"""Handle an op or asset output."""
raise NotImplementedError("FercXBRLSQLiteIOManager can't write outputs yet.")
@@ -880,18 +765,9 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
con=con,
).assign(sched_table_name=sched_table_name)

primary_key = self._get_primary_key(table_name)

return (
df.pipe(
FercXBRLSQLiteIOManager.filter_for_freshest_data,
primary_key=primary_key,
)
.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc_settings.xbrl_years,
)
.drop(columns=["publication_time"])
return df.pipe(
FercXBRLSQLiteIOManager.refine_report_year,
xbrl_years=ferc_settings.xbrl_years,
)


88 changes: 82 additions & 6 deletions src/pudl/transform/ferc1.py
@@ -40,6 +40,7 @@
cache_df,
enforce_snake_case,
)
from pudl.workspace.setup import PudlPaths

logger = pudl.logging_helpers.get_logger(__name__)

@@ -6092,6 +6093,63 @@ class OtherRegulatoryLiabilitiesTableTransformer(Ferc1AbstractTableTransformer):
}


def filter_for_freshest_data_xbrl(df: pd.DataFrame, primary_keys: list[str]) -> pd.DataFrame:
"""Get most updated values for each XBRL context.

An XBRL context includes an entity ID, the time period the data applies to, and
other dimensions such as utility type. Each context has its own ID, but they are
frequently redefined with the same contents but different IDs - so we identify
them by their actual content.

Each row in our SQLite database includes all the facts for one context/filing
pair.

If one context is represented in multiple filings, we take the most
recently-reported non-null value.

This means that if a utility reports a non-null value, then later
either reports a null value for it or simply omits it from the report,
we keep the old non-null value, which may be erroneous. This appears to
be fairly rare, affecting < 0.005% of reported values.
"""

def __apply_diffs(
duped_groups: pd.core.groupby.DataFrameGroupBy,
) -> pd.DataFrame:
"""Take the latest reported non-null value for each group."""
return duped_groups.last()

if not df.empty:
filing_metadata_cols = {"publication_time", "filing_name"}
xbrl_context_cols = [c for c in primary_keys if c not in filing_metadata_cols]
original = df.sort_values("publication_time")
dupe_mask = original.duplicated(subset=xbrl_context_cols, keep=False)
duped_groups = original.loc[dupe_mask].groupby(
xbrl_context_cols, as_index=False, dropna=True
)
never_duped = original.loc[~dupe_mask]
apply_diffs = __apply_diffs(duped_groups)

df = pd.concat([never_duped, apply_diffs], ignore_index=True).drop(
columns=["publication_time"]
)
return df
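
A minimal usage sketch of the relocated function, with hypothetical column names and values; only entity_id and date act as the XBRL context here, since filing_name and publication_time are treated as filing metadata.

```python
import pandas as pd

from pudl.transform.ferc1 import filter_for_freshest_data_xbrl

# Two filings describe the same context (entity_id, date); the resubmission
# updates sales_revenue but leaves energy_mwh blank.
raw_xbrl = pd.DataFrame(
    {
        "entity_id": ["C000001", "C000001"],
        "date": ["2022-12-31", "2022-12-31"],
        "filing_name": ["form1_original", "form1_resubmission"],
        "publication_time": pd.to_datetime(["2023-04-01", "2023-06-15"]),
        "sales_revenue": [1000.0, 1200.0],
        "energy_mwh": [500.0, None],
    }
)

deduped = filter_for_freshest_data_xbrl(
    raw_xbrl,
    primary_keys=["entity_id", "date", "filing_name", "publication_time"],
)
# One row remains: sales_revenue == 1200.0 comes from the newer filing, while
# energy_mwh keeps the older non-null 500.0; publication_time is dropped.
```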


def _get_primary_key(sched_table_name: str) -> list[str]:
# TODO (daz): as of 2023-10-13, our datapackage.json is merely
# "frictionless-like" so we manually parse it as JSON. once we make our
# datapackage.json conformant, we will need to at least update the
# "primary_key" to "primaryKey", but maybe there will be other changes
# as well.
with (PudlPaths().output_dir / "ferc1_xbrl_datapackage.json").open() as f:
datapackage = json.loads(f.read())
[table_resource] = [
tr for tr in datapackage["resources"] if tr["name"] == sched_table_name
]
return table_resource["schema"]["primary_key"]
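
For reference, a minimal sketch of the "frictionless-like" structure this helper expects to find in ferc1_xbrl_datapackage.json; the resource name and primary_key columns below are illustrative assumptions, not the real datapackage contents.

```python
# Illustrative only: the resource name and primary_key columns are assumptions.
datapackage = {
    "resources": [
        {
            "name": "sales_of_electricity_by_rate_schedules_304_duration",
            "schema": {
                "primary_key": [
                    "entity_id",
                    "filing_name",
                    "publication_time",
                    "start_date",
                    "end_date",
                ]
            },
        }
    ]
}
# _get_primary_key("sales_of_electricity_by_rate_schedules_304_duration")
# would return the primary_key list above.
```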


def ferc1_transform_asset_factory(
table_name: str,
tfr_class: Ferc1AbstractTableTransformer,
@@ -6121,13 +6179,13 @@ def ferc1_transform_asset_factory(
dbf_tables = listify(TABLE_NAME_MAP_FERC1[table_name]["dbf"])
xbrl_tables = listify(TABLE_NAME_MAP_FERC1[table_name]["xbrl"])

ins = {f"raw_dbf__{tn}": AssetIn(f"raw_ferc1_dbf__{tn}") for tn in dbf_tables}
ins = {f"raw_ferc1_dbf__{tn}": AssetIn(f"raw_ferc1_dbf__{tn}") for tn in dbf_tables}
ins |= {
f"raw_xbrl_instant__{tn}": AssetIn(f"raw_ferc1_xbrl__{tn}_instant")
f"raw_ferc1_xbrl__{tn}_instant": AssetIn(f"raw_ferc1_xbrl__{tn}_instant")
for tn in xbrl_tables
}
ins |= {
f"raw_xbrl_duration__{tn}": AssetIn(f"raw_ferc1_xbrl__{tn}_duration")
f"raw_ferc1_xbrl__{tn}_duration": AssetIn(f"raw_ferc1_xbrl__{tn}_duration")
for tn in xbrl_tables
}
ins["_core_ferc1_xbrl__metadata_json"] = AssetIn("_core_ferc1_xbrl__metadata_json")
@@ -6160,13 +6218,31 @@ def ferc1_transform_asset(**kwargs: dict[str, pd.DataFrame]) -> pd.DataFrame:
)

raw_dbf = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_dbf__")]
[df for key, df in kwargs.items() if key.startswith("raw_ferc1_dbf__")]
)
raw_xbrls = {
tn: filter_for_freshest_data_xbrl(
df=df,
primary_keys=_get_primary_key(tn.removeprefix("raw_ferc1_xbrl__")),
)
for tn, df in kwargs.items()
if tn.startswith("raw_ferc1_xbrl__")
}
for raw_xbrl_table_name in listify(TABLE_NAME_MAP_FERC1[table_name]["xbrl"]):
if (
raw_xbrls[f"raw_ferc1_xbrl__{raw_xbrl_table_name}_instant"].empty
and raw_xbrls[f"raw_ferc1_xbrl__{raw_xbrl_table_name}_duration"].empty
):
raise AssertionError(
"We expect there to be no raw xbrl tables that have neither instant or duration "
cmgosnell marked this conversation as resolved.
Show resolved Hide resolved
f"tables, but {raw_xbrl_table_name} has neither. Consider checking "
"TABLE_NAME_MAP_FERC1 for spelling errors."
)
raw_xbrl_instant = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_xbrl_instant__")]
[df for key, df in raw_xbrls.items() if key.endswith("_instant")]
)
raw_xbrl_duration = pd.concat(
[df for key, df in kwargs.items() if key.startswith("raw_xbrl_duration__")]
[df for key, df in raw_xbrls.items() if key.endswith("_duration")]
)
df = transformer.transform(
raw_dbf=raw_dbf,
1 change: 1 addition & 0 deletions src/pudl/transform/params/ferc1.py
@@ -4854,6 +4854,7 @@
"other_sales_to_public_authorities_axis",
"sales_to_railroads_and_railways_axis",
"interdepartmental_sales_axis",
"provision_for_rate_refunds_axis",
],
"new_axis_column_name": "sales_axis",
},