Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite CEMS archiver to use quarterly partitions rather than state-year. #181

Merged
merged 18 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/pudl_archiver/archivers/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ async def download_and_zip_file(

# Write to zipfile
with zipfile.ZipFile(
archive_path, "w", compression=zipfile.ZIP_DEFLATED
) as archive, archive.open(filename, "w") as f_disk:
f_disk.write(response_bytes)
archive_path,
"w",
compression=zipfile.ZIP_DEFLATED,
) as archive:
archive.writestr(filename, response_bytes)

async def get_hyperlinks(
self,
Expand Down
94 changes: 18 additions & 76 deletions src/pudl_archiver/archivers/epacems.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Download EPACEMS data."""
import asyncio
import json
import logging
import os
Expand All @@ -14,67 +13,13 @@
)

logger = logging.getLogger(f"catalystcoop.{__name__}")
STATE_ABBREVIATIONS = [
"al",
"ak",
"az",
"ar",
"ca",
"co",
"ct",
"dc",
"de",
"fl",
"ga",
"hi",
"id",
"il",
"in",
"ia",
"ks",
"ky",
"la",
"me",
"md",
"ma",
"mi",
"mn",
"ms",
"mo",
"mt",
"ne",
"nv",
"nh",
"nj",
"nm",
"ny",
"nc",
"nd",
"oh",
"ok",
"or",
"pa",
"pr",
"ri",
"sc",
"sd",
"tn",
"tx",
"ut",
"vt",
"va",
"wa",
"wv",
"wi",
"wy",
]


class EpaCemsArchiver(AbstractDatasetArchiver):
"""EPA CEMS archiver."""

name = "epacems"
concurrency_limit = 10 # Number of files to concurrently download
concurrency_limit = 2 # Number of files to concurrently download
Comment on lines -77 to +22
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did it end up just being impossible to run this archive via GitHub Actions? What's the bottleneck? Would we be able to run it on a 4CPU 16GB runner?

Copy link
Member Author

@e-belfer e-belfer Nov 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried yet, let me see! I think the Github action we'd want to use is getting defined in #197, and the existing action will fail because of the sandbox DOIs having changed.

Relatedly, @jdangerx our branches are in a bit of an intertwined state so let me know how you want to handle merge order in this instance?


base_url = "https://api.epa.gov/easey/bulk-files/"
parameters = {"api_key": os.environ["EPACEMS_API_KEY"]} # Set to API key
Expand All @@ -90,26 +35,28 @@ async def get_resources(self) -> ArchiveAwaitable:
resjson = file_list.content.decode("utf8").replace("'", '"')
file_list.close() # Close connection.
bulk_files = json.loads(resjson)
hourly_state_emissions_files = [
quarterly_emissions_files = [
file
for file in bulk_files
if (file["metadata"]["dataType"] == "Emissions")
and (file["metadata"]["dataSubType"] == "Hourly")
and ("stateCode" in file["metadata"])
and ("quarter" in file["metadata"])
and (self.valid_year(file["metadata"]["year"]))
]
logger.info(f"Downloading {len(hourly_state_emissions_files)} total files.")
for i, cems_file in enumerate(hourly_state_emissions_files):
yield self.get_state_year_resource(file=cems_file, request_count=i)
logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
for i, cems_file in enumerate(quarterly_emissions_files):
yield self.get_year_quarter_resource(
file=cems_file, request_count=i + 1
) # Count files starting at 1 for human legibility.
else:
raise AssertionError(
f"EPACEMS API request did not succeed: {file_list.status_code}"
)

async def get_state_year_resource(
async def get_year_quarter_resource(
self, file: dict[str, str | dict[str, str]], request_count: int | None
) -> tuple[Path, dict]:
"""Download all available data for a single state/year.
"""Download all available data for a single quarter in a year.

Args:
file: a dictionary containing file characteristics from the EPA API.
Expand All @@ -119,26 +66,21 @@ async def get_state_year_resource(
"""
url = self.base_url + file["s3Path"]
year = file["metadata"]["year"]
state = file["metadata"]["stateCode"].lower()
file_size = file["megaBytes"]
if int(file_size) > 600: # If bigger than 600 mb
await asyncio.sleep(60 * 5)
# Add a five-minute wait time for very big files to let
# other files in group finish first.
quarter = file["metadata"]["quarter"]

# Useful to debug at download time-outs.
logger.debug(f"Downloading {year} EPACEMS data for {state.upper()}")
logger.debug(f"Downloading {year} Q{quarter} EPACEMS data.")

# Create zipfile to store year/state combinations of files
filename = f"epacems-{year}-{state}.csv"
archive_path = self.download_directory / f"epacems-{year}-{state}.zip"
# Create zipfile to store year/quarter combinations of files
filename = f"epacems-{year}-{quarter}.csv"
archive_path = self.download_directory / f"epacems-{year}-{quarter}.zip"
# Override the default asyncio timeout to 14 minutes, just under the API limit.
await self.download_and_zip_file(
url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
)
logger.info( # Verbose but helpful to keep track of progress.
f"File no. {request_count}: Downloaded {year} EPACEMS data for {state.upper()}"
logger.info( # Verbose but helpful to track progress
f"File no. {request_count}: Downloaded {year} Q{quarter} EPA CEMS hourly emissions data."
)
return ResourceInfo(
local_path=archive_path, partitions={"year": year, "state": state}
local_path=archive_path, partitions={"year": year, "quarter": quarter}
)