Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A fancier way of dealing with mixed Zenodo DOIs. #2799

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 107 additions & 87 deletions src/pudl/workspace/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from collections import defaultdict
from collections.abc import Iterator
from pathlib import Path
from typing import Any
from typing import Any, Self

import datapackage
import requests
from google.auth.exceptions import DefaultCredentialsError
from pydantic import BaseModel, HttpUrl, constr
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

Expand Down Expand Up @@ -101,10 +102,10 @@ def get_resources(
"""Returns series of PudlResourceKey identifiers for matching resources.

Args:
name (str): if specified, find resource(s) with this name.
filters (dict): if specified, find resoure(s) matching these key=value constraints.
The constraints are matched against the 'parts' field of the resource
entry in the datapackage.json.
name: if specified, find resource(s) with this name.
filters: if specified, find resource(s) matching these key=value constraints.
The constraints are matched against the 'parts' field of the resource
entry in the datapackage.json.
"""
for res in self.datapackage_json["resources"]:
if name and res["name"] != name:
Expand Down Expand Up @@ -154,77 +155,109 @@ def get_json_string(self) -> str:
return json.dumps(self.datapackage_json, sort_keys=True, indent=4)


class ZenodoFetcher:
"""API for fetching datapackage descriptors and resource contents from zenodo."""
class ZenodoDoi(BaseModel):
"""A class defining useful validations and methods for working with Zenodo DOIs."""

doi: constr(regex=r"^10\.(5072|5281)/zenodo\.[\d]+$") # noqa: F722

def __str__(self: Self) -> str:
"""A string representation of the DOI."""
return self.doi

@property
def is_prod(self: Self) -> bool:
"""Return True if DOI is from Zenodo production server, False otherwise."""
if self.doi.startswith("10.5281/zenodo"):
return True
else:
assert self.doi.startswith("10.5072/zenodo")
return False

@property
def token(self: Self) -> str:
"""Zenodo read-only personal access token corresponding to this DOI.

# Zenodo tokens recorded here should have read-only access to our archives.
# Including them here is correct in order to allow public use of this tool, so
# long as we stick to read-only keys.
TOKEN = {
Zenodo tokens recorded here should have read-only access to our archives.
Including them here is correct in order to allow public use of this tool, so
long as we stick to read-only keys.
"""
# Read-only personal access tokens for [email protected]:
"sandbox": "qyPC29wGPaflUUVAv1oGw99ytwBqwEEdwi4NuUrpwc3xUcEwbmuB4emwysco",
"production": "KXcG5s9TqeuPh1Ukt5QYbzhCElp9LxuqAuiwdqHP0WS4qGIQiydHn6FBtdJ5",
}
if self.is_prod:
return "KXcG5s9TqeuPh1Ukt5QYbzhCElp9LxuqAuiwdqHP0WS4qGIQiydHn6FBtdJ5"
else:
return "qyPC29wGPaflUUVAv1oGw99ytwBqwEEdwi4NuUrpwc3xUcEwbmuB4emwysco"

@property
def zenodo_id(self: Self) -> str:
"""The Zenodo deposition ID, extracted from the DOI."""
match = re.search(r"(10\.5072|10\.5281)/zenodo.([\d]+)", self.doi)
return match.groups()[1]

@property
def api_root(self: Self) -> HttpUrl:
"""Return appropriate production or sandbox Zenodo API root URL."""
if self.is_prod:
return "https://zenodo.org/api"
else:
return "https://sandbox.zenodo.org/api"

@property
def url(self: Self) -> HttpUrl:
"""Zenodo URL corresponding to this DOI."""
return f"{self.api_root}/deposit/depositions/{self.zenodo_id}"


DOI = {
class ZenodoFetcher:
"""API for fetching datapackage descriptors and resource contents from zenodo."""

dois: dict[str, ZenodoDoi] = {
# Sandbox DOIs are provided for reference
"censusdp1tract": "10.5281/zenodo.4127049",
# "censusdp1tract": "10.5072/zenodo.674992",
"eia860": "10.5281/zenodo.8164776",
# "eia860": "10.5072/zenodo.1222854",
"eia860m": "10.5281/zenodo.8188017",
# "eia860m": "10.5072/zenodo.1225517",
"eia861": "10.5281/zenodo.8231268",
# "eia861": "10.5072/zenodo.1229930",
"eia923": "10.5281/zenodo.8172818",
# "eia923": "10.5072/zenodo.1217724",
"eia_bulk_elec": "10.5281/zenodo.7067367",
# "eia_bulk_elec": "10.5072/zenodo.1103572",
"epacamd_eia": "10.5281/zenodo.7900974",
# "epacamd_eia": "10.5072/zenodo.1199170",
"epacems": "10.5281/zenodo.6910058",
# "epacems": "10.5072/zenodo.672963",
"ferc1": "10.5281/zenodo.7314437",
# "ferc1": "10.5072/zenodo.1070868",
"ferc2": "10.5281/zenodo.8006881",
# "ferc2": "10.5072/zenodo.1188447",
"ferc6": "10.5281/zenodo.7130141",
# "ferc6": "10.5072/zenodo.1098088",
"ferc60": "10.5281/zenodo.7130146",
# "ferc60": "10.5072/zenodo.1098089",
"ferc714": "10.5281/zenodo.7139875",
# "ferc714": "10.5072/zenodo.1098302",
}
API_ROOT = {
"sandbox": "https://sandbox.zenodo.org/api",
"production": "https://zenodo.org/api",
"censusdp1tract": ZenodoDoi(doi="10.5281/zenodo.4127049"),
# "censusdp1tract": ZenodoDoi(doi="10.5072/zenodo.674992"),
"eia860": ZenodoDoi(doi="10.5281/zenodo.8164776"),
# "eia860": ZenodoDoi(doi="10.5072/zenodo.1222854"),
"eia860m": ZenodoDoi(doi="10.5281/zenodo.8188017"),
# "eia860m": ZenodoDoi(doi="10.5072/zenodo.1225517"),
"eia861": ZenodoDoi(doi="10.5281/zenodo.8231268"),
# "eia861": ZenodoDoi(doi="10.5072/zenodo.1229930"),
"eia923": ZenodoDoi(doi="10.5281/zenodo.8172818"),
# "eia923": ZenodoDoi(doi="10.5072/zenodo.1217724"),
"eia_bulk_elec": ZenodoDoi(doi="10.5281/zenodo.7067367"),
# "eia_bulk_elec": ZenodoDoi(doi="10.5072/zenodo.1103572"),
"epacamd_eia": ZenodoDoi(doi="10.5281/zenodo.7900974"),
# "epacamd_eia": ZenodoDoi(doi="10.5072/zenodo.1199170"),
"epacems": ZenodoDoi(doi="10.5281/zenodo.6910058"),
# "epacems": ZenodoDoi(doi="10.5072/zenodo.672963"),
"ferc1": ZenodoDoi(doi="10.5281/zenodo.7314437"),
# "ferc1": ZenodoDoi(doi="10.5072/zenodo.1070868"),
"ferc2": ZenodoDoi(doi="10.5281/zenodo.8006881"),
# "ferc2": ZenodoDoi(doi="10.5072/zenodo.1188447"),
"ferc6": ZenodoDoi(doi="10.5281/zenodo.7130141"),
# "ferc6": ZenodoDoi(doi="10.5072/zenodo.1098088"),
"ferc60": ZenodoDoi(doi="10.5281/zenodo.7130146"),
# "ferc60": ZenodoDoi(doi="10.5072/zenodo.1098089"),
"ferc714": ZenodoDoi(doi="10.5281/zenodo.7139875"),
# "ferc714": ZenodoDoi(doi="10.5072/zenodo.1098302"),
}

def __init__(self, timeout: float = 15.0):
"""Constructs ZenodoFetcher instance.

Args:
timeout (float): timeout (in seconds) for http requests.
timeout: timeout (in seconds) for http requests.
"""
self._dataset_to_doi = self.DOI
self._descriptor_cache: dict[str, DatapackageDescriptor] = {}

self.timeout = timeout
retries = Retry(
backoff_factor=2, total=3, status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retries)

self.http = requests.Session()
self.http.mount("http://", adapter)
self.http.mount("https://", adapter)

def _fetch_from_url(self, url: str) -> requests.Response:
def _fetch_from_url(self: Self, url: str, token: str) -> requests.Response:
logger.info(f"Retrieving {url} from zenodo")
if "sandbox" in url:
token = self.TOKEN["sandbox"]
else:
token = self.TOKEN["production"]
response = self.http.get(
url, params={"access_token": token}, timeout=self.timeout
)
Expand All @@ -234,62 +267,49 @@ def _fetch_from_url(self, url: str) -> requests.Response:
else:
raise ValueError(f"Could not download {url}: {response.text}")

def _doi_to_url(self, doi: str) -> str:
"""Returns url that holds the datapackage for given doi."""
match = re.search(r"(10\.5072|10\.5281)/zenodo.([\d]+)", doi)

if match is None:
raise ValueError(f"Invalid Zenodo DOI: {doi}")

doi_prefix = match.groups()[0]
zenodo_id = match.groups()[1]
if doi_prefix == "10.5072":
api_root = self.API_ROOT["sandbox"]
elif doi_prefix == "10.5281":
api_root = self.API_ROOT["production"]
else:
raise ValueError(f"Invalid Zenodo DOI: {doi}")
return f"{api_root}/deposit/depositions/{zenodo_id}"

def get_descriptor(self, dataset: str) -> DatapackageDescriptor:
"""Returns class:`DatapackageDescriptor` for given dataset."""
doi = self._dataset_to_doi.get(dataset, False)
"""Returns :class:`DatapackageDescriptor` for given dataset."""
doi = self.dois.get(dataset, False)
if not doi:
raise KeyError(f"No doi found for dataset {dataset}")
if doi not in self._descriptor_cache:
dpkg = self._fetch_from_url(self._doi_to_url(doi))
raise KeyError(f"No DOI found for dataset {dataset}")
if doi.doi not in self._descriptor_cache:
dpkg = self._fetch_from_url(url=doi.url, token=doi.token)
for f in dpkg.json()["files"]:
if f["filename"] == "datapackage.json":
resp = self._fetch_from_url(f["links"]["download"])
self._descriptor_cache[doi] = DatapackageDescriptor(
resp.json(), dataset=dataset, doi=doi
resp = self._fetch_from_url(
url=f["links"]["download"], token=doi.token
)
self._descriptor_cache[doi.doi] = DatapackageDescriptor(
resp.json(), dataset=dataset, doi=doi.doi
)
break
else:
raise RuntimeError(
f"Zenodo datapackage for {dataset}/{doi} does not contain valid datapackage.json"
)
return self._descriptor_cache[doi]
return self._descriptor_cache[doi.doi]

def get_resource_key(self, dataset: str, name: str) -> PudlResourceKey:
"""Returns PudlResourceKey for given resource."""
return PudlResourceKey(dataset, self._dataset_to_doi[dataset], name)
return PudlResourceKey(dataset, self.dois[dataset].doi, name)

def get_doi(self, dataset: str) -> str:
def get_doi(self, dataset: str) -> ZenodoDoi:
"""Returns DOI for given dataset."""
return self._dataset_to_doi[dataset]
return self.dois[dataset]

def get_resource(self, res: PudlResourceKey) -> bytes:
"""Given resource key, retrieve contents of the file from zenodo."""
desc = self.get_descriptor(res.dataset)
url = desc.get_resource_path(res.name)
content = self._fetch_from_url(url).content
content = self._fetch_from_url(
url=url, token=ZenodoDoi(doi=res.doi).token
).content
desc.validate_checksum(res.name, content)
return content

def get_known_datasets(self) -> list[str]:
"""Returns list of supported datasets."""
return sorted(self._dataset_to_doi)
return sorted(self.dois)


class Datastore:
Expand Down Expand Up @@ -340,7 +360,7 @@ def get_known_datasets(self) -> list[str]:

def get_datapackage_descriptor(self, dataset: str) -> DatapackageDescriptor:
"""Fetch datapackage descriptor for dataset either from cache or Zenodo."""
doi = self._zenodo_fetcher.get_doi(dataset)
doi = self._zenodo_fetcher.get_doi(dataset).doi
if doi not in self._datapackage_descriptors:
res = PudlResourceKey(dataset, doi, "datapackage.json")
if self._cache.contains(res):
Expand Down Expand Up @@ -442,7 +462,7 @@ def __call__(self, parser, namespace, values, option_string=None):

def parse_command_line():
"""Collect the command line arguments."""
dois = "\n".join([f" - {x}" for x in ZenodoFetcher.DOI])
dois = "\n".join([f" - {x}" for x in ZenodoFetcher.dois])

dataset_msg = f"""
Available Production Datasets:
Expand Down
12 changes: 6 additions & 6 deletions test/unit/workspace/datastore_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,18 @@ def test_doi_format_is_correct(self):
for dataset in ds.get_known_datasets():
doi = ds.get_doi(dataset)
self.assertFalse(
re.fullmatch(r"10\.5072/zenodo\.[0-9]{5,10}", doi),
msg=f"Zenodo sandbox DOI found for {dataset}: {doi}",
re.fullmatch(r"10\.5072/zenodo\.[0-9]{5,10}", doi.doi),
msg=f"Zenodo sandbox DOI found for {dataset}: {doi.doi}",
)
self.assertTrue(
re.fullmatch(r"10\.5281/zenodo\.[0-9]{5,10}", doi),
msg=f"Zenodo production DOI for {dataset} is {doi}",
re.fullmatch(r"10\.5281/zenodo\.[0-9]{5,10}", doi.doi),
msg=f"Zenodo production DOI for {dataset} is {doi.doi}",
)

def test_get_known_datasets(self):
"""Call to get_known_datasets() produces the expected results."""
self.assertEqual(
sorted(datastore.ZenodoFetcher.DOI),
sorted(datastore.ZenodoFetcher.dois),
self.fetcher.get_known_datasets(),
)

Expand All @@ -267,7 +267,7 @@ def test_doi_of_prod_epacems_matches(self):

This test verifies that the expected value is in use.
"""
self.assertEqual(self.PROD_EPACEMS_DOI, self.fetcher.get_doi("epacems"))
self.assertEqual(self.PROD_EPACEMS_DOI, self.fetcher.get_doi("epacems").doi)

@responses.activate
def test_get_descriptor_http_calls(self):
Expand Down