Mc fix archives bug #19

Merged 3 commits on Jun 25, 2024
7 changes: 7 additions & 0 deletions README.md
@@ -4,3 +4,10 @@ This tool uses the [PDAP API](https://docs.pdap.io/api/endpoints/data-sources-da
Then, it uses the PDAP API to update the Data Sources' `last_cached` and `url_status` properties.

The script is set up to run with a GitHub Actions workflow.

Requires the following environment variables to be set:

```text
VUE_APP_PDAP_API_KEY=<YOUR_PDAP_API_KEY>
VITE_VUE_APP_BASE_URL=<YOUR_PDAP_API_URL>
```
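
For reference, here is a minimal sketch (not part of this PR) of how these two variables are consumed, based on the `requests` calls visible in `cache_url.py` below:

```python
# Minimal sketch; assumes the /archives endpoint and "Bearer" Authorization
# header used in cache_url.py.
import os

import requests

API_KEY = "Bearer " + os.getenv("VUE_APP_PDAP_API_KEY")  # PDAP API key
BASE_URL = os.getenv("VITE_VUE_APP_BASE_URL")            # PDAP API base URL

# Fetch the archive entries that the script checks and, if needed, re-caches.
response = requests.get(
    f"{BASE_URL}/archives",
    headers={"Authorization": API_KEY},
)
response.raise_for_status()
entries = response.json()
print(f"Fetched {len(entries)} archive entries")
```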
271 changes: 181 additions & 90 deletions cache_url.py
@@ -1,11 +1,32 @@
import json
from datetime import datetime, timedelta
from concurrent.futures import as_completed, ThreadPoolExecutor
from dataclasses import dataclass
from tqdm import tqdm
import requests
import os
import time

# How long to wait in between archive requests, in seconds
# Too many requests will result in the IP being temporarily blocked: https://archive.org/details/toomanyrequests_20191110
ARCHIVE_WAIT_TIME = 7

class ArchiveLastCacheNotFoundError(Exception):
pass


@dataclass
class ArchiveEntry:
url: str
last_archived: datetime
broken_source_url_as_of: datetime | None
source_url: str
update_delta: timedelta

@classmethod
def from_dict(cls, dict_entry: dict):
return cls(**dict_entry)


API_KEY = "Bearer " + os.getenv("VUE_APP_PDAP_API_KEY")
UPDATE_FREQUENCY_MAPPING = {
"Incident-based": 7,
@@ -24,113 +45,183 @@
}


def cache_url(entry):
def archive_url(entry: dict):
"""

:param entry:
:return:
"""
entry["broken_source_url_as_of"] = None
source_url = entry.get("source_url")
if source_url is None:
entry["broken_source_url_as_of"] = datetime.now().strftime("%Y-%m-%d")
# try:
entry_json = json.dumps(entry)
requests.put(
f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
json=entry_json,
headers={"Authorization": API_KEY},
)
raise Exception("No source_url")
# except Exception as error:
# print(str(error))
# exceptions.append({
# "source_url": source_url,
# "exception": str(error)})

update_delta = (
UPDATE_FREQUENCY_MAPPING[entry.get("update_frequency")]
if entry.get("update_frequency") is not None
else None
)
if update_delta is None:
update_delta = datetime.max - datetime.today()
else:
update_delta = timedelta(days=int(update_delta))

last_cached = entry.get("last_cached")
if last_cached is not None:
last_cached = datetime.strptime(last_cached, "%Y-%m-%d")
else:
# Check if website exists in archive and compare archived website to current site
last_cached = datetime.min
website_info_data = None
try:
website_info = requests.get(
f"https://archive.org/wayback/available?url={source_url}"
)
website_info_data = website_info.json()
if website_info_data["archived_snapshots"]:
website_info_data_last_cached = datetime.strptime(
website_info_data["archived_snapshots"]["closest"]["timestamp"],
"%Y%m%d%H%M%S",
)
if website_info_data_last_cached > last_cached:
last_cached = website_info_data_last_cached
except Exception as error:
# print(str(error))
website_info_data = {"archived_snapshots": None}

# Cache if never cached or more than update_delta days have passed since last_cache
if last_cached + update_delta < datetime.today():
try:
wait_then_post(entry, source_url, ARCHIVE_WAIT_TIME)
except Exception as error:
try:
time.sleep(1)
api_url = f"http://web.archive.org/save/{source_url}"
requests.post(api_url)
# Update the last_cached date if cache is successful
entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")
wait_then_post(entry, source_url, 10)
except Exception as error:
try:
time.sleep(3)
requests.post(api_url)
# Update the last_cached date if cache is successful
entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")
except:
print(str(error))
# exceptions.append({
# "source_url": source_url,
# "exception": str(error)})
else:
entry["last_cached"] = last_cached.strftime("%Y-%m-%d")

print(str(error))
# Send updated data to Data Sources
update_pdap_archives(entry)

def wait_then_post(entry: dict, source_url: str, wait_time: int):
    """
    Wait, then post the source URL to the Internet Archive's save endpoint.
    :param entry: The archive entry; its last_cached date is updated on success.
    :param source_url: The URL to archive.
    :param wait_time: The amount of time to wait before posting, in seconds.
    :return:
    """
    api_url = f"http://web.archive.org/save/{source_url}"
    time.sleep(wait_time)
    response = requests.post(api_url)
    response.raise_for_status()
    # Update the last_cached date now that the archive request succeeded
    entry["last_cached"] = datetime.now().strftime("%Y-%m-%d")


def handle_missing_source_url(entry: dict):
"""
    Record when the source URL was found to be missing,
    update the PDAP archives, and raise an exception.
:param entry:
:return:
"""
entry["broken_source_url_as_of"] = datetime.now().strftime("%Y-%m-%d")
update_pdap_archives(entry)
raise Exception("No source_url")


def update_pdap_archives(entry: dict):
"""
Update data in PDAP archives
:param entry:
:return:
"""
entry_json = json.dumps(entry)
requests.put(
response = requests.put(
f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
json=entry_json,
headers={"Authorization": API_KEY},
)
response.raise_for_status()



def get_update_delta(update_frequency: str | None) -> timedelta:
"""
    Calculate the update delta based on the entry's update frequency.
    :param update_frequency: The entry's update frequency, e.g. "Incident-based".
:return:
"""
try:
update_delta = UPDATE_FREQUENCY_MAPPING[update_frequency]
except KeyError:
return datetime.max - datetime.today()
if update_delta is None:
return datetime.max - datetime.today()
return timedelta(days=int(update_delta))


def get_website_info_data_last_cached(source_url) -> datetime:
    """
    Return the timestamp of the closest Wayback Machine snapshot of source_url.
    Raises ArchiveLastCacheNotFoundError if no snapshot exists.
    """
website_info_data = get_website_info_data(source_url)
if not website_info_data["archived_snapshots"]:
raise ArchiveLastCacheNotFoundError
return datetime.strptime(
website_info_data["archived_snapshots"]["closest"]["timestamp"],
"%Y%m%d%H%M%S",
)


def get_last_archived(last_archived: str | None, source_url: str) -> datetime:
"""
Get last archived date of website from Internet Archive.
    :param last_archived: The entry's last_cached date string ("%Y-%m-%d"), or None.
:param source_url:
:return:
"""
if last_archived is not None:
try:
return datetime.strptime(last_archived, "%Y-%m-%d")
except ValueError:
return datetime.min
# Check if website exists in archive and compare archived website to current site
last_archived = datetime.min
try:
website_info_data_last_cached = get_website_info_data_last_cached(source_url)
except ArchiveLastCacheNotFoundError:
return last_archived
if website_info_data_last_cached > last_archived:
return website_info_data_last_cached
return last_archived


def get_website_info_data(source_url):
    """Query the Wayback Machine availability API for source_url."""
website_info = requests.get(
f"https://archive.org/wayback/available?url={source_url}"
)
website_info_data = website_info.json()
return website_info_data


def main():
data = get_from_archives()
extract_url_info_and_archived_if_needed(data)


def extract_url_info_and_archived_if_needed(data: list[dict]):
"""

:param data:
:return:
"""
# Create a tuple of entries with missing source URLs
missing_source_url_entries = tuple(filter(missing_source_url, data))

# Handle entries with missing source URLs
print("Handling missing source urls")
for entry in tqdm(missing_source_url_entries):
handle_missing_source_url(entry)

print("\nFinding entries that need updates")
non_missing_source_url_entries = tuple(filter(lambda e: not missing_source_url(e), data))
entries_needing_updates = []
for entry in tqdm(non_missing_source_url_entries):
if needs_updated(entry):
entries_needing_updates.append(entry)

print(f"Updating {len(entries_needing_updates)} entries that need updates")
# Handle entries that need to be updated
for entry in tqdm(entries_needing_updates):
try:
archive_url(entry)
except Exception as error:
print(str(error))

def missing_source_url(entry: dict):
return entry['source_url'] is None

def needs_updated(entry: dict) -> bool:
"""
    Check whether the entry is due to be re-archived, based on its last
    archived date and update frequency.
:param entry:
:return:
"""
last_archived = get_last_archived(entry["last_cached"], entry["source_url"])
update_delta = get_update_delta(entry["update_frequency"])
return last_archived + update_delta < datetime.now()

def get_from_archives() -> list[dict]:
"""
    Get data from the PDAP archives endpoint.
    :return: A list of archive entries.
"""
response = requests.get(
f"{os.getenv('VITE_VUE_APP_BASE_URL')}/archives",
headers={"Authorization": API_KEY},
)
data = response.json()

# Extract url info and cache if needed
exceptions = []
if data is not str:
with ThreadPoolExecutor(max_workers=100) as executor:
print("Caching urls...")
future_cache = [executor.submit(cache_url, entry) for entry in data]

for future in tqdm(as_completed(future_cache), total=len(future_cache)):
future.result()

# Write any exceptions to a daily error log
# file_name = "ErrorLogs/" + datetime.now().strftime("%Y-%m-%d") + "_errorlog.txt"
# with open(file_name, "w") as error_log:
# if len(exceptions) > 0:
# json.dump(exceptions, fp=error_log, indent=4)
# else:
# error_log.write("no exceptions thrown")
response.raise_for_status()
return response.json()


if __name__ == "__main__":
2 changes: 2 additions & 0 deletions requirements.txt
@@ -5,3 +5,5 @@ idna==3.4
requests==2.28.2
savepagenow==1.2.3
urllib3==1.26.15
pytest==8.2.1
tqdm==4.66.4
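
The new pytest pin suggests unit tests accompany this refactor; none appear in this diff, but a minimal, hypothetical sketch of the kind of test it could support for `get_update_delta` might look like this:

```python
# Hypothetical test sketch for cache_url.get_update_delta; the tests actually
# added in this PR are not shown in the diff above.
import os
from datetime import timedelta

# cache_url.py builds API_KEY at import time, so a dummy key must exist
# before the module is imported in a test environment.
os.environ.setdefault("VUE_APP_PDAP_API_KEY", "test-key")

from cache_url import get_update_delta


def test_known_frequency_maps_to_days():
    # "Incident-based" maps to 7 days in UPDATE_FREQUENCY_MAPPING
    assert get_update_delta("Incident-based") == timedelta(days=7)


def test_unknown_frequency_returns_effectively_infinite_delta():
    # Unknown or missing frequencies fall back to datetime.max - today,
    # so such entries are never considered overdue on this basis alone.
    assert get_update_delta(None) > timedelta(days=365 * 100)
```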