diff --git a/README.md b/README.md
index 57ffe15..f41b50e 100644
--- a/README.md
+++ b/README.md
@@ -4,3 +4,10 @@
 This tool uses the [PDAP API](https://docs.pdap.io/api/endpoints/data-sources-da
 Then, it uses the PDAP API to update the Data Sources' `last_cached` and `url_status` properties.
 The script is set up to run with a GitHub Actions workflow.
+
+Requires the following environment variables to be set:
+
+```text
+VUE_APP_PDAP_API_KEY=
+VITE_VUE_APP_BASE_URL=
+```
\ No newline at end of file
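A note on the two variables above: `cache_url.py` (diffed below) reads them with `os.getenv`, and a missing `VUE_APP_PDAP_API_KEY` only surfaces when the script builds `API_KEY` at import time. A minimal, illustrative fail-fast guard, not part of this diff, could look like:

```python
# Hypothetical pre-flight check; the variable names come from the README above.
import os
import sys

REQUIRED_ENV_VARS = ("VUE_APP_PDAP_API_KEY", "VITE_VUE_APP_BASE_URL")

missing = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
if missing:
    sys.exit(f"Missing required environment variables: {', '.join(missing)}")
```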
diff --git a/cache_url.py b/cache_url.py
index 44c4336..5d82c2f 100644
--- a/cache_url.py
+++ b/cache_url.py
@@ -1,11 +1,32 @@
 import json
 from datetime import datetime, timedelta
-from concurrent.futures import as_completed, ThreadPoolExecutor
+from dataclasses import dataclass
 from tqdm import tqdm
 import requests
 import os
 import time
+
+# How long to wait in between archive requests, in seconds
+# Too many requests will result in the IP being temporarily blocked: https://archive.org/details/toomanyrequests_20191110
+ARCHIVE_WAIT_TIME = 7
+
+
+class ArchiveLastCacheNotFoundError(Exception):
+    pass
+
+
+@dataclass
+class ArchiveEntry:
+    url: str
+    last_archived: datetime
+    broken_source_url_as_of: datetime | None
+    source_url: str
+    update_delta: timedelta
+
+    @classmethod
+    def from_dict(cls, dict_entry: dict):
+        return cls(**dict_entry)
+
+
 API_KEY = "Bearer " + os.getenv("VUE_APP_PDAP_API_KEY")
 UPDATE_FREQUENCY_MAPPING = {
     "Incident-based": 7,
@@ -24,113 +45,183 @@
 }
 
 
-def cache_url(entry):
+def archive_url(entry: dict):
+    """
+    Archive the entry's source URL with the Internet Archive,
+    then send the updated entry to the PDAP archives.
+    :param entry: archive entry from the PDAP /archives endpoint
+    :return:
+    """
     entry["broken_source_url_as_of"] = None
     source_url = entry.get("source_url")
-    if source_url is None:
-        entry["broken_source_url_as_of"] = datetime.now().strftime("%Y-%m-%d")
-        # try:
-        entry_json = json.dumps(entry)
-        requests.put(
-            f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
-            json=entry_json,
-            headers={"Authorization": API_KEY},
-        )
-        raise Exception("No source_url")
-        # except Exception as error:
-        #     print(str(error))
-        #     exceptions.append({
-        #         "source_url": source_url,
-        #         "exception": str(error)})
-
-    update_delta = (
-        UPDATE_FREQUENCY_MAPPING[entry.get("update_frequency")]
-        if entry.get("update_frequency") is not None
-        else None
-    )
-    if update_delta is None:
-        update_delta = datetime.max - datetime.today()
-    else:
-        update_delta = timedelta(days=int(update_delta))
-
-    last_cached = entry.get("last_cached")
-    if last_cached is not None:
-        last_cached = datetime.strptime(last_cached, "%Y-%m-%d")
-    else:
-        # Check if website exists in archive and compare archived website to current site
-        last_cached = datetime.min
-        website_info_data = None
-        try:
-            website_info = requests.get(
-                f"https://archive.org/wayback/available?url={source_url}"
-            )
-            website_info_data = website_info.json()
-            if website_info_data["archived_snapshots"]:
-                website_info_data_last_cached = datetime.strptime(
-                    website_info_data["archived_snapshots"]["closest"]["timestamp"],
-                    "%Y%m%d%H%M%S",
-                )
-                if website_info_data_last_cached > last_cached:
-                    last_cached = website_info_data_last_cached
-        except Exception as error:
-            # print(str(error))
-            website_info_data = {"archived_snapshots": None}
-    # Cache if never cached or more than update_delta days have passed since last_cache
-    if last_cached + update_delta < datetime.today():
+    try:
+        wait_then_post(entry, source_url, ARCHIVE_WAIT_TIME)
+    except Exception as error:
         try:
-            time.sleep(1)
-            api_url = f"http://web.archive.org/save/{source_url}"
-            requests.post(api_url)
-            # Update the last_cached date if cache is successful
-            entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")
+            wait_then_post(entry, source_url, 10)
         except Exception as error:
-            try:
-                time.sleep(3)
-                requests.post(api_url)
-                # Update the last_cached date if cache is successful
-                entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")
-            except:
-                print(str(error))
-            # exceptions.append({
-            #     "source_url": source_url,
-            #     "exception": str(error)})
-    else:
-        entry["last_cached"] = last_cached.strftime("%Y-%m-%d")
-
+            print(str(error))
     # Send updated data to Data Sources
+    update_pdap_archives(entry)
+
+
+def wait_then_post(entry: dict, source_url: str, wait_time: int):
+    """
+    Wait, then post the source URL to the Internet Archive's save endpoint.
+    :param entry: archive entry; its last_cached date is updated on success
+    :param source_url: the URL to archive
+    :param wait_time: the number of seconds to wait before posting
+    :return:
+    """
+    api_url = f"http://web.archive.org/save/{source_url}"
+    time.sleep(wait_time)
+    requests.post(api_url)
+    # Update the last_cached date if cache is successful
+    entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")
+
+
+def handle_missing_source_url(entry: dict):
+    """
+    Record when the source URL was found to be missing,
+    update the PDAP archives, and raise an exception
+    :param entry: archive entry with a missing source URL
+    :return:
+    """
+    entry["broken_source_url_as_of"] = datetime.now().strftime("%Y-%m-%d")
+    update_pdap_archives(entry)
+    raise Exception("No source_url")
+
+
+def update_pdap_archives(entry: dict):
+    """
+    Update the entry's data in the PDAP archives
+    :param entry: archive entry to send to the /archives endpoint
+    :return:
+    """
     entry_json = json.dumps(entry)
-    requests.put(
+    response = requests.put(
         f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
         json=entry_json,
         headers={"Authorization": API_KEY},
     )
+    response.raise_for_status()
+
+
+def get_update_delta(update_frequency: str | None) -> timedelta:
+    """
+    Calculate the update delta based on the entry's update frequency
+    :param update_frequency: the entry's update frequency, if any
+    :return: the time allowed between archivals
+    """
+    try:
+        update_delta = UPDATE_FREQUENCY_MAPPING[update_frequency]
+    except KeyError:
+        return datetime.max - datetime.today()
+    if update_delta is None:
+        return datetime.max - datetime.today()
+    return timedelta(days=int(update_delta))
+
+
+def get_website_info_data_last_cached(source_url) -> datetime:
+    """
+    Get the timestamp of the closest Wayback Machine snapshot for the URL
+    """
+    website_info_data = get_website_info_data(source_url)
+    if not website_info_data["archived_snapshots"]:
+        raise ArchiveLastCacheNotFoundError
+    return datetime.strptime(
+        website_info_data["archived_snapshots"]["closest"]["timestamp"],
+        "%Y%m%d%H%M%S",
+    )
+
+
+def get_last_archived(last_archived: str | None, source_url: str) -> datetime:
+    """
+    Get the last archived date of the website from the Internet Archive.
+    :param last_archived: the entry's last_cached date string, if any
+    :param source_url: the URL to look up in the Wayback Machine
+    :return:
+    """
+    if last_archived is not None:
+        try:
+            return datetime.strptime(last_archived, "%Y-%m-%d")
+        except ValueError:
+            return datetime.min
+    # Check if website exists in archive and compare archived website to current site
+    last_archived = datetime.min
+    try:
+        website_info_data_last_cached = get_website_info_data_last_cached(source_url)
+    except ArchiveLastCacheNotFoundError:
+        return last_archived
+    if website_info_data_last_cached > last_archived:
+        return website_info_data_last_cached
+    return last_archived
+
+
+def get_website_info_data(source_url):
+    """
+    Query the Wayback Machine availability API for the URL
+    """
+    website_info = requests.get(
        f"https://archive.org/wayback/available?url={source_url}"
+    )
+    website_info_data = website_info.json()
+    return website_info_data
 
 
 def main():
+    data = get_from_archives()
+    extract_url_info_and_archived_if_needed(data)
+
+
+def extract_url_info_and_archived_if_needed(data: list[dict]):
+    """
+    Flag entries with missing source URLs, then archive the remaining
+    entries whose update delta has elapsed.
+    :param data: archive entries from the PDAP /archives endpoint
+    :return:
+    """
+    # Create a tuple of entries with missing source URLs
+    missing_source_url_entries = tuple(filter(missing_source_url, data))
+
+    # Handle entries with missing source URLs
+    print("Handling missing source urls")
+    for entry in tqdm(missing_source_url_entries):
+        handle_missing_source_url(entry)
+
+    print("\nFinding entries that need updates")
+    non_missing_source_url_entries = tuple(
+        filter(lambda e: not missing_source_url(e), data)
+    )
+    entries_needing_updates = []
+    for entry in tqdm(non_missing_source_url_entries):
+        if needs_updated(entry):
+            entries_needing_updates.append(entry)
+
+    print(f"Updating {len(entries_needing_updates)} entries that need updates")
+    # Handle entries that need to be updated
+    for entry in tqdm(entries_needing_updates):
+        try:
+            archive_url(entry)
+        except Exception as error:
+            print(str(error))
+
+
+def missing_source_url(entry: dict) -> bool:
+    return entry["source_url"] is None
+
+
+def needs_updated(entry: dict) -> bool:
+    """
+    Check if the entry needs to be updated
+    :param entry: archive entry to check
+    :return:
+    """
+    last_archived = get_last_archived(entry["last_cached"], entry["source_url"])
+    update_delta = get_update_delta(entry["update_frequency"])
+    return last_archived + update_delta < datetime.now()
+
+
+def get_from_archives() -> list[dict]:
+    """
+    Get data from the PDAP archives.
+    :return: list of archive entries
+    """
     response = requests.get(
         f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
         headers={"Authorization": API_KEY},
     )
-    data = response.json()
-
-    # Extract url info and cache if needed
-    exceptions = []
-    if data is not str:
-        with ThreadPoolExecutor(max_workers=100) as executor:
-            print("Caching urls...")
-            future_cache = [executor.submit(cache_url, entry) for entry in data]
-
-            for future in tqdm(as_completed(future_cache), total=len(future_cache)):
-                future.result()
-
-    # Write any exceptions to a daily error log
-    # file_name = "ErrorLogs/" + datetime.now().strftime("%Y-%m-%d") + "_errorlog.txt"
-    # with open(file_name, "w") as error_log:
-    #     if len(exceptions) > 0:
-    #         json.dump(exceptions, fp=error_log, indent=4)
-    #     else:
-    #         error_log.write("no exceptions thrown")
+    response.raise_for_status()
+    return response.json()
 
 
 if __name__ == "__main__":
     main()
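One note on the diff above: the new `ArchiveEntry` dataclass is defined but not yet wired into the functions, which still pass plain dicts around. A minimal sketch of how its `from_dict` helper could be used (field values below are hypothetical):

```python
import os

# cache_url.py builds API_KEY at import time, so the variable must exist first.
os.environ.setdefault("VUE_APP_PDAP_API_KEY", "example-key")

from datetime import datetime, timedelta

from cache_url import ArchiveEntry

# Keys must match the dataclass fields exactly; from_dict simply unpacks the dict.
entry = ArchiveEntry.from_dict({
    "url": "https://web.archive.org/web/20240501000000/https://example.com",
    "last_archived": datetime(2024, 5, 1),
    "broken_source_url_as_of": None,
    "source_url": "https://example.com",
    "update_delta": timedelta(days=7),
})
print(entry.source_url)
```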
diff --git a/requirements.txt b/requirements.txt
index 15378a5..b483a61 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,3 +5,5 @@ idna==3.4
 requests==2.28.2
 savepagenow==1.2.3
 urllib3==1.26.15
+pytest==8.2.1
+tqdm==4.66.4
\ No newline at end of file
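Since `pytest` is added above without any tests in this diff, here is a hypothetical sketch (a module name like `test_cache_url.py` is assumed) of how the pure helper `get_update_delta` might be exercised:

```python
import os

# cache_url.py concatenates this variable into API_KEY at import time,
# so it has to exist before the import below.
os.environ.setdefault("VUE_APP_PDAP_API_KEY", "test-key")

from datetime import timedelta

from cache_url import get_update_delta


def test_known_frequency_maps_to_days():
    # "Incident-based" maps to 7 in UPDATE_FREQUENCY_MAPPING.
    assert get_update_delta("Incident-based") == timedelta(days=7)


def test_unknown_frequency_defers_indefinitely():
    # A frequency missing from the mapping yields an effectively infinite delta.
    assert get_update_delta(None) > timedelta(days=365 * 100)
```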