One RIKOLTI_DATA to rule them all #1121

Merged · 12 commits · Oct 8, 2024
15 changes: 5 additions & 10 deletions README.md
@@ -48,18 +48,16 @@ vi env.local

Currently, I only use one virtual environment, even though each folder located at the root of this repository represents an isolated component. If dependency conflicts are encountered, I'll wind up creating separate environments.

-Similarly, I also only use one env.local as well. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). Set `VERNACULAR_DATA` to the URI where you would like Rikolti to store and retrieve fetched data - Rikolti will create a folder (or s3 prefix) `<collection_id>/vernacular_metadata` at this location. Set `MAPPED_DATA` to the URI where you would like Rikolti to store and retrieve mapped data - Rikolti will create a folder (or s3 prefix) `<collection_id>/mapped_metadata` at this location. Set `WITH_CONTENT_URL_DATA` to the URI where you would like Rikolti to store mapped data that has been updated with urls to content files - Rikolti will create a folder (or s3 prefix) `<collection_id>/with_content_urls` at this location. Set `CONTENT_ROOT` to the URI where you would like Rikolti to store content files.
+Similarly, I only use one env.local. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). Set `RIKOLTI_DATA` to the URI where you would like Rikolti to store and retrieve data - Rikolti will create a folder (or s3 prefix) `<collection_id>/vernacular_metadata` at this location. Set `RIKOLTI_CONTENT` to the URI where you would like Rikolti to store content files.

For example, one way to configure `env.local` is:

```
-VERNACULAR_DATA=file:///Users/awieliczka/Projects/rikolti/rikolti_data
-MAPPED_DATA=$VERNACULAR_DATA
-WITH_CONTENT_URL_DATA=$VERNACULAR_DATA
-CONTENT_ROOT=file:///Users/awieliczka/Projects/rikolti/rikolti_content
+RIKOLTI_DATA=file:///Users/awieliczka/Projects/rikolti/rikolti_data
+RIKOLTI_CONTENT=file:///Users/awieliczka/Projects/rikolti/rikolti_content
```

-Each of these can be different locations, however. For example, if you're attempting to re-run a mapper locally off of previously fetched data stored on s3, you might set `VERNACULAR_DATA=s3://rikolti_data`.
+Each of these can be different locations, however. For example, if you're attempting to re-run a mapper locally off of previously fetched data stored on s3, you might set `RIKOLTI_DATA=s3://rikolti_data`.
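Whichever you choose, the URI scheme is what selects the storage backend. As a rough sketch of the pattern (the `parse_data_root` helper below is hypothetical, not part of `rikolti.utils.versions`):

```python
import os
from urllib.parse import urlparse

def parse_data_root(env_var: str = "RIKOLTI_DATA") -> tuple[str, str]:
    """Hypothetical helper: split a storage-root URI into (scheme, location)."""
    uri = os.environ.get(env_var, "file:///tmp")
    parsed = urlparse(uri)
    if parsed.scheme == "s3":
        # s3://bucket/prefix -> read and write via the S3 API
        return "s3", f"{parsed.netloc}{parsed.path}"
    # file:///some/dir -> read and write on the local filesystem
    return "file", parsed.path
```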

In env.example you'll also see `METADATA_MOUNT` and `CONTENT_MOUNT` environment variables. These are only relevant if you are running the content harvester using Airflow and want to set any of the CONTENT_ environment variables to the local filesystem. Their usage is described below in the Airflow Development section.

@@ -170,10 +168,7 @@ These env vars are used in the `aws-mwaa-local-runner/docker/docker-compose-loca
Next, back in the Rikolti repository, create the `startup.sh` file by running `cp env.example dags/startup.sh`. Update the startup.sh file with Nuxeo, Flickr, and Solr keys as available, and make sure that the following environment variables are set:

```
-export VERNACULAR_DATA=file:///usr/local/airflow/rikolti_data
-export MAPPED_DATA=file:///usr/local/airflow/rikolti_data
-export WITH_CONTENT_URL_DATA=file:///usr/local/airflow/rikolti_data
-export MERGED_DATA=file:///usr/local/airflow/rikolti_data
+export RIKOLTI_DATA=file:///usr/local/airflow/rikolti_data
```

The folder located at `RIKOLTI_DATA_HOME` (set in `aws-mwaa-local-runner/docker/.env`) is mounted to `/usr/local/airflow/rikolti_data` in the Airflow docker container.
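Roughly, the host and container paths line up like this (an illustrative sketch, not the actual compose file):

```python
import os

# Host side comes from RIKOLTI_DATA_HOME in aws-mwaa-local-runner/docker/.env;
# the container side matches the file:// URI in RIKOLTI_DATA above.
host_dir = os.environ.get("RIKOLTI_DATA_HOME", "/path/to/rikolti_data")
volume_mapping = f"{host_dir}:/usr/local/airflow/rikolti_data"
```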
2 changes: 1 addition & 1 deletion content_harvester/README.md
@@ -34,7 +34,7 @@ The above media and thumbnail fetching processes are enacted upon child metadata

# Settings

-You can bypass uploading to s3 by setting `WITH_CONTENT_URL_DATA = "file://<local path>"` and `CONTENT_ROOT = "file://<local_path>"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing).
+You can bypass uploading to s3 by setting `RIKOLTI_DATA = "file://<local_path>"` and `RIKOLTI_CONTENT = "file://<local_path>"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing).
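For instance, with both variables pointed at `file://` URIs, a harvested record might come back shaped roughly like this (a hypothetical record; field values are illustrative):

```python
# Illustrative record shape after a local content harvest; the local
# filepaths are exactly what makes this output unsuitable for publishing.
record = {
    "calisphere-id": "27414--0001",
    "media": {"media_filepath": "/rikolti_content/media/example.pdf"},
    "thumbnail": {"thumbnail_filepath": "/rikolti_content/thumbnails/example.jpg"},
}
```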

# Local Development

4 changes: 2 additions & 2 deletions content_harvester/by_collection.py
@@ -1,14 +1,14 @@
from .by_page import harvest_page_content
from . import settings
-from rikolti.utils.versions import get_mapped_pages, create_with_content_urls_version
+from rikolti.utils.versions import get_versioned_pages, create_with_content_urls_version


def harvest_collection_content(collection_id, mapper_type, mapped_data_version: str):
if not collection_id or not mapped_data_version:
print("Error: collection_id and mapped_data_version required")
exit()

-page_list = get_mapped_pages(
+page_list = get_versioned_pages(
mapped_data_version, **settings.AWS_CREDENTIALS)

print(f"{collection_id:<6}: Harvesting content for {len(page_list)} pages")
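For reference, a hypothetical invocation of `harvest_collection_content` (the import path and version-path format are assumptions based on the layout described in the README):

```python
from rikolti.content_harvester.by_collection import harvest_collection_content

# Assumes RIKOLTI_DATA and RIKOLTI_CONTENT are set in the environment;
# the mapped_data_version path below is illustrative.
harvest_collection_content(
    collection_id="26098",
    mapper_type="nuxeo.nuxeo",
    mapped_data_version="26098/vernacular_metadata_v1/mapped_metadata_v1/",
)
```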
6 changes: 3 additions & 3 deletions content_harvester/by_page.py
@@ -4,7 +4,7 @@
from .by_record import harvest_record_content

from rikolti.utils.versions import (
-get_mapped_page_content, put_with_content_urls_page, get_version
+get_versioned_page_as_json, put_versioned_page, get_version
)


@@ -18,7 +18,7 @@ def harvest_page_content(
mapped_version = get_version(collection_id, mapped_page_path)
page_filename = mapped_page_path.split(mapped_version + '/data/')[-1]

-records = get_mapped_page_content(mapped_page_path)
+records = get_versioned_page_as_json(mapped_page_path)
print(
f"Harvesting content for {len(records)} records at {mapped_page_path}")

@@ -42,7 +42,7 @@ def harvest_page_content(
f"record {record.get('calisphere-id')} in page {mapped_page_path}"
)

-metadata_with_content_urls = put_with_content_urls_page(
+metadata_with_content_urls = put_versioned_page(
json.dumps(records), page_filename, with_content_urls_version)

media_source = [r for r in records if r.get('media_source')]
4 changes: 2 additions & 2 deletions content_harvester/by_record.py
@@ -400,9 +400,9 @@ def create_thumbnail_component(

def upload_content(filepath: str, destination: str) -> str:
'''
-upload file to CONTENT_ROOT
+upload file to RIKOLTI_CONTENT
'''
content_root = os.environ.get("CONTENT_ROOT", 'file:///tmp')
content_root = os.environ.get("RIKOLTI_CONTENT", 'file:///tmp')
content_path = f"{content_root.rstrip('/')}/{destination}"
upload_file(filepath, content_path)
return content_path
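Given `upload_content` above, a local run resolves to a `file://` content path (values illustrative):

```python
import os

os.environ["RIKOLTI_CONTENT"] = "file:///tmp/rikolti_content"
# Copies /tmp/thumbnail.jpg to the content root and returns
# "file:///tmp/rikolti_content/26098/thumbnails/0001"
content_path = upload_content("/tmp/thumbnail.jpg", "26098/thumbnails/0001")
```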
5 changes: 2 additions & 3 deletions content_harvester/docker-compose.yml
@@ -18,9 +18,8 @@ services:
- ./:/content_harvester
- ../utils:/rikolti/utils
environment:
-      - MAPPED_DATA=file:///rikolti_data
-      - WITH_CONTENT_URL_DATA=file:///rikolti_data
-      - CONTENT_ROOT=file:///rikolti_content
+      - RIKOLTI_DATA=file:///rikolti_data
+      - RIKOLTI_CONTENT=file:///rikolti_content
- NUXEO_USER=${NUXEO_USER}
- NUXEO_PASS=${NUXEO_PASS}
- CONTENT_COMPONENT_CACHE=${CONTENT_COMPONENT_CACHE}
7 changes: 3 additions & 4 deletions dags/dev_dags/mapper_dag.py
@@ -11,8 +11,7 @@
from rikolti.dags.shared_tasks.shared import batched
from rikolti.utils.versions import get_most_recent_vernacular_version
from rikolti.utils.versions import get_most_recent_mapped_version
-from rikolti.utils.versions import get_vernacular_pages
-from rikolti.utils.versions import get_mapped_pages
+from rikolti.utils.versions import get_versioned_pages


@task(task_id="get_vernacular_page_batches")
@@ -22,7 +21,7 @@ def get_vernacular_page_batches_task(
vernacular_version = params.get('vernacular_version') if params else None
if not vernacular_version:
vernacular_version = get_most_recent_vernacular_version(collection_id)
-pages = get_vernacular_pages(vernacular_version)
+pages = get_versioned_pages(vernacular_version)
# TODO: split page_list into pages and children?

# 1024 is the maximum number of fanout tasks allowed
@@ -38,7 +37,7 @@ def get_mapped_pages_task(params: Optional[dict] = None):
mapped_version = params.get('mapped_version') if params else None
if not mapped_version:
mapped_version = get_most_recent_mapped_version(collection_id)
-pages = get_mapped_pages(mapped_version)
+pages = get_versioned_pages(mapped_version)
return pages

# This is a functional duplicate of
16 changes: 8 additions & 8 deletions dags/harvest_dag.py
@@ -15,9 +15,9 @@
from rikolti.dags.shared_tasks.mapping_tasks import mapping_tasks
from rikolti.dags.shared_tasks.content_harvest_tasks import content_harvesting_tasks
from rikolti.utils.versions import (
-get_child_directories, get_with_content_urls_pages,
-get_with_content_urls_page_content, get_child_pages,
-create_merged_version, put_merged_page)
+get_child_directories, get_versioned_pages,
+get_versioned_page_as_json, get_child_pages,
+create_merged_version, put_versioned_page)
from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task


@@ -27,7 +27,7 @@ def get_child_records(version, parent_id) -> list:
children = [page for page in children
if (page.rsplit('/')[-1]).startswith(parent_id)]
for child in children:
-child_records.extend(get_with_content_urls_page_content(child))
+child_records.extend(get_versioned_page_as_json(child))
return child_records

def get_child_thumbnail(child_records):
@@ -38,7 +38,7 @@ def get_child_thumbnail(child_records):
@task(task_id="merge_any_child_records",
on_failure_callback=notify_rikolti_failure)
def merge_any_child_records_task(version, **context):
-with_content_urls_pages = get_with_content_urls_pages(version)
+with_content_urls_pages = get_versioned_pages(version)

# Recurse through the record's children (if any)
child_directories = get_child_directories(version)
@@ -52,7 +52,7 @@ def merge_any_child_records_task(version, **context):
merged_pages = []
child_count_by_record = {}
for page_path in parent_pages:
-parent_records = get_with_content_urls_page_content(page_path)
+parent_records = get_versioned_page_as_json(page_path)
for record in parent_records:
calisphere_id = record['calisphere-id']
child_records = get_child_records(version, calisphere_id)
@@ -70,9 +70,9 @@ def merge_any_child_records_task(version, **context):
if child_thumbnail:
record['thumbnail'] = child_thumbnail
merged_pages.append(
-put_merged_page(
+put_versioned_page(
json.dumps(parent_records),
-os.path.basename(page_path),
+f"{os.path.basename(page_path)}.jsonl",
merged_version
)
)
30 changes: 10 additions & 20 deletions dags/shared_tasks/content_harvest_operators.py
@@ -70,15 +70,11 @@ def __init__(self, collection_id=None, with_content_urls_version=None, pages=Non
],
"environment": [
{
"name": "MAPPED_DATA",
"name": "RIKOLTI_DATA",
"value": "s3://rikolti-data"
},
{
"name": "WITH_CONTENT_URL_DATA",
"value": "s3://rikolti-data"
},
{
"name": "CONTENT_ROOT",
"name": "RIKOLTI_CONTENT",
"value": "s3://rikolti-content"
},
{
@@ -174,20 +170,15 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type,
container_name = (
f"content_harvester_{collection_id}_{page_basename.split('.')[0]}")

-        if os.environ.get('MAPPED_DATA', '').startswith('s3'):
-            mapped_data = os.environ.get('MAPPED_DATA')
-        else:
-            mapped_data = "file:///rikolti_data"

-        if os.environ.get('WITH_CONTENT_URL_DATA', '').startswith('s3'):
-            with_content_url_data = os.environ.get('WITH_CONTENT_URL_DATA')
+        if os.environ.get('RIKOLTI_DATA', '').startswith('s3'):
+            rikolti_data = os.environ.get('RIKOLTI_DATA')
         else:
-            with_content_url_data = "file:///rikolti_data"
+            rikolti_data = "file:///rikolti_data"

-        if os.environ.get('CONTENT_ROOT', '').startswith('s3'):
-            content_root = os.environ.get('CONTENT_ROOT')
+        if os.environ.get('RIKOLTI_CONTENT', '').startswith('s3'):
+            rikolti_content = os.environ.get('RIKOLTI_CONTENT')
         else:
-            content_root = "file:///rikolti_content"
+            rikolti_content = "file:///rikolti_content"

prefix, pages = extract_prefix_from_pages(pages)
args = {
@@ -205,9 +196,8 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type,
"mounts": mounts,
"mount_tmp_dir": False,
"environment": {
"MAPPED_DATA": mapped_data,
"WITH_CONTENT_URL_DATA": with_content_url_data,
"CONTENT_ROOT": content_root,
"RIKOLTI_DATA": rikolti_data,
"RIKOLTI_CONTENT": rikolti_content,
"CONTENT_COMPONENT_CACHE": os.environ.get("CONTENT_COMPONENT_CACHE"),
"NUXEO_USER": os.environ.get("NUXEO_USER"),
"NUXEO_PASS": os.environ.get("NUXEO_PASS")
6 changes: 3 additions & 3 deletions dags/shared_tasks/indexing_tasks.py
@@ -8,7 +8,7 @@
from rikolti.record_indexer.index_collection import (
index_collection, delete_collection)
from rikolti.utils.versions import (
-get_version, get_merged_pages, get_with_content_urls_pages)
+get_version, get_versioned_pages)

def index_collection_task(alias, collection, version_pages, context):
collection_id = collection.get('id')
@@ -106,9 +106,9 @@ def get_version_pages(params=None):
version = params.get('version')

if 'merged' in version:
-version_pages = get_merged_pages(version)
+version_pages = get_versioned_pages(version, recursive=False)
else:
-version_pages = get_with_content_urls_pages(version)
+version_pages = get_versioned_pages(version)

return version_pages
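Presumably `recursive=False` keeps the listing to top-level pages and skips subdirectories such as `children/`; here is a sketch of the assumed semantics for a `file://` data root (a hypothetical implementation, not the actual `rikolti.utils.versions` code):

```python
import os

def list_version_pages(version_data_dir: str, recursive: bool = True) -> list[str]:
    # Hypothetical: list page files under a version's data/ directory,
    # optionally descending into subdirectories such as children/.
    if not recursive:
        return sorted(
            os.path.join(version_data_dir, f)
            for f in os.listdir(version_data_dir)
            if os.path.isfile(os.path.join(version_data_dir, f))
        )
    pages = []
    for dirpath, _, files in os.walk(version_data_dir):
        pages.extend(os.path.join(dirpath, f) for f in files)
    return sorted(pages)
```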

2 changes: 1 addition & 1 deletion dags/shared_tasks/mapping_tasks.py
@@ -123,7 +123,7 @@ def get_mapping_status_task(

def print_s3_link(version_page, mapped_version):
# create a link to the file in the logs
data_root = os.environ.get("MAPPED_DATA", "file:///tmp").rstrip('/')
data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/')
if data_root.startswith('s3'):
s3_path = urlparse(f"{data_root}/{version_page}")
bucket = s3_path.netloc
7 changes: 2 additions & 5 deletions env.example
@@ -2,10 +2,7 @@
export RIKOLTI_EVENTS_SNS_TOPIC= # ask for the topic ARN

# metadata versions
-export VERNACULAR_DATA=file:///usr/local/airflow/rikolti_data
-export MAPPED_DATA=$VERNACULAR_DATA
-export WITH_CONTENT_URL_DATA=$VERNACULAR_DATA
-export MERGED_DATA=$VERNACULAR_DATA
+export RIKOLTI_DATA=file:///usr/local/airflow/rikolti_data

# metadata_fetcher
export NUXEO= # ask for a key - required to run the NuxeoFetcher
Expand All @@ -32,7 +29,7 @@ export UCLDC_COUCH_URL="https://harvest-prd.cdlib.org/" # this is co
# content_harvester
export NUXEO_USER= # ask for a user/pass - required to fetch Nuxeo Content
export NUXEO_PASS=
-export CONTENT_ROOT=file:///usr/local/airflow/rikolti_content
+export RIKOLTI_CONTENT=file:///usr/local/airflow/rikolti_content
export CONTENT_COMPONENT_CACHE= # s3://<bucket-name>/<optional_prefix>

# indexer
4 changes: 2 additions & 2 deletions metadata_fetcher/fetchers/Fetcher.py
@@ -5,7 +5,7 @@
from typing import Optional

from requests.adapters import HTTPAdapter, Retry
-from rikolti.utils.versions import put_vernacular_page
+from rikolti.utils.versions import put_versioned_page


logger = logging.getLogger(__name__)
@@ -84,7 +84,7 @@ def fetch_page(self) -> FetchedPageStatus:
filepath = None
content = self.aggregate_vernacular_content(response)
try:
-filepath = put_vernacular_page(
+filepath = put_versioned_page(
content, self.write_page, self.vernacular_version)
except Exception as e:
print(f"Metadata Fetcher: {e}")
6 changes: 3 additions & 3 deletions metadata_fetcher/fetchers/nuxeo_fetcher.py
@@ -1,7 +1,7 @@
import json
import logging
from urllib.parse import quote as urllib_quote
-from rikolti.utils.versions import put_vernacular_page
+from rikolti.utils.versions import put_versioned_page

import requests

@@ -137,7 +137,7 @@ def get_pages_of_record_components(self, record: dict):
more_component_pages = False
continue

-child_version_page = put_vernacular_page(
+child_version_page = put_versioned_page(
component_resp.text,
f"children/{record['uid']}-{component_page_count}",
self.vernacular_version
@@ -191,7 +191,7 @@ def get_pages_of_records(self, folder: dict, page_prefix: list):
more_pages_of_records = False
continue

-version_page = put_vernacular_page(
+version_page = put_versioned_page(
document_resp.text,
f"{'-'.join(page_prefix)}-p{record_page_count}",
self.vernacular_version
4 changes: 2 additions & 2 deletions metadata_fetcher/fetchers/ucd_json_fetcher.py
@@ -10,7 +10,7 @@
from bs4 import BeautifulSoup

from .Fetcher import Fetcher, FetchError, FetchedPageStatus
-from rikolti.utils.versions import put_vernacular_page
+from rikolti.utils.versions import put_versioned_page

class UcdJsonFetcher(Fetcher):
def __init__(self, params: dict[str]):
@@ -70,7 +70,7 @@ def fetch_all_pages(
records = [self.fetch_json_ld(url) for url in urls]
document_count = len(records)
try:
-filepath = put_vernacular_page(
+filepath = put_versioned_page(
json.dumps(records), self.write_page, self.vernacular_version)
fetch_status.append(FetchedPageStatus(document_count, filepath))
except Exception as e: