From 7b805beb0c7fb4f5b5841bcfd670eb01e3a8096d Mon Sep 17 00:00:00 2001
From: thinky
Date: Wed, 11 Oct 2023 09:53:23 -0400
Subject: [PATCH 01/11] WIP needs testing, rewrite to use quarterly rather than state-year partitions

---
 src/pudl_archiver/archivers/epacems.py | 90 +++++---------------------
 1 file changed, 15 insertions(+), 75 deletions(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index 4db24c90..a38cce7c 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -1,5 +1,4 @@
 """Download EPACEMS data."""
-import asyncio
 import json
 import logging
 import os
@@ -14,67 +13,13 @@
 )

 logger = logging.getLogger(f"catalystcoop.{__name__}")
-STATE_ABBREVIATIONS = [
-    "al",
-    "ak",
-    "az",
-    "ar",
-    "ca",
-    "co",
-    "ct",
-    "dc",
-    "de",
-    "fl",
-    "ga",
-    "hi",
-    "id",
-    "il",
-    "in",
-    "ia",
-    "ks",
-    "ky",
-    "la",
-    "me",
-    "md",
-    "ma",
-    "mi",
-    "mn",
-    "ms",
-    "mo",
-    "mt",
-    "ne",
-    "nv",
-    "nh",
-    "nj",
-    "nm",
-    "ny",
-    "nc",
-    "nd",
-    "oh",
-    "ok",
-    "or",
-    "pa",
-    "pr",
-    "ri",
-    "sc",
-    "sd",
-    "tn",
-    "tx",
-    "ut",
-    "vt",
-    "va",
-    "wa",
-    "wv",
-    "wi",
-    "wy",
-]


 class EpaCemsArchiver(AbstractDatasetArchiver):
     """EPA CEMS archiver."""

     name = "epacems"
-    concurrency_limit = 10  # Number of files to concurrently download
+    concurrency_limit = 5  # Number of files to concurrently download

     base_url = "https://api.epa.gov/easey/bulk-files/"
     parameters = {"api_key": os.environ["EPACEMS_API_KEY"]}  # Set to API key
@@ -90,26 +35,26 @@ async def get_resources(self) -> ArchiveAwaitable:
             resjson = file_list.content.decode("utf8").replace("'", '"')
             file_list.close()  # Close connection.
             bulk_files = json.loads(resjson)
-            hourly_state_emissions_files = [
+            quarterly_emissions_files = [
                 file
                 for file in bulk_files
                 if (file["metadata"]["dataType"] == "Emissions")
                 and (file["metadata"]["dataSubType"] == "Hourly")
-                and ("stateCode" in file["metadata"].keys())
+                and ("quarter" in file["metadata"].keys())
                 and (self.valid_year(file["metadata"]["year"]))
             ]
-            logger.info(f"Downloading {len(hourly_state_emissions_files)} total files.")
-            for i, cems_file in enumerate(hourly_state_emissions_files):
-                yield self.get_state_year_resource(file=cems_file, request_count=i)
+            logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
+            for i, cems_file in enumerate(quarterly_emissions_files):
+                yield self.get_quarter_year_resource(file=cems_file, request_count=i)
         else:
             raise AssertionError(
                 f"EPACEMS API request did not succeed: {file_list.status_code}"
             )

-    async def get_state_year_resource(
+    async def get_quarter_year_resource(
         self, file: dict[str, str | dict[str, str]], request_count: int | None
     ) -> tuple[Path, dict]:
-        """Download all available data for a single state/year.
+        """Download all available data for a single quarter in a year.

         Args:
             file: a dictionary containing file characteristics from the EPA API.
             See https://www.epa.gov/power-sector/cam-api-portal#/swagger/camd-services
             for expected format of dictionary.
             request_count: the number of the request.
         """
         url = self.base_url + file["s3Path"]
         year = file["metadata"]["year"]
-        state = file["metadata"]["stateCode"].lower()
-        file_size = file["megaBytes"]
-        if int(file_size) > 600:  # If bigger than 600 mb
-            await asyncio.sleep(60 * 5)
-            # Add a five-minute wait time for very big files to let
-            # other files in group finish first.
+        quarter = file["metadata"]["quarter"]

         # Useful to debug at download time-outs.
-        logger.debug(f"Downloading {year} EPACEMS data for {state.upper()}")
+        logger.debug(f"Downloading Q{quarter} {year} EPACEMS data.")

-        # Create zipfile to store year/state combinations of files
-        filename = f"epacems-{year}-{state}.csv"
-        archive_path = self.download_directory / f"epacems-{year}-{state}.zip"
+        # Create zipfile to store year/quarter combinations of files
+        filename = f"epacems-{year}-{quarter}.csv"
+        archive_path = self.download_directory / f"epacems-{year}-{quarter}.zip"
         # Override the default asyncio timeout to 14 minutes, just under the API limit.
         await self.download_and_zip_file(
             url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
         )
         logger.info(  # Verbose but helpful to keep track of progress.
-            f"File no. {request_count}: Downloaded {year} EPACEMS data for {state.upper()}"
+            f"File no. {request_count}: Downloaded Q{quarter} {year} EPA CEMS hourly emissions data."
         )
         return ResourceInfo(
-            local_path=archive_path, partitions={"year": year, "state": state}
+            local_path=archive_path, partitions={"year": year, "quarter": quarter}
         )

From 38098b8f09aeaa61f375074189597f6614e8b1dc Mon Sep 17 00:00:00 2001
From: thinky
Date: Wed, 11 Oct 2023 18:01:10 -0400
Subject: [PATCH 02/11] Add handling for giant zipfiles in download_and_zip

---
 src/pudl_archiver/archivers/classes.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/pudl_archiver/archivers/classes.py b/src/pudl_archiver/archivers/classes.py
index 1353c5b7..1bd168d8 100644
--- a/src/pudl_archiver/archivers/classes.py
+++ b/src/pudl_archiver/archivers/classes.py
@@ -174,9 +174,11 @@ async def download_and_zip_file(

         # Write to zipfile
         with zipfile.ZipFile(
-            archive_path, "w", compression=zipfile.ZIP_DEFLATED
+            archive_path,
+            "w",
+            compression=zipfile.ZIP_DEFLATED,
         ) as archive:
-            with archive.open(filename, "w") as f_disk:
+            with archive.open(filename, "w", force_zip64=True) as f_disk:
                 f_disk.write(response_bytes)

From 8bd7fc71bb638e99fd670b7ff316a78a61ed41a1 Mon Sep 17 00:00:00 2001
From: thinky
Date: Thu, 12 Oct 2023 10:26:11 -0400
Subject: [PATCH 03/11] Try a new way to write bytes to zipfile

---
 src/pudl_archiver/archivers/classes.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/pudl_archiver/archivers/classes.py b/src/pudl_archiver/archivers/classes.py
index 1bd168d8..d9927a13 100644
--- a/src/pudl_archiver/archivers/classes.py
+++ b/src/pudl_archiver/archivers/classes.py
@@ -178,8 +178,7 @@ async def download_and_zip_file(
             "w",
             compression=zipfile.ZIP_DEFLATED,
         ) as archive:
-            with archive.open(filename, "w", force_zip64=True) as f_disk:
-                f_disk.write(response_bytes)
+            archive.writestr(filename, response_bytes)

     async def get_hyperlinks(
         self,

From 7d5a4e1185a5017284daf74c411ccfe37de70073 Mon Sep 17 00:00:00 2001
From: thinky
Date: Tue, 17 Oct 2023 10:53:09 -0400
Subject: [PATCH 04/11] Change file count # for legibility

---
 src/pudl_archiver/archivers/epacems.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index a38cce7c..07070317 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -45,7 +45,9 @@ async def get_resources(self) -> ArchiveAwaitable:
             ]
             logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
             for i, cems_file in enumerate(quarterly_emissions_files):
-                yield self.get_quarter_year_resource(file=cems_file, request_count=i)
+                yield self.get_quarter_year_resource(
+                    file=cems_file, request_count=i + 1
+                )  # Count files starting at 1 for human legibility.
         else:
             raise AssertionError(
                 f"EPACEMS API request did not succeed: {file_list.status_code}"
             )
@@ -76,7 +76,7 @@ async def get_quarter_year_resource(
         await self.download_and_zip_file(
             url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
         )
-        logger.info(  # Verbose but helpful to keep track of progress.
+        logger.info(  # Verbose but helpful to track progress
             f"File no. {request_count}: Downloaded Q{quarter} {year} EPA CEMS hourly emissions data."
         )
         return ResourceInfo(

From 0ddabae7e9e18fd3be5dcea0fb12b378a7471784 Mon Sep 17 00:00:00 2001
From: thinky
Date: Wed, 1 Nov 2023 18:06:59 -0400
Subject: [PATCH 05/11] Zip quarters by year

---
 src/pudl_archiver/archivers/classes.py | 40 +++++++++++-------
 src/pudl_archiver/archivers/epacems.py | 56 ++++++++++++++------------
 tests/unit/archive_base_test.py        | 17 ++++++--
 3 files changed, 69 insertions(+), 44 deletions(-)

diff --git a/src/pudl_archiver/archivers/classes.py b/src/pudl_archiver/archivers/classes.py
index d9927a13..0d8ddbda 100644
--- a/src/pudl_archiver/archivers/classes.py
+++ b/src/pudl_archiver/archivers/classes.py
@@ -154,31 +154,41 @@ async def download_file(
         elif isinstance(file, io.BytesIO):
             file.write(response_bytes)

-    async def download_and_zip_file(
+    async def download_and_zip_files(
         self,
-        url: str,
-        filename: str,
-        archive_path: Path,
+        files_dict: dict[str, dict[str, str | Path]],
         **kwargs,
     ):
         """Download and zip a file using async session manager.

         Args:
             url: URL to file to download.
-            filename: name of file to be zipped
-            archive_path: Local path to write file to disk.
+            files_dict: Dictionary of file parameters
             kwargs: Key word args to pass to request.
         """
-        response = await retry_async(self.session.get, args=[url], kwargs=kwargs)
-        response_bytes = await retry_async(response.read)
+        for key, file in files_dict.items():
+            # Useful to debug download time-outs.
+            if "quarter" in file.keys():
+                partition = "Q" + file["quarter"]
+            elif "month" in file.keys():
+                partition = "M" + file["month"]
+            else:
+                partition = ""
+            year = file["year"]
+            logger.debug(f"Downloading {partition} {year} data.")

-        # Write to zipfile
-        with zipfile.ZipFile(
-            archive_path,
-            "w",
-            compression=zipfile.ZIP_DEFLATED,
-        ) as archive:
-            archive.writestr(filename, response_bytes)
+            response = await retry_async(
+                self.session.get, args=[file["url"]], kwargs=kwargs
+            )
+            response_bytes = await retry_async(response.read)
+
+            # Write to zipfile
+            with zipfile.ZipFile(
+                file["archive_path"],
+                "w",
+                compression=zipfile.ZIP_DEFLATED,
+            ) as archive:
+                archive.writestr(file["filename"], response_bytes)

     async def get_hyperlinks(
         self,
diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index 07070317..a1459d56 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -44,43 +44,49 @@ async def get_resources(self) -> ArchiveAwaitable:
                 and (self.valid_year(file["metadata"]["year"]))
             ]
             logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
-            for i, cems_file in enumerate(quarterly_emissions_files):
-                yield self.get_quarter_year_resource(
-                    file=cems_file, request_count=i + 1
-                )  # Count files starting at 1 for human legibility.
+            # For each year, download all quarters and zip into one file.
+            years = list(
+                {file["metadata"]["year"] for file in quarterly_emissions_files}
+            )
+            for year in years:
+                cems_files = [
+                    file
+                    for file in quarterly_emissions_files
+                    if file["metadata"]["year"] == year
+                ]
+                yield self.get_year_resource(files=cems_files, year=year)
         else:
             raise AssertionError(
                 f"EPACEMS API request did not succeed: {file_list.status_code}"
             )

-    async def get_quarter_year_resource(
-        self, file: dict[str, str | dict[str, str]], request_count: int | None
+    async def get_year_resource(
+        self, files: list[dict[str, str | dict[str, str]]], year: int
     ) -> tuple[Path, dict]:
         """Download all available data for a single quarter in a year.

         Args:
-            file: a dictionary containing file characteristics from the EPA API.
-            See https://www.epa.gov/power-sector/cam-api-portal#/swagger/camd-services
+            files: a list of dictionaries containing file characteristics from the EPA
+            API. See
+            https://www.epa.gov/power-sector/cam-api-portal#/swagger/camd-services
             for expected format of dictionary.
             request_count: the number of the request.
+            year: year of data to download.
         """
-        url = self.base_url + file["s3Path"]
-        year = file["metadata"]["year"]
-        quarter = file["metadata"]["quarter"]
-
-        # Useful to debug at download time-outs.
-        logger.debug(f"Downloading Q{quarter} {year} EPACEMS data.")
+        logger.info(f"Downloading and zipping {year} CEMS data.")
+        files_dict = {}
+        archive_path = self.download_directory / f"epacems-{year}.zip"
+        for i, file in enumerate(files):
+            files_dict[i] = {}
+            quarter = file["metadata"]["quarter"]
+            files_dict[i]["url"] = self.base_url + file["s3Path"]
+            files_dict[i]["year"] = file["metadata"]["year"]
+            files_dict[i]["quarter"] = quarter
+            files_dict[i]["filename"] = f"epacems-{year}-{quarter}.csv"
+            files_dict[i]["archive_path"] = archive_path

         # Create zipfile to store year/quarter combinations of files
-        filename = f"epacems-{year}-{quarter}.csv"
-        archive_path = self.download_directory / f"epacems-{year}-{quarter}.zip"
         # Override the default asyncio timeout to 14 minutes, just under the API limit.
-        await self.download_and_zip_file(
-            url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
-        )
-        logger.info(  # Verbose but helpful to track progress
-            f"File no. {request_count}: Downloaded Q{quarter} {year} EPA CEMS hourly emissions data."
-        )
-        return ResourceInfo(
-            local_path=archive_path, partitions={"year": year, "quarter": quarter}
-        )
+        await self.download_and_zip_files(files_dict=files_dict, timeout=60 * 14)
+
+        return ResourceInfo(local_path=archive_path, partitions={"year": year})
diff --git a/tests/unit/archive_base_test.py b/tests/unit/archive_base_test.py
index aebf6edc..93093b0e 100644
--- a/tests/unit/archive_base_test.py
+++ b/tests/unit/archive_base_test.py
@@ -199,7 +199,7 @@ async def test_download_file(mocker, file_data):


 @pytest.mark.asyncio
-async def test_download_and_zip_file(mocker, file_data):
+async def test_download_and_zip_files(mocker, file_data):
     """Test download_and_zip_file.

     Tests that expected data is written to file on disk in a zipfile.
@@ -222,12 +222,21 @@ async def test_download_and_zip_files(mocker, file_data):
     with tempfile.TemporaryDirectory() as path:
         file_path = str(Path(path) / "test.csv")
         archive_path = str(Path(path) / "test.zip")
-
-        await archiver.download_and_zip_file(url, file_path, archive_path)
+        files_dict = {
+            1: {
+                "file_path": file_path,
+                "archive_path": archive_path,
+                "url": url,
+                "year": "1996",
+                "filename": "test.csv",
+            }
+        }
+
+        await archiver.download_and_zip_files(files_dict=files_dict)

         # Assert that the zipfile at archive_path contains a file at file_path
         session_mock.get.assert_called_once_with(url)
         with zipfile.ZipFile(archive_path) as zf:
-            zipped_file = zf.open(file_path)
+            zipped_file = zf.open("test.csv")
             assert zipped_file.read() == file_data

From 3414cf90d87ef08a7da1079275607359d629f424 Mon Sep 17 00:00:00 2001
From: e-belfer
Date: Wed, 22 Nov 2023 15:42:55 -0500
Subject: [PATCH 06/11] Revert "Zip quarters by year"

This reverts commit 0ddabae7e9e18fd3be5dcea0fb12b378a7471784.
---
 src/pudl_archiver/archivers/classes.py | 42 ++++++++----------
 src/pudl_archiver/archivers/epacems.py | 56 ++++++++++++--------------
 tests/unit/archive_base_test.py        | 17 ++------
 3 files changed, 45 insertions(+), 70 deletions(-)

diff --git a/src/pudl_archiver/archivers/classes.py b/src/pudl_archiver/archivers/classes.py
index aeedeceb..6e94bc10 100644
--- a/src/pudl_archiver/archivers/classes.py
+++ b/src/pudl_archiver/archivers/classes.py
@@ -154,41 +154,31 @@ async def download_file(
         elif isinstance(file, io.BytesIO):
             file.write(response_bytes)

-    async def download_and_zip_files(
+    async def download_and_zip_file(
         self,
-        files_dict: dict[str, dict[str, str | Path]],
+        url: str,
+        filename: str,
+        archive_path: Path,
         **kwargs,
     ):
         """Download and zip a file using async session manager.

         Args:
             url: URL to file to download.
-            files_dict: Dictionary of file parameters
+            filename: name of file to be zipped
+            archive_path: Local path to write file to disk.
             kwargs: Key word args to pass to request.
         """
-        for key, file in files_dict.items():
-            # Useful to debug download time-outs.
-            if "quarter" in file:
-                partition = "Q" + file["quarter"]
-            elif "month" in file:
-                partition = "M" + file["month"]
-            else:
-                partition = ""
-            year = file["year"]
-            logger.debug(f"Downloading {partition} {year} data.")
+        response = await retry_async(self.session.get, args=[url], kwargs=kwargs)
+        response_bytes = await retry_async(response.read)

-            response = await retry_async(
-                self.session.get, args=[file["url"]], kwargs=kwargs
-            )
-            response_bytes = await retry_async(response.read)
-
-            # Write to zipfile
-            with zipfile.ZipFile(
-                file["archive_path"],
-                "w",
-                compression=zipfile.ZIP_DEFLATED,
-            ) as archive:
-                archive.writestr(file["filename"], response_bytes)
+        # Write to zipfile
+        with zipfile.ZipFile(
+            archive_path,
+            "w",
+            compression=zipfile.ZIP_DEFLATED,
+        ) as archive:
+            archive.writestr(filename, response_bytes)

     async def get_hyperlinks(
         self,
diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index 3d3344d6..8442d1e5 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -44,49 +44,43 @@ async def get_resources(self) -> ArchiveAwaitable:
                 and (self.valid_year(file["metadata"]["year"]))
             ]
             logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
-            # For each year, download all quarters and zip into one file.
-            years = list(
-                {file["metadata"]["year"] for file in quarterly_emissions_files}
-            )
-            for year in years:
-                cems_files = [
-                    file
-                    for file in quarterly_emissions_files
-                    if file["metadata"]["year"] == year
-                ]
-                yield self.get_year_resource(files=cems_files, year=year)
+            for i, cems_file in enumerate(quarterly_emissions_files):
+                yield self.get_quarter_year_resource(
+                    file=cems_file, request_count=i + 1
+                )  # Count files starting at 1 for human legibility.
         else:
             raise AssertionError(
                 f"EPACEMS API request did not succeed: {file_list.status_code}"
             )

-    async def get_year_resource(
-        self, files: list[dict[str, str | dict[str, str]]], year: int
+    async def get_quarter_year_resource(
+        self, file: dict[str, str | dict[str, str]], request_count: int | None
     ) -> tuple[Path, dict]:
         """Download all available data for a single quarter in a year.

         Args:
-            files: a list of dictionaries containing file characteristics from the EPA
-            API. See
-            https://www.epa.gov/power-sector/cam-api-portal#/swagger/camd-services
+            file: a dictionary containing file characteristics from the EPA API.
+            See https://www.epa.gov/power-sector/cam-api-portal#/swagger/camd-services
             for expected format of dictionary.
             request_count: the number of the request.
-            year: year of data to download.
         """
-        logger.info(f"Downloading and zipping {year} CEMS data.")
-        files_dict = {}
-        archive_path = self.download_directory / f"epacems-{year}.zip"
-        for i, file in enumerate(files):
-            files_dict[i] = {}
-            quarter = file["metadata"]["quarter"]
-            files_dict[i]["url"] = self.base_url + file["s3Path"]
-            files_dict[i]["year"] = file["metadata"]["year"]
-            files_dict[i]["quarter"] = quarter
-            files_dict[i]["filename"] = f"epacems-{year}-{quarter}.csv"
-            files_dict[i]["archive_path"] = archive_path
+        url = self.base_url + file["s3Path"]
+        year = file["metadata"]["year"]
+        quarter = file["metadata"]["quarter"]
+
+        # Useful to debug at download time-outs.
+        logger.debug(f"Downloading Q{quarter} {year} EPACEMS data.")

         # Create zipfile to store year/quarter combinations of files
+        filename = f"epacems-{year}-{quarter}.csv"
+        archive_path = self.download_directory / f"epacems-{year}-{quarter}.zip"
         # Override the default asyncio timeout to 14 minutes, just under the API limit.
-        await self.download_and_zip_files(files_dict=files_dict, timeout=60 * 14)
-
-        return ResourceInfo(local_path=archive_path, partitions={"year": year})
+        await self.download_and_zip_file(
+            url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
+        )
+        logger.info(  # Verbose but helpful to track progress
+            f"File no. {request_count}: Downloaded Q{quarter} {year} EPA CEMS hourly emissions data."
+        )
+        return ResourceInfo(
+            local_path=archive_path, partitions={"year": year, "quarter": quarter}
+        )
diff --git a/tests/unit/archive_base_test.py b/tests/unit/archive_base_test.py
index 53252935..cef6ebc7 100644
--- a/tests/unit/archive_base_test.py
+++ b/tests/unit/archive_base_test.py
@@ -199,7 +199,7 @@ async def test_download_file(mocker, file_data):


 @pytest.mark.asyncio
-async def test_download_and_zip_files(mocker, file_data):
+async def test_download_and_zip_file(mocker, file_data):
     """Test download_and_zip_file.

     Tests that expected data is written to file on disk in a zipfile.
@@ -222,21 +222,12 @@ async def test_download_and_zip_files(mocker, file_data):
     with tempfile.TemporaryDirectory() as path:
         file_path = str(Path(path) / "test.csv")
         archive_path = str(Path(path) / "test.zip")
-        files_dict = {
-            1: {
-                "file_path": file_path,
-                "archive_path": archive_path,
-                "url": url,
-                "year": "1996",
-                "filename": "test.csv",
-            }
-        }
-
-        await archiver.download_and_zip_files(files_dict=files_dict)
+
+        await archiver.download_and_zip_file(url, file_path, archive_path)

         # Assert that the zipfile at archive_path contains a file at file_path
         session_mock.get.assert_called_once_with(url)
         with zipfile.ZipFile(archive_path) as zf:
-            zipped_file = zf.open("test.csv")
+            zipped_file = zf.open(file_path)
             assert zipped_file.read() == file_data

From 9669bc419b98a7b17f6277c3d21434b9daa98624 Mon Sep 17 00:00:00 2001
From: "e.belfer"
Date: Thu, 23 Nov 2023 18:43:07 +0000
Subject: [PATCH 07/11] Reduce concurrency to prevent memory overload

---
 src/pudl_archiver/archivers/epacems.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index 8442d1e5..efe2c5ca 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -19,7 +19,7 @@ class EpaCemsArchiver(AbstractDatasetArchiver):
     """EPA CEMS archiver."""

     name = "epacems"
-    concurrency_limit = 5  # Number of files to concurrently download
+    concurrency_limit = 2 # Number of files to concurrently download

     base_url = "https://api.epa.gov/easey/bulk-files/"
     parameters = {"api_key": os.environ["EPACEMS_API_KEY"]}  # Set to API key

From a9e341545ac000c1e8c32c9c203041be834bda60 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Thu, 23 Nov 2023 18:50:31 +0000
Subject: [PATCH 08/11] [pre-commit.ci] auto fixes from pre-commit.com hooks

For more information, see https://pre-commit.ci
---
 src/pudl_archiver/archivers/epacems.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index efe2c5ca..d4e2a0f4 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -19,7 +19,7 @@ class EpaCemsArchiver(AbstractDatasetArchiver):
     """EPA CEMS archiver."""

     name = "epacems"
-    concurrency_limit = 2 # Number of files to concurrently download
+    concurrency_limit = 2  # Number of files to concurrently download

     base_url = "https://api.epa.gov/easey/bulk-files/"
     parameters = {"api_key": os.environ["EPACEMS_API_KEY"]}  # Set to API key

From 64428e493257a09c9649b9952e8782bb271a7f32 Mon Sep 17 00:00:00 2001
From: e-belfer
Date: Fri, 24 Nov 2023 10:01:03 -0500
Subject: [PATCH 09/11] Standardize to year-quarter

---
 src/pudl_archiver/archivers/epacems.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index d4e2a0f4..92d9fcd2 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -45,7 +45,7 @@ async def get_resources(self) -> ArchiveAwaitable:
             ]
             logger.info(f"Downloading {len(quarterly_emissions_files)} total files.")
             for i, cems_file in enumerate(quarterly_emissions_files):
-                yield self.get_quarter_year_resource(
+                yield self.get_year_quarter_resource(
                     file=cems_file, request_count=i + 1
                 )  # Count files starting at 1 for human legibility.
        else:
@@ -53,7 +53,7 @@
             f"EPACEMS API request did not succeed: {file_list.status_code}"
         )

-    async def get_quarter_year_resource(
+    async def get_year_quarter_resource(
         self, file: dict[str, str | dict[str, str]], request_count: int | None
     ) -> tuple[Path, dict]:
         """Download all available data for a single quarter in a year.
@@ -69,7 +69,7 @@ async def get_year_quarter_resource(
         quarter = file["metadata"]["quarter"]

         # Useful to debug at download time-outs.
-        logger.debug(f"Downloading Q{quarter} {year} EPACEMS data.")
+        logger.debug(f"Downloading {year} Q{quarter} EPACEMS data.")

         # Create zipfile to store year/quarter combinations of files
         filename = f"epacems-{year}-{quarter}.csv"

From ff1fcb0f8f06b0c963cde1cdd6234e0a43779677 Mon Sep 17 00:00:00 2001
From: e-belfer
Date: Fri, 24 Nov 2023 10:17:13 -0500
Subject: [PATCH 10/11] Correct year-quarter for missed logging statement

---
 src/pudl_archiver/archivers/epacems.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pudl_archiver/archivers/epacems.py b/src/pudl_archiver/archivers/epacems.py
index 92d9fcd2..81c931a7 100644
--- a/src/pudl_archiver/archivers/epacems.py
+++ b/src/pudl_archiver/archivers/epacems.py
@@ -79,7 +79,7 @@ async def get_year_quarter_resource(
             url=url, filename=filename, archive_path=archive_path, timeout=60 * 14
         )
         logger.info(  # Verbose but helpful to track progress
-            f"File no. {request_count}: Downloaded Q{quarter} {year} EPA CEMS hourly emissions data."
+            f"File no. {request_count}: Downloaded {year} Q{quarter} EPA CEMS hourly emissions data."
         )
         return ResourceInfo(
             local_path=archive_path, partitions={"year": year, "quarter": quarter}
         )

From e7d2ffce44231ca76b0702a1b0bf99e2d9a52128 Mon Sep 17 00:00:00 2001
From: e-belfer
Date: Fri, 24 Nov 2023 12:43:57 -0500
Subject: [PATCH 11/11] Update sandbox concept DOI

---
 dataset_doi.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dataset_doi.yaml b/dataset_doi.yaml
index 3cd67554..dfdd73b1 100644
--- a/dataset_doi.yaml
+++ b/dataset_doi.yaml
@@ -27,7 +27,7 @@ epacamd_eia:
   sandbox_doi: 10.5072/zenodo.1072000
 epacems:
   production_doi: 10.5281/zenodo.4127054
-  sandbox_doi: 10.5072/zenodo.1228518
+  sandbox_doi: 10.5072/zenodo.2236
 ferc1:
   production_doi: 10.5281/zenodo.4127043
   sandbox_doi: 10.5072/zenodo.1114564
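
A note on patches 02 and 03: a full quarter of hourly CEMS data can exceed the 4 GB ZIP64 threshold, which is presumably what the original streaming archive.open() call choked on. Patch 02 keeps the streaming writer but passes force_zip64=True, which emits ZIP64-capable headers up front for a member whose final size is not known; patch 03 drops the streaming writer for ZipFile.writestr(), which already holds the whole payload in memory and so handles ZIP64 bookkeeping on its own. A minimal standalone sketch of the two approaches, using hypothetical function names rather than the archiver's own API:

import zipfile


def zip_bytes_stream(archive_path: str, member: str, payload: bytes) -> None:
    """Patch 02's approach: stream the member through ZipFile.open().

    force_zip64=True is required when a member may exceed 4 GB and its
    size is not known when the member's header is written.
    """
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        with archive.open(member, "w", force_zip64=True) as f_disk:
            f_disk.write(payload)


def zip_bytes_whole(archive_path: str, member: str, payload: bytes) -> None:
    """Patch 03's approach: write the member in a single writestr() call.

    Because the full payload is already in memory, writestr() knows the
    size in advance and emits ZIP64 records itself when they are needed.
    """
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr(member, payload)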
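For orientation, the end state after patches 09 and 10 is one download per year-quarter, zipped and returned with {"year": year, "quarter": quarter} partitions. The sketch below compresses that flow into standalone functions and is illustrative only: requests stands in for the archiver's async aiohttp session, the helper names are hypothetical, and EPACEMS_API_KEY is assumed to be set in the environment, as in the archiver itself.

import os
import zipfile

import requests  # stand-in for the archiver's async session

BASE_URL = "https://api.epa.gov/easey/bulk-files/"
API_PARAMS = {"api_key": os.environ["EPACEMS_API_KEY"]}


def year_quarter_files(bulk_files: list[dict]) -> list[dict]:
    """Keep only hourly emissions files that carry a quarter partition."""
    return [
        f
        for f in bulk_files
        if f["metadata"]["dataType"] == "Emissions"
        and f["metadata"]["dataSubType"] == "Hourly"
        and "quarter" in f["metadata"]
    ]


def download_year_quarter(file: dict, download_dir: str) -> dict:
    """Download one year-quarter CSV and wrap it in its own zipfile."""
    year = file["metadata"]["year"]
    quarter = file["metadata"]["quarter"]
    archive_path = os.path.join(download_dir, f"epacems-{year}-{quarter}.zip")
    # The 14-minute timeout mirrors the archiver's override, just under the API limit.
    response = requests.get(
        BASE_URL + file["s3Path"], params=API_PARAMS, timeout=60 * 14
    )
    response.raise_for_status()
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        # writestr() handles ZIP64 for quarters that decompress past 4 GB.
        zf.writestr(f"epacems-{year}-{quarter}.csv", response.content)
    return {"local_path": archive_path, "partitions": {"year": year, "quarter": quarter}}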