diff --git a/.github/workflows/build-deploy-pudl.yml b/.github/workflows/build-deploy-pudl.yml
index 141dc76681..be0a3680bd 100644
--- a/.github/workflows/build-deploy-pudl.yml
+++ b/.github/workflows/build-deploy-pudl.yml
@@ -117,6 +117,7 @@ jobs:
           --container-env DAGSTER_PG_HOST="104.154.182.24" \
           --container-env DAGSTER_PG_DB="dagster-storage" \
           --container-env PUDL_SETTINGS_YML="/home/catalyst/src/pudl/package_data/settings/etl_full.yml" \
+          --container-env FLY_ACCESS_TOKEN=${{ secrets.FLY_ACCESS_TOKEN }} \
 
       # Start the VM
       - name: Start the deploy-pudl-vm
diff --git a/.github/workflows/update-lockfile.yml b/.github/workflows/update-lockfile.yml
new file mode 100644
index 0000000000..42d9ec0393
--- /dev/null
+++ b/.github/workflows/update-lockfile.yml
@@ -0,0 +1,57 @@
+---
+name: update-lockfile
+
+on:
+  workflow_dispatch:
+  # schedule:
+  #   At 5:28am UTC Monday and Thursday
+  #   - cron: 28 5 * * MON,THU
+
+jobs:
+  conda-lock:
+    # Don't run scheduled job on forks.
+    if: (github.event_name == 'schedule' && github.repository == 'catalyst-cooperative/pudl') || (github.event_name != 'schedule')
+    defaults:
+      run:
+        # Ensure the environment is activated
+        #
+        shell: bash -l {0}
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - name: Install Micromamba
+        uses: mamba-org/setup-micromamba@v1
+        with:
+          environment-file: environments/conda-lock.yml
+          environment-name: pudl-dev
+
+      - name: Install pudl from branch
+        run: pip install --editable "./[dev,docs,test,datasette]"
+
+      - name: Run conda-lock to recreate lockfile from scratch
+        run: |
+          rm environments/conda-lock.yml
+          conda-lock \
+            --file=environments/dev-environment.yml \
+            --file=pyproject.toml \
+            --lockfile=environments/conda-lock.yml
+      - name: Open a pull request
+        uses: peter-evans/create-pull-request@v5
+        with:
+          # # The default GITHUB_TOKEN doesn't allow other workflows to trigger.
+          # # Thus if there are tests to be run, they won't be run. For more info,
+          # # see the note under
+          # # .
+          # # One possible workaround is to specify a Personal Access Token (PAT).
+          # # This PAT should have read-write permissions for "Pull Requests"
+          # # and read-write permissions for "Contents".
+          # token: ${{ secrets.GH_PAT_FOR_PR }}
+          commit-message: Update lockfile
+          title: Update Lockfile
+          body: >
+            This pull request relocks the dependencies with conda-lock.
+            It is triggered by [update-lockfile](https://github.com/catalyst-cooperative/pudl/blob/main/.github/workflows/update-lockfile.yml).
+          branch: update-lockfile
+          labels: dependencies, conda-lock
+          reviewers: zaneselvans
+          delete-branch: true
diff --git a/.gitignore b/.gitignore
index 2c0293ffae..997dd77884 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,9 @@ notebooks/*.tgz
 terraform/.terraform/*
 .env
 .hypothesis/
+
+# generated by datasette/publish.py fresh for every deploy - we shouldn't track changes.
+devtools/datasette/fly/Dockerfile
+devtools/datasette/fly/inspect-data.json
+devtools/datasette/fly/metadata.yml
+devtools/datasette/fly/all_dbs.tar.zst
diff --git a/devtools/datasette/fly/fly.toml b/devtools/datasette/fly/fly.toml
new file mode 100644
index 0000000000..4b8923dacf
--- /dev/null
+++ b/devtools/datasette/fly/fly.toml
@@ -0,0 +1,34 @@
+# fly.toml app configuration file generated for catalyst-coop-pudl on 2023-11-03T15:31:15-04:00
+#
+# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
+#
+app = "catalyst-coop-pudl"
+primary_region = "bos"
+
+[[mounts]]
+  destination = "/data"
+  source = "datasette"
+
+[[services]]
+  internal_port = 8080
+  protocol = "tcp"
+
+  [services.concurrency]
+    hard_limit = 25
+    soft_limit = 20
+
+  [[services.ports]]
+    handlers = ["http"]
+    port = 80
+
+  [[services.ports]]
+    handlers = ["tls", "http"]
+    port = 443
+
+  [[services.tcp_checks]]
+    grace_period = "1m"
+    interval = 10000
+    timeout = 2000
+
+[deploy]
+wait_timeout = "15m"
\ No newline at end of file
diff --git a/devtools/datasette/fly/run.sh b/devtools/datasette/fly/run.sh
new file mode 100755
index 0000000000..9516d73d7a
--- /dev/null
+++ b/devtools/datasette/fly/run.sh
@@ -0,0 +1,10 @@
+#! /usr/bin/env bash
+set -eux
+
+shopt -s nullglob
+
+find /data/ -name '*.sqlite' -delete
+mv all_dbs.tar.zst /data
+zstd -f -d /data/all_dbs.tar.zst -o /data/all_dbs.tar
+tar -xf /data/all_dbs.tar --directory /data
+datasette serve --host 0.0.0.0 /data/*.sqlite --cors --inspect-file inspect-data.json --metadata metadata.yml --setting sql_time_limit_ms 5000 --port $PORT
\ No newline at end of file
diff --git a/devtools/datasette/publish.py b/devtools/datasette/publish.py
new file mode 100644
index 0000000000..a5b3b3123f
--- /dev/null
+++ b/devtools/datasette/publish.py
@@ -0,0 +1,122 @@
+"""Publish the datasette to fly.io.
+
+We use custom logic here because the datasette-publish-fly plugin bakes the
+uncompressed databases into the image, which makes the image too large.
+
+We compress the databases before baking them into the image. Then we decompress
+them at runtime to a Fly volume mounted at /data. This avoids a long download
+at startup, and allows us to stay within the Fly.io 8GB image size limit.
+
+The volume handling is done manually outside of this publish.py script - it
+should be terraformed at some point.
+
+Some static fly.io deployment-related files live in ./fly:
+* fly.toml - service configuration
+* run.sh - service entrypoint
+
+Apart from that: the Dockerfile and dataset-specific
+metadata.yml/inspect-data.json are generated by this script.
+"""
+
+import json
+import logging
+import secrets
+from pathlib import Path
+from subprocess import check_call, check_output
+
+from pudl.metadata.classes import DatasetteMetadata
+from pudl.workspace.setup import PudlPaths
+
+logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)
+
+DOCKERFILE_TEMPLATE = """
+FROM python:3.11.0-slim-bullseye
+COPY . /app
+WORKDIR /app
+
+RUN apt-get update
+RUN apt-get install -y zstd
+
+ENV DATASETTE_SECRET '{datasette_secret}'
+RUN pip install -U datasette datasette-cluster-map datasette-vega datasette-block-robots
+ENV PORT 8080
+EXPOSE 8080
+
+CMD ["./run.sh"]
+"""
+
+
+def make_dockerfile():
+    """Write a dockerfile from template, to use in fly deploy.
+
+    We write this from template so we can generate a datasette secret. This way
+    we don't have to manage secrets at all.
+    """
+    datasette_secret = secrets.token_hex(16)
+    return DOCKERFILE_TEMPLATE.format(datasette_secret=datasette_secret)
+
+
+def inspect_data(datasets, pudl_out):
+    """Pre-inspect databases to generate some metadata for Datasette.
+
+    This is done in the image build process in datasette-publish-fly, but since
+    we don't have access to the databases in the build process we have to
+    inspect before building the Docker image.
+    """
+    inspect_output = json.loads(
+        check_output(
+            [  # noqa: S603
+                "datasette",
+                "inspect",
+            ]
+            + [str(pudl_out / ds) for ds in datasets]
+        )
+    )
+
+    for dataset in inspect_output:
+        name = Path(inspect_output[dataset]["file"]).name
+        new_filepath = Path("/data") / name
+        inspect_output[dataset]["file"] = str(new_filepath)
+    return inspect_output
+
+
+def metadata(pudl_out) -> str:
+    """Return human-readable metadata for Datasette."""
+    return DatasetteMetadata.from_data_source_ids(pudl_out).to_yaml()
+
+
+def main():
+    """Generate deployment files and run the deploy."""
+    fly_dir = Path(__file__).parent.absolute() / "fly"
+    docker_path = fly_dir / "Dockerfile"
+    inspect_path = fly_dir / "inspect-data.json"
+    metadata_path = fly_dir / "metadata.yml"
+
+    pudl_out = PudlPaths().pudl_output
+    datasets = [str(p.name) for p in pudl_out.glob("*.sqlite")]
+    logging.info(f"Inspecting DBs for datasette: {datasets}...")
+    inspect_output = inspect_data(datasets, pudl_out)
+    with inspect_path.open("w") as f:
+        f.write(json.dumps(inspect_output))
+
+    logging.info("Writing metadata...")
+    with metadata_path.open("w") as f:
+        f.write(metadata(pudl_out))
+
+    logging.info("Writing Dockerfile...")
+    with docker_path.open("w") as f:
+        f.write(make_dockerfile())
+
+    logging.info(f"Compressing {datasets} and putting into docker context...")
+    check_call(
+        ["tar", "-a", "-czvf", fly_dir / "all_dbs.tar.zst"] + datasets,  # noqa: S603
+        cwd=pudl_out,
+    )
+
+    logging.info("Running fly deploy...")
+    check_call(["/usr/bin/env", "flyctl", "deploy"], cwd=fly_dir)  # noqa: S603
+    logging.info("Deploy finished!")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/devtools/datasette/publish.sh b/devtools/datasette/publish.sh
deleted file mode 100755
index 53bed7d3f7..0000000000
--- a/devtools/datasette/publish.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/sh
-
-PUDL_OUT=`grep "^pudl_out" $HOME/.pudl.yml | sed -e "s/^pudl_out: //"`
-SQLITE_DIR="$PUDL_OUTPUT"
-
-# make metadata.yml
-datasette_metadata_to_yml -o "metadata.yml"
-
-datasette publish cloudrun \
-    --service catalyst-datasette \
-    --memory 32Gi \
-    --install datasette-cluster-map \
-    --install datasette-vega \
-    --install datasette-block-robots \
-    --metadata metadata.yml \
-    --extra-options="--setting sql_time_limit_ms 5000" \
-    $SQLITE_DIR/pudl.sqlite \
-    $SQLITE_DIR/ferc1.sqlite \
-    $SQLITE_DIR/ferc2.sqlite \
-    $SQLITE_DIR/ferc6.sqlite \
-    $SQLITE_DIR/ferc60.sqlite \
-    $SQLITE_DIR/ferc1_xbrl.sqlite \
-    $SQLITE_DIR/ferc2_xbrl.sqlite \
-    $SQLITE_DIR/ferc6_xbrl.sqlite \
-    $SQLITE_DIR/ferc60_xbrl.sqlite \
-    $SQLITE_DIR/ferc714_xbrl.sqlite
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 324bf79061..9095a38f31 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,5 +1,7 @@
 FROM condaforge/mambaforge:23.3.1-1
 
+SHELL [ "/bin/bash", "-exo", "pipefail", "-c" ]
+
 # Install curl and js
 # awscli requires unzip, less, groff and mandoc
 # hadolint ignore=DL3008
@@ -24,6 +26,10 @@ ENV CONTAINER_HOME=/home/catalyst
 USER catalyst
 WORKDIR ${CONTAINER_HOME}
 
+# Install flyctl
+RUN curl -L https://fly.io/install.sh | sh
+ENV PATH="${CONTAINER_HOME}/.fly/bin:$PATH"
+
 ENV CONDA_PREFIX=${CONTAINER_HOME}/env
 ENV PUDL_REPO=${CONTAINER_HOME}/pudl
 ENV CONDA_RUN="conda run --no-capture-output --prefix ${CONDA_PREFIX}"
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 47b6a730b6..ee49390b09 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -12,6 +12,7 @@ services:
     environment:
      - API_KEY_EIA
      - GCP_BILLING_PROJECT
+     - FLY_ACCESS_TOKEN
    env_file:
      - .env
    build:
diff --git a/docker/gcp_pudl_etl.sh b/docker/gcp_pudl_etl.sh
index ba6d1e0f89..e22739802a 100644
--- a/docker/gcp_pudl_etl.sh
+++ b/docker/gcp_pudl_etl.sh
@@ -85,10 +85,8 @@ function notify_slack() {
 # 2>&1 redirects stderr to stdout.
 run_pudl_etl 2>&1 | tee $LOGFILE
 
-# Notify slack if the etl succeeded.
+# if pipeline is successful, distribute + publish datasette
 if [[ ${PIPESTATUS[0]} == 0 ]]; then
-    notify_slack "success"
-
     # Dump outputs to s3 bucket if branch is dev or build was triggered by a tag
     if [ $GITHUB_ACTION_TRIGGER = "push" ] || [ $GITHUB_REF = "dev" ]; then
        copy_outputs_to_distribution_bucket
@@ -96,9 +94,15 @@ if [[ ${PIPESTATUS[0]} == 0 ]]; then
 
    # Deploy the updated data to datasette
    if [ $GITHUB_REF = "dev" ]; then
-        gcloud config set run/region us-central1
-        source ~/devtools/datasette/publish.sh
+        python ~/devtools/datasette/publish.py 2>&1 | tee -a $LOGFILE
    fi
+fi
+
+# Notify slack about entire pipeline's success or failure;
+# PIPESTATUS[0] either refers to the failed ETL run or the last distribution
+# task that was run above
+if [[ ${PIPESTATUS[0]} == 0 ]]; then
+    notify_slack "success"
 else
    notify_slack "failure"
 fi
diff --git a/notebooks/work-in-progress/CEMS_by_utility.ipynb b/notebooks/work-in-progress/CEMS_by_utility.ipynb
index c8a085ac32..1d2a593dcc 100644
--- a/notebooks/work-in-progress/CEMS_by_utility.ipynb
+++ b/notebooks/work-in-progress/CEMS_by_utility.ipynb
@@ -47,7 +47,7 @@
     "from pudl.workspace.setup import PudlPaths\n",
     "\n",
     "\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db())\n",
     "#display(pudl_engine)\n",
diff --git a/notebooks/work-in-progress/better-heatrates.ipynb b/notebooks/work-in-progress/better-heatrates.ipynb
index 4547c5ba3d..3be8d324e5 100644
--- a/notebooks/work-in-progress/better-heatrates.ipynb
+++ b/notebooks/work-in-progress/better-heatrates.ipynb
@@ -324,7 +324,7 @@
     "from pudl.workspace.setup import PudlPaths\n",
     "\n",
     "# TODO(janrous): provide property for accessing ferc db?\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "\n",
     "API_KEY_EIA = os.environ[\"API_KEY_EIA\"]\n",
diff --git a/notebooks/work-in-progress/ferc714-output.ipynb b/notebooks/work-in-progress/ferc714-output.ipynb
index 396a764b19..65cc1da40e 100644
--- a/notebooks/work-in-progress/ferc714-output.ipynb
+++ b/notebooks/work-in-progress/ferc714-output.ipynb
@@ -142,7 +142,7 @@
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
     "\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "display(ferc1_engine)\n",
     "\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
diff --git a/notebooks/work-in-progress/jupyterhub-test.ipynb b/notebooks/work-in-progress/jupyterhub-test.ipynb
index 18d0e9a037..3a59d03a7a 100644
--- a/notebooks/work-in-progress/jupyterhub-test.ipynb
+++ b/notebooks/work-in-progress/jupyterhub-test.ipynb
@@ -51,7 +51,7 @@
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
     "\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine=pudl_engine)"
    ]
diff --git a/notebooks/work-in-progress/state-demand.ipynb b/notebooks/work-in-progress/state-demand.ipynb
index d319093948..a3158054c0 100644
--- a/notebooks/work-in-progress/state-demand.ipynb
+++ b/notebooks/work-in-progress/state-demand.ipynb
@@ -113,7 +113,7 @@
     "#HARVEST_ACCOUNT_ID = os.environ[\"HARVEST_ACCOUNT_ID\"]\n",
     "\n",
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_out = pudl.output.pudltabl.PudlTabl(pudl_engine=pudl_engine)"
    ]
diff --git a/pyproject.toml b/pyproject.toml
index a0518d645d..12af588f86 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -100,7 +100,6 @@ keywords = [
 metadata_to_rst = "pudl.convert.metadata_to_rst:main"
 epacems_to_parquet = "pudl.convert.epacems_to_parquet:main"
 ferc_to_sqlite = "pudl.ferc_to_sqlite.cli:main"
-datasette_metadata_to_yml = "pudl.convert.datasette_metadata_to_yml:main"
 pudl_datastore = "pudl.workspace.datastore:main"
 pudl_etl = "pudl.cli.etl:main"
 pudl_setup = "pudl.workspace.setup_cli:main"
diff --git a/src/pudl/analysis/classify_plants_ferc1.py b/src/pudl/analysis/classify_plants_ferc1.py
index 1a0f2835f6..9a8e6a1096 100644
--- a/src/pudl/analysis/classify_plants_ferc1.py
+++ b/src/pudl/analysis/classify_plants_ferc1.py
@@ -654,9 +654,11 @@ def fuel_by_plant_ferc1(
     ]
 
     # Ensure that the dataframe we've gotten has all the information we need:
-    for col in keep_cols:
-        if col not in fuel_df.columns:
-            raise AssertionError(f"Required column {col} not found in input fuel_df.")
+    missing_cols = [col for col in keep_cols if col not in fuel_df.columns]
+    if missing_cols:
+        raise AssertionError(
+            f"Required columns not found in input fuel_df: {missing_cols}"
+        )
 
     # Calculate per-fuel derived values and add them to the DataFrame
     df = (
@@ -679,7 +681,8 @@ def fuel_by_plant_ferc1(
                 "plant_name_ferc1",
                 "report_year",
                 "fuel_type_code_pudl",
-            ]
+            ],
+            observed=True,
         )
         .sum()
         .reset_index()
@@ -732,6 +735,13 @@ def fuel_by_plant_ferc1(
     ).reset_index()
 
     # Label each plant-year record by primary fuel:
+    df.loc[:, ["primary_fuel_by_cost", "primary_fuel_by_mmbtu"]] = pd.NA
+    df = df.astype(
+        {
+            "primary_fuel_by_cost": pd.StringDtype(),
+            "primary_fuel_by_mmbtu": pd.StringDtype(),
+        }
+    )
     for fuel_str in fuel_categories:
         try:
             mmbtu_mask = df[f"{fuel_str}_fraction_mmbtu"] > thresh
diff --git a/src/pudl/convert/__init__.py b/src/pudl/convert/__init__.py
index 1085accee9..02676d1eeb 100644
--- a/src/pudl/convert/__init__.py
+++ b/src/pudl/convert/__init__.py
@@ -13,7 +13,6 @@
 """
 from . import (
     censusdp1tract_to_sqlite,
-    datasette_metadata_to_yml,
     epacems_to_parquet,
     metadata_to_rst,
 )
diff --git a/src/pudl/convert/datasette_metadata_to_yml.py b/src/pudl/convert/datasette_metadata_to_yml.py
deleted file mode 100644
index 7d88931788..0000000000
--- a/src/pudl/convert/datasette_metadata_to_yml.py
+++ /dev/null
@@ -1,62 +0,0 @@
-"""Export metadata to YAML for Datasette."""
-
-import argparse
-import os
-import sys
-
-from dotenv import load_dotenv
-
-import pudl
-from pudl.metadata.classes import DatasetteMetadata
-
-logger = pudl.logging_helpers.get_logger(__name__)
-
-
-def parse_command_line(argv):
-    """Parse command line arguments. See the -h option.
-
-    Args:
-        argv (str): Command line arguments, including absolute path to output filename.
-
-    Returns:
-        dict: Dictionary of command line arguments and their parsed values.
-    """
-    parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument(
-        "-o",
-        "--output",
-        help="Path to the file where the YAML output should be written.",
-        default=False,
-    )
-    parser.add_argument(
-        "--logfile",
-        default=None,
-        type=str,
-        help="If specified, write logs to this file.",
-    )
-    parser.add_argument(
-        "--loglevel",
-        help="Set logging level (DEBUG, INFO, WARNING, ERROR, or CRITICAL).",
-        default="INFO",
-    )
-    arguments = parser.parse_args(argv[1:])
-    return arguments
-
-
-def main():
-    """Convert metadata to YAML."""
-    load_dotenv()
-    args = parse_command_line(sys.argv)
-
-    pudl.logging_helpers.configure_root_logger(
-        logfile=args.logfile, loglevel=args.loglevel
-    )
-
-    logger.info(f"Exporting Datasette metadata to: {args.output}")
-
-    dm = DatasetteMetadata.from_data_source_ids(os.getenv("PUDL_OUTPUT"))
-    dm.to_yaml(path=args.output)
-
-
-if __name__ == "__main__":
-    sys.exit(main())
diff --git a/src/pudl/extract/xbrl.py b/src/pudl/extract/xbrl.py
index 9e726dabcb..108fa5e025 100644
--- a/src/pudl/extract/xbrl.py
+++ b/src/pudl/extract/xbrl.py
@@ -85,12 +85,22 @@ def xbrl2sqlite(context) -> None:
             logger.info(f"Dataset ferc{form}_xbrl is disabled, skipping")
             continue
 
+        sql_path = PudlPaths().sqlite_db_path(f"ferc{form.value}_xbrl")
+
+        if sql_path.exists():
+            if clobber:
+                sql_path.unlink()
+            else:
+                raise RuntimeError(
+                    f"Found existing DB at {sql_path} and clobber was set to False. Aborting."
+                )
+
         convert_form(
             settings,
             form,
             datastore,
             output_path=output_path,
-            clobber=clobber,
+            sql_path=sql_path,
             batch_size=batch_size,
             workers=workers,
         )
@@ -101,7 +111,7 @@ def convert_form(
     form: XbrlFormNumber,
     datastore: FercXbrlDatastore,
     output_path: Path,
-    clobber: bool,
+    sql_path: Path,
     batch_size: int | None = None,
     workers: int | None = None,
 ) -> None:
@@ -111,8 +121,8 @@ def convert_form(
         form_settings: Validated settings for converting the desired XBRL form to
             SQLite.
         form: FERC form number.
         datastore: Instance of a FERC XBRL datastore for retrieving data.
-        pudl_settings: Dictionary containing paths and database URLs
-            used by PUDL.
+        output_path: PUDL output directory
+        sql_path: path to the SQLite DB we'd like to write to.
         batch_size: Number of XBRL filings to process in a single CPU process.
         workers: Number of CPU processes to create for processing XBRL filings.
@@ -125,13 +135,12 @@ def convert_form(
     for year in form_settings.years:
         taxonomy_archive, taxonomy_entry_point = datastore.get_taxonomy(year, form)
         filings_archive = datastore.get_filings(year, form)
-
+        # if we set clobber=True here, it would clobber on *every* call to run_main;
+        # we already delete the existing database when `clobber=True` in `xbrl2sqlite`
         run_main(
             instance_path=filings_archive,
-            sql_path=PudlPaths()
-            .sqlite_db(f"ferc{form.value}_xbrl")
-            .removeprefix("sqlite:///"),  # Temp hacky solution
-            clobber=clobber,
+            sql_path=sql_path,
+            clobber=False,
             taxonomy=taxonomy_archive,
             entry_point=taxonomy_entry_point,
             form_number=form.value,
diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py
index 49b97cfb8f..a5e8f0be31 100644
--- a/src/pudl/metadata/classes.py
+++ b/src/pudl/metadata/classes.py
@@ -2021,7 +2021,7 @@ def from_data_source_ids(
             xbrl_resources=xbrl_resources,
         )
 
-    def to_yaml(self, path: str = None) -> None:
+    def to_yaml(self) -> str:
         """Output database, table, and column metadata to YAML file."""
         template = _get_jinja_environment().get_template("datasette-metadata.yml.jinja")
         rendered = template.render(
@@ -2031,7 +2031,4 @@ def from_data_source_ids(
             xbrl_resources=self.xbrl_resources,
             label_columns=self.label_columns,
         )
-        if path:
-            Path(path).write_text(rendered)
-        else:
-            sys.stdout.write(rendered)
+        return rendered
diff --git a/src/pudl/output/ferc1.py b/src/pudl/output/ferc1.py
index fe11acb346..5ed3551f3e 100644
--- a/src/pudl/output/ferc1.py
+++ b/src/pudl/output/ferc1.py
@@ -810,10 +810,6 @@ def drop_other_fuel_types(df):
         return df[df.fuel_type_code_pudl != "other"].copy()
 
     thresh = context.op_config["thresh"]
-    # The existing function expects `fuel_type_code_pudl` to be an object, rather than
-    # a category. This is a legacy of pre-dagster code, and we convert here to prevent
-    # further retooling in the code-base.
-    fuel_ferc1["fuel_type_code_pudl"] = fuel_ferc1["fuel_type_code_pudl"].astype(str)
 
     fuel_categories = list(
         pudl.transform.ferc1.FuelFerc1TableTransformer()
diff --git a/src/pudl/output/ferc714.py b/src/pudl/output/ferc714.py
index 8049536b46..7d027f413a 100644
--- a/src/pudl/output/ferc714.py
+++ b/src/pudl/output/ferc714.py
@@ -652,18 +652,20 @@ def summarized_demand_ferc714(
             demand_hourly_pa_ferc714.loc[
                 :, ["report_date", "respondent_id_ferc714", "demand_mwh"]
             ],
+            on=["report_date", "respondent_id_ferc714"],
             how="left",
         )
-        .groupby(["report_date", "respondent_id_ferc714"])
-        .agg({"demand_mwh": sum})
+        .groupby(["report_date", "respondent_id_ferc714"], as_index=False)[
+            ["demand_mwh"]
+        ]
+        .sum(min_count=1)
         .rename(columns={"demand_mwh": "demand_annual_mwh"})
-        .reset_index()
         .merge(
             georeferenced_counties_ferc714.groupby(
-                ["report_date", "respondent_id_ferc714"]
-            )
-            .agg({"population": sum, "area_km2": sum})
-            .reset_index()
+                ["report_date", "respondent_id_ferc714"], as_index=False
+            )[["population", "area_km2"]].sum(min_count=1),
+            on=["report_date", "respondent_id_ferc714"],
+            how="left",
         )
         .assign(
             population_density_km2=lambda x: x.population / x.area_km2,
diff --git a/src/pudl/workspace/setup.py b/src/pudl/workspace/setup.py
index d33777614b..6afaa751ea 100644
--- a/src/pudl/workspace/setup.py
+++ b/src/pudl/workspace/setup.py
@@ -54,12 +54,12 @@ class Config:
     @property
     def input_dir(self) -> Path:
         """Path to PUDL input directory."""
-        return Path(self.pudl_input)
+        return Path(self.pudl_input).absolute()
 
     @property
     def output_dir(self) -> Path:
         """Path to PUDL output directory."""
-        return Path(self.pudl_output)
+        return Path(self.pudl_output).absolute()
 
     @property
     def settings_dir(self) -> Path:
@@ -73,18 +73,22 @@ def data_dir(self) -> Path:
         return self.input_dir
 
     @property
-    def pudl_db(self) -> Path:
+    def pudl_db(self) -> str:
         """Returns url of locally stored pudl sqlite database."""
-        return self.sqlite_db("pudl")
+        return self.sqlite_db_uri("pudl")
 
-    def sqlite_db(self, name: str) -> str:
-        """Returns url of locally stored pudl slqlite database with given name.
+    def sqlite_db_uri(self, name: str) -> str:
+        """Returns url of locally stored pudl sqlite database with given name.
 
         The name is expected to be the name of the database without the .sqlite
         suffix. E.g. pudl, ferc1 and so on.
""" - db_path = PudlPaths().output_dir / f"{name}.sqlite" - return f"sqlite:///{db_path}" + # SQLite URI has 3 slashes - 2 to separate URI scheme, 1 to separate creds + # sqlite://{credentials}/{db_path} + return f"sqlite:///{self.sqlite_db_path(name)}" + + def sqlite_db_path(self, name: str) -> Path: + """Return path to locally stored SQLite DB file.""" return self.output_dir / f"{name}.sqlite" def output_file(self, filename: str) -> Path: diff --git a/test/integration/datasette_metadata_test.py b/test/integration/datasette_metadata_test.py index dfd0f0838f..e039a156b3 100644 --- a/test/integration/datasette_metadata_test.py +++ b/test/integration/datasette_metadata_test.py @@ -18,7 +18,8 @@ def test_datasette_metadata_to_yml(ferc1_engine_xbrl): logger.info(f"Writing Datasette Metadata to {metadata_yml}") dm = DatasetteMetadata.from_data_source_ids(PudlPaths().output_dir) - dm.to_yaml(path=metadata_yml) + with metadata_yml.open("w") as f: + f.write(dm.to_yaml()) logger.info("Parsing generated metadata using datasette utils.") metadata_json = json.dumps(yaml.safe_load(metadata_yml.open())) diff --git a/test/unit/extract/xbrl_test.py b/test/unit/extract/xbrl_test.py index 2770e7c561..0aeb2a920d 100644 --- a/test/unit/extract/xbrl_test.py +++ b/test/unit/extract/xbrl_test.py @@ -122,12 +122,91 @@ def test_xbrl2sqlite(settings, forms, mocker): form, mock_datastore, output_path=PudlPaths().output_dir, + sql_path=PudlPaths().output_dir / f"ferc{form.value}_xbrl.sqlite", batch_size=20, workers=10, - clobber=True, ) +def test_xbrl2sqlite_db_exists_no_clobber(mocker): + convert_form_mock = mocker.MagicMock() + mocker.patch("pudl.extract.xbrl.convert_form", new=convert_form_mock) + + # Mock datastore object to allow comparison + mock_datastore = mocker.MagicMock() + mocker.patch("pudl.extract.xbrl.FercXbrlDatastore", return_value=mock_datastore) + + ferc1_sqlite_path = PudlPaths().output_dir / "ferc1_xbrl.sqlite" + ferc1_sqlite_path.touch() + settings = FercToSqliteSettings( + ferc1_dbf_to_sqlite_settings=Ferc1DbfToSqliteSettings(), + ferc1_xbrl_to_sqlite_settings=Ferc1XbrlToSqliteSettings(), + ferc2_xbrl_to_sqlite_settings=None, + ferc6_xbrl_to_sqlite_settings=None, + ferc60_xbrl_to_sqlite_settings=None, + ferc714_xbrl_to_sqlite_settings=None, + ) + # Construct xbrl2sqlite op context + context = build_op_context( + resources={ + "ferc_to_sqlite_settings": settings, + "datastore": "datastore", + }, + config={ + "workers": 10, + "batch_size": 20, + "clobber": False, + }, + ) + + with pytest.raises(RuntimeError, match="Found existing DB"): + xbrl2sqlite(context) + + +def test_xbrl2sqlite_db_exists_yes_clobber(mocker): + convert_form_mock = mocker.MagicMock() + mocker.patch("pudl.extract.xbrl.convert_form", new=convert_form_mock) + + # Mock datastore object to allow comparison + mock_datastore = mocker.MagicMock() + mocker.patch("pudl.extract.xbrl.FercXbrlDatastore", return_value=mock_datastore) + + ferc1_sqlite_path = PudlPaths().output_dir / "ferc1_xbrl.sqlite" + ferc1_sqlite_path.touch() + + # mock the db path so we can assert it gets clobbered + mock_db_path = mocker.MagicMock(spec=ferc1_sqlite_path) + mock_pudl_paths = mocker.MagicMock( + spec=PudlPaths(), sqlite_db_path=lambda _x: mock_db_path + ) + mocker.patch("pudl.extract.xbrl.PudlPaths", return_value=mock_pudl_paths) + + settings = FercToSqliteSettings( + ferc1_dbf_to_sqlite_settings=Ferc1DbfToSqliteSettings(), + ferc1_xbrl_to_sqlite_settings=Ferc1XbrlToSqliteSettings(), + ferc2_xbrl_to_sqlite_settings=None, + 
ferc6_xbrl_to_sqlite_settings=None, + ferc60_xbrl_to_sqlite_settings=None, + ferc714_xbrl_to_sqlite_settings=None, + ) + + context = build_op_context( + resources={ + "ferc_to_sqlite_settings": settings, + "datastore": "datastore", + }, + config={ + "workers": 10, + "batch_size": 20, + "clobber": True, + }, + ) + + xbrl2sqlite(context) + + mock_db_path.unlink.assert_any_call() + + def test_convert_form(mocker): """Test convert_form method is properly calling extractor.""" extractor_mock = mocker.MagicMock() @@ -158,7 +237,7 @@ def get_filings(self, year, form: XbrlFormNumber): form, FakeDatastore(), output_path=output_path, - clobber=True, + sql_path=output_path / f"ferc{form.value}_xbrl.sqlite", batch_size=10, workers=5, ) @@ -169,8 +248,8 @@ def get_filings(self, year, form: XbrlFormNumber): expected_calls.append( mocker.call( instance_path=f"filings_{year}_{form.value}", - sql_path=str(output_path / f"ferc{form.value}_xbrl.sqlite"), - clobber=True, + sql_path=output_path / f"ferc{form.value}_xbrl.sqlite", + clobber=False, taxonomy=f"raw_archive_{year}_{form.value}", entry_point=f"taxonomy_entry_point_{year}_{form.value}", form_number=form.value, diff --git a/test/unit/io_managers_test.py b/test/unit/io_managers_test.py index 89d8a80988..7934877901 100644 --- a/test/unit/io_managers_test.py +++ b/test/unit/io_managers_test.py @@ -1,7 +1,9 @@ """Test Dagster IO Managers.""" import datetime import json +from pathlib import Path +import alembic.config import hypothesis import pandas as pd import pandera @@ -204,40 +206,65 @@ def test_missing_schema_error(sqlite_io_manager_fixture): @pytest.fixture -def pudl_sqlite_io_manager_fixture(tmp_path, test_pkg): - """Create a SQLiteIOManager fixture with a PUDL database schema.""" - db_path = tmp_path / "pudl.sqlite" +def fake_pudl_sqlite_io_manager_fixture(tmp_path, test_pkg, monkeypatch): + """Create a SQLiteIOManager fixture with a fake database schema.""" + db_path = tmp_path / "fake.sqlite" # Create the database and schemas engine = sa.create_engine(f"sqlite:///{db_path}") md = test_pkg.to_sql() md.create_all(engine) - return PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=test_pkg) + return PudlSQLiteIOManager(base_dir=tmp_path, db_name="fake", package=test_pkg) -def test_error_when_handling_view_without_metadata(pudl_sqlite_io_manager_fixture): +def test_migrations_match_metadata(tmp_path, monkeypatch): + """If you create a `PudlSQLiteIOManager` that points at a non-existing + `pudl.sqlite` - it will initialize the DB based on the `package`. + + If you create a `PudlSQLiteIOManager` that points at an existing + `pudl.sqlite`, like one initialized via `alembic upgrade head`, it + will compare the existing db schema with the db schema in `package`. + + We want to make sure that the schema defined in `package` is the same as + the one we arrive at by applying all the migrations. 
+    """
+    # alembic wants current directory to be the one with `alembic.ini` in it
+    monkeypatch.chdir(Path(__file__).parent.parent.parent)
+    # alembic knows to use PudlPaths().pudl_db - so we need to set PUDL_OUTPUT env var
+    monkeypatch.setenv("PUDL_OUTPUT", tmp_path)
+    # run all the migrations on a fresh DB at tmp_path/pudl.sqlite
+    alembic.config.main(["upgrade", "head"])
+
+    pkg = Package.from_resource_ids()
+    PudlSQLiteIOManager(base_dir=tmp_path, db_name="pudl", package=pkg)
+
+    # all we care about is that it didn't raise an error
+    assert True
+
+
+def test_error_when_handling_view_without_metadata(fake_pudl_sqlite_io_manager_fixture):
     """Make sure an error is thrown when a user creates a view without metadata."""
     asset_key = "track_view"
     sql_stmt = "CREATE VIEW track_view AS SELECT * FROM track;"
     output_context = build_output_context(asset_key=AssetKey(asset_key))
     with pytest.raises(ValueError):
-        pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)
+        fake_pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)
 
 
 @pytest.mark.skip(reason="SQLAlchemy is not finding the view. Debug or remove.")
-def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture):
+def test_handling_view_with_metadata(fake_pudl_sqlite_io_manager_fixture):
     """Make sure an users can create and load views when it has metadata."""
     # Create some sample data
     asset_key = "artist"
     artist = pd.DataFrame({"artistid": [1], "artistname": ["Co-op Mop"]})
     output_context = build_output_context(asset_key=AssetKey(asset_key))
-    pudl_sqlite_io_manager_fixture.handle_output(output_context, artist)
+    fake_pudl_sqlite_io_manager_fixture.handle_output(output_context, artist)
 
     # create the view
     asset_key = "artist_view"
     sql_stmt = "CREATE VIEW artist_view AS SELECT * FROM artist;"
     output_context = build_output_context(asset_key=AssetKey(asset_key))
-    pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)
+    fake_pudl_sqlite_io_manager_fixture.handle_output(output_context, sql_stmt)
 
     # read the view data as a dataframe
     input_context = build_input_context(asset_key=AssetKey(asset_key))
@@ -246,15 +273,15 @@ def test_handling_view_with_metadata(pudl_sqlite_io_manager_fixture):
     # sqlalchemy.exc.InvalidRequestError: Could not reflect: requested table(s) not available in
     # Engine(sqlite:////private/var/folders/pg/zrqnq8l113q57bndc5__h2640000gn/
     # # T/pytest-of-nelsonauner/pytest-38/test_handling_view_with_metada0/pudl.sqlite): (artist_view)
-    pudl_sqlite_io_manager_fixture.load_input(input_context)
+    fake_pudl_sqlite_io_manager_fixture.load_input(input_context)
 
 
-def test_error_when_reading_view_without_metadata(pudl_sqlite_io_manager_fixture):
+def test_error_when_reading_view_without_metadata(fake_pudl_sqlite_io_manager_fixture):
     """Make sure and error is thrown when a user loads a view without metadata."""
     asset_key = "track_view"
     input_context = build_input_context(asset_key=AssetKey(asset_key))
     with pytest.raises(ValueError):
-        pudl_sqlite_io_manager_fixture.load_input(input_context)
+        fake_pudl_sqlite_io_manager_fixture.load_input(input_context)
 
 
 def test_ferc_xbrl_sqlite_io_manager_dedupes(mocker, tmp_path):
diff --git a/test/validate/notebooks/validate_bf_eia923.ipynb b/test/validate/notebooks/validate_bf_eia923.ipynb
index 750b0e926e..c8ae9e9ad6 100644
--- a/test/validate/notebooks/validate_bf_eia923.ipynb
+++ b/test/validate/notebooks/validate_bf_eia923.ipynb
@@ -79,7 +79,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)"
    ]
   },
diff --git a/test/validate/notebooks/validate_fbp_ferc1.ipynb b/test/validate/notebooks/validate_fbp_ferc1.ipynb
index fb4fd2920d..19bea73485 100644
--- a/test/validate/notebooks/validate_fbp_ferc1.ipynb
+++ b/test/validate/notebooks/validate_fbp_ferc1.ipynb
@@ -81,7 +81,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)"
    ]
   },
diff --git a/test/validate/notebooks/validate_frc_eia923.ipynb b/test/validate/notebooks/validate_frc_eia923.ipynb
index e1834129b7..63bf3862c4 100644
--- a/test/validate/notebooks/validate_frc_eia923.ipynb
+++ b/test/validate/notebooks/validate_frc_eia923.ipynb
@@ -79,7 +79,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)"
    ]
   },
diff --git a/test/validate/notebooks/validate_fuel_ferc1.ipynb b/test/validate/notebooks/validate_fuel_ferc1.ipynb
index cc86703c20..7cc54302b8 100644
--- a/test/validate/notebooks/validate_fuel_ferc1.ipynb
+++ b/test/validate/notebooks/validate_fuel_ferc1.ipynb
@@ -79,7 +79,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_settings"
    ]
diff --git a/test/validate/notebooks/validate_gens_eia860.ipynb b/test/validate/notebooks/validate_gens_eia860.ipynb
index 17b2916e44..59ad58531d 100644
--- a/test/validate/notebooks/validate_gens_eia860.ipynb
+++ b/test/validate/notebooks/validate_gens_eia860.ipynb
@@ -78,7 +78,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)"
    ]
   },
diff --git a/test/validate/notebooks/validate_gf_eia923.ipynb b/test/validate/notebooks/validate_gf_eia923.ipynb
index 216d07e8b8..9dede77d30 100644
--- a/test/validate/notebooks/validate_gf_eia923.ipynb
+++ b/test/validate/notebooks/validate_gf_eia923.ipynb
@@ -78,7 +78,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_settings"
    ]
diff --git a/test/validate/notebooks/validate_mcoe.ipynb b/test/validate/notebooks/validate_mcoe.ipynb
index e8c884f558..4b948fc073 100644
--- a/test/validate/notebooks/validate_mcoe.ipynb
+++ b/test/validate/notebooks/validate_mcoe.ipynb
@@ -79,7 +79,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_settings"
    ]
diff --git a/test/validate/notebooks/validate_plants_steam_ferc1.ipynb b/test/validate/notebooks/validate_plants_steam_ferc1.ipynb
index e435974dc0..0004d00081 100644
--- a/test/validate/notebooks/validate_plants_steam_ferc1.ipynb
+++ b/test/validate/notebooks/validate_plants_steam_ferc1.ipynb
@@ -79,7 +79,7 @@
    "outputs": [],
    "source": [
     "from pudl.workspace.setup import PudlPaths\n",
-    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db(\"ferc1\"))\n",
+    "ferc1_engine = sa.create_engine(PudlPaths().sqlite_db_uri(\"ferc1\"))\n",
     "pudl_engine = sa.create_engine(PudlPaths().pudl_db)\n",
     "pudl_settings"
    ]
diff --git a/test/validate/service_territory_test.py b/test/validate/service_territory_test.py
index 4dbee9a988..203f40ff0e 100644
--- a/test/validate/service_territory_test.py
+++ b/test/validate/service_territory_test.py
@@ -13,7 +13,7 @@
     "df_name,expected_rows",
     [
         ("summarized_demand_ferc714", 3_195),
-        ("fipsified_respondents_ferc714", 135_627),
+        ("fipsified_respondents_ferc714", 135_537),
         ("compiled_geometry_balancing_authority_eia861", 112_507),
         ("compiled_geometry_utility_eia861", 247_705),
     ],
@@ -46,3 +46,43 @@ def test_minmax_rows(
             pv.check_max_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name
         )
     )
+
+
+@pytest.mark.parametrize(
+    "df_name,expected_rows",
+    [("demand_hourly_pa_ferc714", 15_608_154)],
+)
+def test_minmax_rows_and_year_in_demand_hourly_pa_ferc714(
+    pudl_out_orig: "pudl.output.pudltabl.PudlTabl",
+    live_dbs: bool,
+    expected_rows: int,
+    df_name: str,
+):
+    """Test if the majority of the years in the two date columns line up & min/max rows.
+
+    We are parameterizing this test even though it only has one input because the
+    test_minmax_rows is a common test across many tables and we wanted to preserve the
+    format.
+    """
+    if not live_dbs:
+        pytest.skip("Data validation only works with a live PUDL DB.")
+    demand_hourly_pa_ferc714 = pudl_out_orig.__getattribute__(df_name)()
+    _ = demand_hourly_pa_ferc714.pipe(
+        pv.check_min_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name
+    ).pipe(pv.check_max_rows, expected_rows=expected_rows, margin=0.0, df_name=df_name)
+
+    logger.info("Checking the consistency of the year in the multiple date columns.")
+    mismatched_report_years = demand_hourly_pa_ferc714[
+        (
+            demand_hourly_pa_ferc714.utc_datetime.dt.year
+            != demand_hourly_pa_ferc714.report_date.dt.year
+        )
+    ]
+    if (
+        off_ratio := len(mismatched_report_years) / len(demand_hourly_pa_ferc714)
+    ) > 0.001:
+        raise AssertionError(
+            f"Found more ({off_ratio:.2%}) than expected (>.1%) FERC714 records"
+            " where the report year from the utc_datetime differs from the "
+            "report_date column."
+        )