Merge pull request #181 from catalyst-cooperative/cems_quarterly
Rewrite CEMS archiver to use quarterly partitions rather than state-year.
e-belfer authored Dec 4, 2023
2 parents 93f2012 + 3f52550 commit 9eab759
Showing 2 changed files with 23 additions and 79 deletions.
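
At a glance, the change replaces per-state CSV archives named `epacems-{year}-{state}.zip` (partitioned on `year` and `state`) with per-quarter archives named `epacems-{year}-{quarter}.zip` (partitioned on `year` and `quarter`). A minimal sketch of the new naming and partition scheme, using a hypothetical helper that is not part of the archiver:

```python
from pathlib import Path


def quarter_archive_info(year: int, quarter: int, download_dir: Path) -> tuple[Path, dict]:
    """Illustrative helper: build the archive path and partition dict used after this change."""
    filename = f"epacems-{year}-{quarter}.csv"  # CSV member name inside the zip
    archive_path = download_dir / f"epacems-{year}-{quarter}.zip"
    partitions = {"year": year, "quarter": quarter}  # previously {"year": year, "state": state}
    return archive_path, partitions


# 2022 Q3 now lands in a single epacems-2022-3.zip instead of fifty-plus per-state files.
print(quarter_archive_info(2022, 3, Path("/tmp")))
```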
8 changes: 5 additions & 3 deletions src/pudl_archiver/archivers/classes.py
@@ -170,9 +170,11 @@ async def download_and_zip_file(

         # Write to zipfile
         with zipfile.ZipFile(
-            archive_path, "w", compression=zipfile.ZIP_DEFLATED
-        ) as archive, archive.open(filename, "w") as f_disk:
-            f_disk.write(response_bytes)
+            archive_path,
+            "w",
+            compression=zipfile.ZIP_DEFLATED,
+        ) as archive:
+            archive.writestr(filename, response_bytes)

     async def get_hyperlinks(
         self,
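The `classes.py` tweak swaps the nested `archive.open(filename, "w")` / `f_disk.write(...)` pattern for a single `ZipFile.writestr()` call, which writes the downloaded bytes straight to the named archive member. A small standard-library sketch (not taken from the repo) showing the two forms side by side:

```python
import zipfile

response_bytes = b"plant_id,so2_mass\n3,1.2\n"  # stand-in for a downloaded CSV payload

# Old pattern: open a member of the archive for writing, then write the bytes to it.
with zipfile.ZipFile("old.zip", "w", compression=zipfile.ZIP_DEFLATED) as archive, archive.open(
    "epacems.csv", "w"
) as f_disk:
    f_disk.write(response_bytes)

# New pattern: writestr() writes the bytes to the named member in one call,
# using the compression configured on the ZipFile.
with zipfile.ZipFile("new.zip", "w", compression=zipfile.ZIP_DEFLATED) as archive:
    archive.writestr("epacems.csv", response_bytes)

# Both archives end up with an identical member.
with zipfile.ZipFile("old.zip") as old, zipfile.ZipFile("new.zip") as new:
    assert old.read("epacems.csv") == new.read("epacems.csv")
```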
94 changes: 18 additions & 76 deletions src/pudl_archiver/archivers/epacems.py
@@ -1,5 +1,4 @@
 """Download EPACEMS data."""
-import asyncio
 import json
 import logging
 import os
@@ -14,67 +13,13 @@
 )

 logger = logging.getLogger(f"catalystcoop.{__name__}")
-STATE_ABBREVIATIONS = [
-    "al",
-    "ak",
-    "az",
-    "ar",
-    "ca",
-    "co",
-    "ct",
-    "dc",
-    "de",
-    "fl",
-    "ga",
-    "hi",
-    "id",
-    "il",
-    "in",
-    "ia",
-    "ks",
-    "ky",
-    "la",
-    "me",
-    "md",
-    "ma",
-    "mi",
-    "mn",
-    "ms",
-    "mo",
-    "mt",
-    "ne",
-    "nv",
-    "nh",
-    "nj",
-    "nm",
-    "ny",
-    "nc",
-    "nd",
-    "oh",
-    "ok",
-    "or",
-    "pa",
-    "pr",
-    "ri",
-    "sc",
-    "sd",
-    "tn",
-    "tx",
-    "ut",
-    "vt",
-    "va",
-    "wa",
-    "wv",
-    "wi",
-    "wy",
-]


 class EpaCemsArchiver(AbstractDatasetArchiver):
     """EPA CEMS archiver."""

     name = "epacems"
-    concurrency_limit = 10  # Number of files to concurrently download
+    concurrency_limit = 2  # Number of files to concurrently download

     base_url = "https://api.epa.gov/easey/bulk-files/"
     parameters = {"api_key": os.environ["EPACEMS_API_KEY"]}  # Set to API key
@@ -90,26 +35,28 @@ async def get_resources(self) -> ArchiveAwaitable:
             resjson = file_list.content.decode("utf8").replace("'", '"')
             file_list.close()  # Close connection.
             bulk_files = json.loads(resjson)
-            hourly_state_emissions_files = [
+            quarterly_emissions_files = [
                 file
                 for file in bulk_files
                 if (file["metadata"]["dataType"] == "Emissions")
                 and (file["metadata"]["dataSubType"] == "Hourly")
-                and ("stateCode" in file["metadata"])
+                and ("quarter" in file["metadata"])
                 and (self.valid_year(file["metadata"]["year"]))
             ]
-            logger.info(f"Downloading {len(hourly_state_emissions_files)} total files.")
-            for i, cems_file in enumerate(hourly_state_emissions_files):
-                yield self.get_state_year_resource(file=cems_file, request_count=i)
+            logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
+            for i, cems_file in enumerate(quarterly_emissions_files):
+                yield self.get_year_quarter_resource(
+                    file=cems_file, request_count=i + 1
+                )  # Count files starting at 1 for human legibility.
         else:
             raise AssertionError(
                 f"EPACEMS API request did not succeed: {file_list.status_code}"
             )

-    async def get_state_year_resource(
+    async def get_year_quarter_resource(
         self, file: dict[str, str | dict[str, str]], request_count: int | None
     ) -> tuple[Path, dict]:
-        """Download all available data for a single state/year.
+        """Download all available data for a single quarter in a year.

         Args:
             file: a dictionary containing file characteristics from the EPA API.
@@ -119,26 +66,21 @@ async def get_state_year_resource(
         """
         url = self.base_url + file["s3Path"]
         year = file["metadata"]["year"]
-        state = file["metadata"]["stateCode"].lower()
-        file_size = file["megaBytes"]
-        if int(file_size) > 600:  # If bigger than 600 mb
-            await asyncio.sleep(60 * 5)
-            # Add a five-minute wait time for very big files to let
-            # other files in group finish first.
+        quarter = file["metadata"]["quarter"]

         # Useful to debug at download time-outs.
-        logger.debug(f"Downloading {year} EPACEMS data for {state.upper()}")
+        logger.debug(f"Downloading {year} Q{quarter} EPACEMS data.")

-        # Create zipfile to store year/state combinations of files
-        filename = f"epacems-{year}-{state}.csv"
-        archive_path = self.download_directory / f"epacems-{year}-{state}.zip"
+        # Create zipfile to store year/quarter combinations of files
+        filename = f"epacems-{year}-{quarter}.csv"
+        archive_path = self.download_directory / f"epacems-{year}-{quarter}.zip"
         # Override the default asyncio timeout to 14 minutes, just under the API limit.
         await self.download_and_zip_file(
             url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
         )
-        logger.info(  # Verbose but helpful to keep track of progress.
-            f"File no. {request_count}: Downloaded {year} EPACEMS data for {state.upper()}"
+        logger.info(  # Verbose but helpful to track progress
+            f"File no. {request_count}: Downloaded {year} Q{quarter} EPA CEMS hourly emissions data."
         )
         return ResourceInfo(
-            local_path=archive_path, partitions={"year": year, "state": state}
+            local_path=archive_path, partitions={"year": year, "quarter": quarter}
         )

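For context, `get_resources()` keys on the metadata of the bulk files returned by EPA's CAMD API: only hourly emissions files that carry a `quarter` field (rather than a `stateCode`) are queued for download. A rough, self-contained sketch of that filtering step; the listing URL, API key, and `valid_year` stand-in below are assumptions for illustration and are not shown in this diff:

```python
import json

import requests

# Assumed listing endpoint; the archiver's actual file-list request is outside this diff.
LISTING_URL = "https://api.epa.gov/easey/camd-services/bulk-files"

resp = requests.get(LISTING_URL, params={"api_key": "YOUR_API_KEY"}, timeout=300)
bulk_files = json.loads(resp.content.decode("utf8").replace("'", '"'))


def is_quarterly_hourly_emissions(file: dict, valid_year) -> bool:
    """Mirror the filter in get_resources(): hourly emissions files partitioned by quarter."""
    meta = file["metadata"]
    return (
        meta["dataType"] == "Emissions"
        and meta["dataSubType"] == "Hourly"
        and "quarter" in meta
        and valid_year(meta["year"])
    )


# Placeholder year check; the real archiver uses self.valid_year().
quarterly = [f for f in bulk_files if is_quarterly_hourly_emissions(f, lambda y: int(y) >= 1995)]
print(f"Downloading {len(quarterly)} total files.")
```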