diff --git a/.github/workflows/tox-pytest.yml b/.github/workflows/tox-pytest.yml
index 191adda63a..27e8737459 100644
--- a/.github/workflows/tox-pytest.yml
+++ b/.github/workflows/tox-pytest.yml
@@ -28,17 +28,20 @@ jobs:
         shell: bash -l {0}
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 2
 
      - name: Install Conda environment using mamba
-        uses: mamba-org/provision-with-micromamba@v16
+        uses: mamba-org/setup-micromamba@v1
         with:
           environment-file: test/test-environment.yml
-          cache-env: true
-          channels: conda-forge,defaults
-          channel-priority: strict
+          cache-environment: true
+          condarc: |
+            channels:
+            - conda-forge
+            - defaults
+            channel_priority: strict
 
       - name: Log environment details
         run: |
@@ -68,17 +71,20 @@ jobs:
         shell: bash -l {0}
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 2
 
       - name: Install Conda environment using mamba
-        uses: mamba-org/provision-with-micromamba@v16
+        uses: mamba-org/setup-micromamba@v1
         with:
           environment-file: test/test-environment.yml
-          cache-env: true
-          channels: conda-forge,defaults
-          channel-priority: strict
+          cache-environment: true
+          condarc: |
+            channels:
+            - conda-forge
+            - defaults
+            channel_priority: strict
 
       - name: Log environment details
         run: |
@@ -118,17 +124,20 @@ jobs:
         shell: bash -l {0}
 
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 2
 
       - name: Install Conda environment using mamba
-        uses: mamba-org/provision-with-micromamba@v16
+        uses: mamba-org/setup-micromamba@v1
         with:
           environment-file: test/test-environment.yml
-          cache-env: true
-          channels: conda-forge,defaults
-          channel-priority: strict
+          cache-environment: true
+          condarc: |
+            channels:
+            - conda-forge
+            - defaults
+            channel_priority: strict
 
       - name: Log environment details
         run: |
@@ -189,7 +198,7 @@ jobs:
       - ci-integration
       - ci-static
     steps:
-      - uses: actions/checkout@v3
+      - uses: actions/checkout@v4
       - name: Download coverage
         id: download-unit
         uses: actions/download-artifact@v3
diff --git a/.github/workflows/zenodo-cache-sync.yml b/.github/workflows/zenodo-cache-sync.yml
index 7bb4555c27..18d8575d52 100644
--- a/.github/workflows/zenodo-cache-sync.yml
+++ b/.github/workflows/zenodo-cache-sync.yml
@@ -37,12 +37,15 @@ jobs:
           ref: ${{ env.GITHUB_REF }}
 
       - name: Install Conda environment using mamba
-        uses: mamba-org/provision-with-micromamba@v16
+        uses: mamba-org/setup-micromamba@v1
         with:
           environment-file: test/test-environment.yml
-          cache-env: true
-          channels: conda-forge,defaults
-          channel-priority: strict
+          cache-environment: true
+          condarc: |
+            channels:
+            - conda-forge
+            - defaults
+            channel_priority: strict
 
       - name: Log environment details
         run: |
diff --git a/src/pudl/convert/censusdp1tract_to_sqlite.py b/src/pudl/convert/censusdp1tract_to_sqlite.py
index c55aefa931..0c9b630bc8 100644
--- a/src/pudl/convert/censusdp1tract_to_sqlite.py
+++ b/src/pudl/convert/censusdp1tract_to_sqlite.py
@@ -62,7 +62,7 @@ def censusdp1tract_to_sqlite(context):
     # program happens to be in the user's path and named ogr2ogr. This is a
     # fragile solution that will not work on all platforms, but should cover
     # conda environments, Docker, and continuous integration on GitHub.
-    ogr2ogr = os.environ.get("CONDA_PREFIX", "/usr") + "/bin/ogr2ogr"
+    ogr2ogr = Path(os.environ.get("CONDA_PREFIX", "/usr")) / "bin/ogr2ogr"
     # Extract the zipped GeoDB archive from the Datastore into a temporary
     # directory so that ogr2ogr can operate on it. Output the resulting SQLite
     # database into the user's PUDL workspace.
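The comment in the hunk above acknowledges that deriving the ogr2ogr location from CONDA_PREFIX is fragile. For comparison, a minimal sketch of a more defensive lookup that falls back to searching the PATH with shutil.which; the find_ogr2ogr helper name is hypothetical and not part of PUDL:

```python
import os
import shutil
from pathlib import Path


def find_ogr2ogr() -> Path:
    """Locate ogr2ogr, preferring the active conda env, then falling back to PATH."""
    candidate = Path(os.environ.get("CONDA_PREFIX", "/usr")) / "bin" / "ogr2ogr"
    if candidate.is_file():
        return candidate
    # shutil.which covers installs outside the conda layout (e.g. Homebrew).
    found = shutil.which("ogr2ogr")
    if found is None:
        raise FileNotFoundError("ogr2ogr not found; is GDAL installed?")
    return Path(found)
```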
@@ -86,10 +86,10 @@ def censusdp1tract_to_sqlite(context):
                 f"Move {out_path} aside or set clobber=True and try again."
             )
 
-        logger.info("Extracting the Census DP1 GeoDB to %s", out_path)
+        logger.info(f"Extracting the Census DP1 GeoDB to {out_path}")
         zip_ref.extractall(tmpdir_path)
-        logger.info("extract_root = %s", extract_root)
-        logger.info("out_path = %s", out_path)
+        logger.info(f"extract_root = {extract_root}")
+        logger.info(f"out_path = {out_path}")
         subprocess.run(
             [ogr2ogr, str(out_path), str(extract_root)], check=True  # noqa: S603
         )
diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py
index 0aac8081e4..6b9f385670 100644
--- a/src/pudl/output/ferc1.py
+++ b/src/pudl/output/ferc1.py
@@ -1,6 +1,7 @@
 """A collection of denormalized FERC assets and helper functions."""
 import importlib
 import re
+from functools import cached_property
 from typing import Literal, NamedTuple, Self
 
 import networkx as nx
@@ -1243,7 +1244,7 @@ def __init__(
         self.seed_nodes = seed_nodes
         self.tags = tags
 
-    @property
+    @cached_property
     def calculation_forest(self: Self) -> "XbrlCalculationForestFerc1":
         """Construct a calculation forest based on class attributes."""
         return XbrlCalculationForestFerc1(
@@ -1253,12 +1254,12 @@ def calculation_forest(self: Self) -> "XbrlCalculationForestFerc1":
             tags=self.tags,
         )
 
-    @property
+    @cached_property
     def other_dimensions(self) -> list[str]:
         """Get all of the column names for the other dimensions."""
         return pudl.transform.ferc1.other_dimensions(table_names=self.table_names)
 
-    @property
+    @cached_property
     def exploded_pks(self) -> list[str]:
         """Get the joint primary keys of the exploded tables."""
         pks = []
@@ -1283,7 +1284,7 @@ def exploded_pks(self) -> list[str]:
         ] + pudl.helpers.dedupe_n_flatten_list_of_lists(pks)
         return pks
 
-    @property
+    @cached_property
     def value_col(self) -> str:
         """Get the value column for the exploded tables."""
         value_cols = []
@@ -1617,6 +1618,7 @@ class Config:
         """Allow the class to store a dataframe."""
 
         arbitrary_types_allowed = True
+        keep_untouched = (cached_property,)
 
     @validator("parent_cols", always=True)
     def set_parent_cols(cls, v, values) -> list[str]:
@@ -1745,7 +1747,7 @@ def exploded_calcs_to_digraph(
         forest = nx.from_pandas_edgelist(edgelist, create_using=nx.DiGraph)
         return forest
 
-    @property
+    @cached_property
     def annotated_forest(self: Self) -> nx.DiGraph:
         """Calculation forest annotated with node calculation weights and tags."""
         # Reshape the tags to turn them into a dictionary of values per-node. This
@@ -1813,7 +1815,7 @@ def annotated_forest(self: Self) -> nx.DiGraph:
         nx.set_node_attributes(forest, node_attrs.to_dict(orient="index"))
         return forest
 
-    @property
+    @cached_property
     def full_digraph(self: Self) -> nx.DiGraph:
         """A digraph of all calculations described by the exploded metadata."""
         full_digraph = self.exploded_calcs_to_digraph(
@@ -1863,7 +1865,7 @@ def prune_unrooted(self: Self, graph: nx.DiGraph) -> nx.DiGraph:
         )
         return seeded_digraph
 
-    @property
+    @cached_property
     def seeded_digraph(self: Self) -> nx.DiGraph:
         """A digraph of all calculations that contribute to the seed values.
 
@@ -1879,7 +1881,7 @@ def seeded_digraph(self: Self) -> nx.DiGraph:
         """
         return self.prune_unrooted(self.full_digraph)
 
-    @property
+    @cached_property
     def forest(self: Self) -> nx.DiGraph:
         """A pruned version of the seeded digraph that should be one or more trees.
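The keep_untouched = (cached_property,) line added to the pydantic Config above is what lets all of the @property to @cached_property conversions in this diff coexist with a pydantic (v1) model: without it, pydantic would try to turn the cached_property descriptor into a model field and break it. A minimal sketch of the pattern, assuming pydantic v1 semantics; the Squares class and its field are illustrative, not from PUDL:

```python
from functools import cached_property

from pydantic import BaseModel  # pydantic v1 semantics assumed


class Squares(BaseModel):
    x: int

    class Config:
        # Leave cached_property descriptors alone rather than
        # converting them into model fields.
        keep_untouched = (cached_property,)

    @cached_property
    def squared(self) -> int:
        print("computing...")  # illustrative: runs only on first access
        return self.x**2


s = Squares(x=4)
assert s.squared == 16  # prints "computing..."
assert s.squared == 16  # cached: nothing printed this time
```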
@@ -2026,17 +2028,17 @@ def roots(graph: nx.DiGraph) -> list[NodeId]:
         """Identify all root nodes in a digraph."""
         return [n for n, d in graph.in_degree() if d == 0]
 
-    @property
+    @cached_property
     def full_digraph_roots(self: Self) -> list[NodeId]:
         """Find all roots in the full digraph described by the exploded metadata."""
         return self.roots(graph=self.full_digraph)
 
-    @property
+    @cached_property
     def seeded_digraph_roots(self: Self) -> list[NodeId]:
         """Find all roots in the seeded digraph."""
         return self.roots(graph=self.seeded_digraph)
 
-    @property
+    @cached_property
     def forest_roots(self: Self) -> list[NodeId]:
         """Find all roots in the pruned calculation forest."""
         return self.roots(graph=self.forest)
@@ -2046,22 +2048,22 @@ def leaves(graph: nx.DiGraph) -> list[NodeId]:
         """Identify all leaf nodes in a digraph."""
         return [n for n, d in graph.out_degree() if d == 0]
 
-    @property
+    @cached_property
     def full_digraph_leaves(self: Self) -> list[NodeId]:
         """All leaf nodes in the full digraph."""
         return self.leaves(graph=self.full_digraph)
 
-    @property
+    @cached_property
     def seeded_digraph_leaves(self: Self) -> list[NodeId]:
         """All leaf nodes in the seeded digraph."""
         return self.leaves(graph=self.seeded_digraph)
 
-    @property
+    @cached_property
     def forest_leaves(self: Self) -> list[NodeId]:
         """All leaf nodes in the pruned forest."""
         return self.leaves(graph=self.forest)
 
-    @property
+    @cached_property
     def orphans(self: Self) -> list[NodeId]:
         """Identify all nodes that appear in metadata but not in the full digraph."""
         nodes = self.full_digraph.nodes
@@ -2071,7 +2073,7 @@ def orphans(self: Self) -> list[NodeId]:
             if n not in nodes
         ]
 
-    @property
+    @cached_property
     def pruned(self: Self) -> list[NodeId]:
         """List of all nodes that appear in the DAG but not in the pruned forest."""
         return list(set(self.full_digraph.nodes).difference(self.forest.nodes))
@@ -2088,7 +2090,7 @@ def stepparents(self: Self, graph: nx.DiGraph) -> list[NodeId]:
             stepparents = stepparents.union(graph.predecessors(stepchild))
         return list(stepparents)
 
-    @property
+    @cached_property
     def passthroughs(self: Self) -> list[NodeId]:
         """All nodes in the seeded digraph with a single parent and a single child.
@@ -2117,7 +2119,7 @@ def passthroughs(self: Self) -> list[NodeId]:
 
         return list(has_one_parent.intersection(has_one_child))
 
-    @property
+    @cached_property
     def leafy_meta(self: Self) -> pd.DataFrame:
         """Identify leaf facts and compile their metadata.
 
@@ -2191,7 +2193,7 @@ def leafy_meta(self: Self) -> pd.DataFrame:
             .convert_dtypes()
         )
 
-    @property
+    @cached_property
     def root_calculations(self: Self) -> pd.DataFrame:
         """Produce a calculation components dataframe containing only roots and leaves.
 
@@ -2201,7 +2203,7 @@ def root_calculations(self: Self) -> pd.DataFrame:
         """
         return self.leafy_meta.rename(columns=lambda x: re.sub("_root$", "_parent", x))
 
-    @property
+    @cached_property
     def table_names(self: Self) -> list[str]:
         """Produce the list of tables involved in this explosion."""
         return list(self.exploded_calcs["table_name_parent"].unique())
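The roots and leaves helpers that the cached properties above delegate to are thin wrappers over networkx degree views: roots have in-degree zero, leaves have out-degree zero. A tiny self-contained illustration with made-up node names (the real nodes are NodeId tuples):

```python
import networkx as nx

g = nx.DiGraph([("total", "steam"), ("total", "nuclear"), ("steam", "fuel")])
roots = [n for n, d in g.in_degree() if d == 0]
leaves = [n for n, d in g.out_degree() if d == 0]
assert roots == ["total"]
assert sorted(leaves) == ["fuel", "nuclear"]
```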
@@ -2269,3 +2271,98 @@ def leafy_data(
         # Scale the data column of interest:
         leafy_data[value_col] = leafy_data[value_col] * leafy_data["weight"]
         return leafy_data.reset_index(drop=True).convert_dtypes()
+
+    @cached_property
+    def forest_as_table(self: Self) -> pd.DataFrame:
+        """Construct a tabular representation of the calculation forest.
+
+        Each generation of nodes, starting with the root(s) of the calculation
+        forest, makes up a set of columns in the table. Each set of columns is
+        merged onto the columns for the previous generation, so that each row in
+        the table traces a path from a root node down toward a leaf.
+        """
+        logger.info("Recursively building a tabular version of the calculation forest.")
+        # Identify all root nodes in the forest:
+        layer0_nodes = [n for n, d in self.annotated_forest.in_degree() if d == 0]
+        # Convert them into the first layer of the dataframe:
+        layer0_df = pd.DataFrame(layer0_nodes).rename(columns=lambda x: x + "_layer0")
+
+        return (
+            self._add_layers_to_forest_as_table(df=layer0_df)
+            .dropna(axis="columns", how="all")
+            .convert_dtypes()
+        )
+
+    def _add_layers_to_forest_as_table(self: Self, df: pd.DataFrame) -> pd.DataFrame:
+        """Recursively add additional layers of nodes from the forest to the table.
+
+        Given a dataframe with one or more sets of columns whose names correspond to
+        the components of a NodeId with suffixes of the form _layerN, identify the
+        children of the nodes in the set of columns with the largest N, and merge
+        them onto the table. Recurse until there are no more children to add,
+        creating a tabular representation of the calculation forest that can be
+        inspected in Excel.
+
+        Include node annotations like weight and tags, as well as other familiar
+        metadata, to aid in the inspection.
+        """
+        # Identify the last layer of nodes present in the input dataframe.
+        current_layer = df.rename(
+            columns=lambda x: int(re.sub(r"^.*_layer(\d+)$", r"\1", x))
+        ).columns.max()
+        logger.info(f"{current_layer=}")
+        suffix = f"_layer{current_layer}"
+        parent_cols = [col + suffix for col in self.calc_cols]
+        # Identify the list of nodes that are part of that last layer:
+        parent_nodes = list(
+            df[parent_cols]
+            .drop_duplicates()
+            .dropna(how="all")
+            .rename(columns=lambda x: x.removesuffix(suffix))
+            .itertuples(name="NodeId", index=False)
+        )
+
+        # Identify the successors (children), if any, of each node in the last layer:
+        successor_dfs = []
+        for node in parent_nodes:
+            successor_nodes = list(self.forest.successors(node))
+            # If this particular node has no successors, skip to the next one.
+            if not successor_nodes:
+                continue
+            # Convert the list of successor nodes into a dataframe with layer = n+1
+            successor_df = nodes_to_df(
+                calc_forest=self.annotated_forest, nodes=successor_nodes
+            ).rename(columns=lambda x: x + f"_layer{current_layer + 1}")
+            # Add a set of parent columns that all have the same values so we can merge
+            # this onto the previous layer
+            successor_df[parent_cols] = node
+            successor_dfs.append(successor_df)
+
+        # If any child nodes were found, merge them onto the input dataframe creating
+        # a new layer, and recurse:
+        if successor_dfs:
+            new_df = df.merge(pd.concat(successor_dfs), on=parent_cols, how="outer")
+            df = self._add_layers_to_forest_as_table(df=new_df)
+
+        # If no child nodes were found, return the dataframe, terminating the recursion.
+        return df
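To make the recursion in _add_layers_to_forest_as_table concrete, here is a self-contained sketch of the same layering idea on a toy tree. It uses a single hypothetical "name" column in place of the full NodeId tuple and skips the weight/tag annotations; only the merge-then-recurse structure mirrors the method above:

```python
import networkx as nx
import pandas as pd

tree = nx.DiGraph([("total", "steam"), ("total", "nuclear"), ("steam", "fuel")])


def add_layer(df: pd.DataFrame, layer: int) -> pd.DataFrame:
    parent_col = f"name_layer{layer}"
    rows = []
    for parent in df[parent_col].dropna().unique():
        for child in tree.successors(parent):
            rows.append({parent_col: parent, f"name_layer{layer + 1}": child})
    if not rows:  # no node in this layer has children: recursion bottoms out
        return df
    merged = df.merge(pd.DataFrame(rows), on=parent_col, how="outer")
    return add_layer(merged, layer + 1)


table = add_layer(pd.DataFrame({"name_layer0": ["total"]}), layer=0)
# One row per root-to-leaf path; shorter branches are padded with NaN:
#   name_layer0 name_layer1 name_layer2
# 0       total       steam        fuel
# 1       total     nuclear         NaN
```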
+ """ + node_dict = { + k: v for k, v in dict(calc_forest.nodes(data=True)).items() if k in nodes + } + index = pd.DataFrame(node_dict.keys()).astype("string") + data = pd.DataFrame(node_dict.values()) + weights = data["weight"] + tags = pd.json_normalize(data.tags).astype("string") + return pd.concat([index, weights, tags], axis="columns")