From 22d5356111df5181f477db84a58abcc2a599aaf5 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:09:08 -0700 Subject: [PATCH 01/12] Remove MAPPED_DATA, WITH_CONTENT_URLS_DATA, MERGED_DATA --- README.md | 13 ++-- content_harvester/README.md | 2 +- content_harvester/docker-compose.yml | 3 +- .../shared_tasks/content_harvest_operators.py | 20 ++----- dags/shared_tasks/mapping_tasks.py | 2 +- env.example | 5 +- metadata_mapper/lambda_function.py | 2 +- utils/versions.py | 60 +++++++++---------- 8 files changed, 44 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index 3182836f7..a707e6902 100644 --- a/README.md +++ b/README.md @@ -48,18 +48,16 @@ vi env.local Currently, I only use one virtual environment, even though each folder located at the root of this repository represents an isolated component. If dependency conflicts are encountered, I'll wind up creating separate environments. -Similarly, I also only use one env.local as well. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). Set `VERNACULAR_DATA` to the URI where you would like Rikolti to store and retrieve fetched data - Rikolti will create a folder (or s3 prefix) `/vernacular_metadata` at this location. Set `MAPPED_DATA` to the URI where you would like Rikolti to store and retrieve mapped data - Rikolti will create a folder (or s3 prefix) `/mapped_metadata` at this location. Set `WITH_CONTENT_URL_DATA` to the URI where you would like Rikolti to store mapped data that has been updated with urls to content files - Rikolti will create a folder (or s3 prefix) `/with_content_urls` at this location. Set `CONTENT_ROOT` to the URI where you would like Rikolti to store content files. +Similarly, I also only use one env.local as well. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). 
Set `RIKOLTI_DATA` to the URI where you would like Rikolti to store and retrieve data - Rikolti will create a folder (or s3 prefix) `/vernacular_metadata` at this location. Set `CONTENT_ROOT` to the URI where you would like Rikolti to store content files. For example, one way to configure `env.local` is: ``` -VERNACULAR_DATA=file:///Users/awieliczka/Projects/rikolti/rikolti_data -MAPPED_DATA=$VERNACULAR_DATA -WITH_CONTENT_URL_DATA=$VERNACULAR_DATA +RIKOLTI_DATA=file:///Users/awieliczka/Projects/rikolti/rikolti_data CONTENT_ROOT=file:///Users/awieliczka/Projects/rikolti/rikolti_content ``` -Each of these can be different locations, however. For example, if you're attempting to re-run a mapper locally off of previously fetched data stored on s3, you might set `VERNACULAR_DATA=s3://rikolti_data`. +Each of these can be different locations, however. For example, if you're attempting to re-run a mapper locally off of previously fetched data stored on s3, you might set `RIKOLTI_DATA=s3://rikolti_data`. In env.example you'll also see `METADATA_MOUNT` and `CONTENT_MOUNT` environment variables. These are only relevant if you are running the content harvester using airflow, and want to set and of the CONTENT_ environment variables to the local filesystem. Their usage is described below in the Airflow Development section. @@ -170,10 +168,7 @@ These env vars are used in the `aws-mwaa-local-runner/docker/docker-compose-loca Next, back in the Rikolti repository, create the `startup.sh` file by running `cp env.example dags/startup.sh`. 
Update the startup.sh file with Nuxeo, Flickr, and Solr keys as available, and make sure that the following environment variables are set: ``` -export VERNACULAR_DATA=file:///usr/local/airflow/rikolti_data -export MAPPED_DATA=file:///usr/local/airflow/rikolti_data -export WITH_CONTENT_URL_DATA=file:///usr/local/airflow/rikolti_data -export MERGED_DATA=file:///usr/local/airflow/rikolti_data +export RIKOLTI_DATA=file:///usr/local/airflow/rikolti_data ``` The folder located at `RIKOLTI_DATA_HOME` (set in `aws-mwaa-local-runner/docker/.env`) is mounted to `/usr/local/airflow/rikolti_data` on the airflow docker container. diff --git a/content_harvester/README.md b/content_harvester/README.md index 14b133a9f..9302a333c 100644 --- a/content_harvester/README.md +++ b/content_harvester/README.md @@ -34,7 +34,7 @@ The above media and thumbnail fetching processes are enacted upon child metadata # Settings -You can bypass uploading to s3 by setting `WITH_CONTENT_URL_DATA = "file://"` and `CONTENT_ROOT = "file://"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing). +You can bypass uploading to s3 by setting `RIKOLTI_DATA = "file://"` and `CONTENT_ROOT = "file://"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing). 
# Local Development diff --git a/content_harvester/docker-compose.yml b/content_harvester/docker-compose.yml index 174b69e35..e4a9fd3ef 100644 --- a/content_harvester/docker-compose.yml +++ b/content_harvester/docker-compose.yml @@ -18,8 +18,7 @@ services: - ./:/content_harvester - ../utils:/rikolti/utils environment: - - MAPPED_DATA=file:///rikolti_data - - WITH_CONTENT_URL_DATA=file:///rikolti_data + - RIKOLTI_DATA=file:///rikolti_data - CONTENT_ROOT=file:///rikolti_content - NUXEO_USER=${NUXEO_USER} - NUXEO_PASS=${NUXEO_PASS} diff --git a/dags/shared_tasks/content_harvest_operators.py b/dags/shared_tasks/content_harvest_operators.py index dd3fd96c8..b27b9d660 100644 --- a/dags/shared_tasks/content_harvest_operators.py +++ b/dags/shared_tasks/content_harvest_operators.py @@ -70,11 +70,7 @@ def __init__(self, collection_id=None, with_content_urls_version=None, pages=Non ], "environment": [ { - "name": "MAPPED_DATA", - "value": "s3://rikolti-data" - }, - { - "name": "WITH_CONTENT_URL_DATA", + "name": "RIKOLTI_DATA", "value": "s3://rikolti-data" }, { @@ -174,15 +170,10 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type, container_name = ( f"content_harvester_{collection_id}_{page_basename.split('.')[0]}") - if os.environ.get('MAPPED_DATA', '').startswith('s3'): - mapped_data = os.environ.get('MAPPED_DATA') - else: - mapped_data = "file:///rikolti_data" - - if os.environ.get('WITH_CONTENT_URL_DATA', '').startswith('s3'): - with_content_url_data = os.environ.get('WITH_CONTENT_URL_DATA') + if os.environ.get('RIKOLTI_DATA', '').startswith('s3'): + rikolti_data = os.environ.get('RIKOLTI_DATA') else: - with_content_url_data = "file:///rikolti_data" + rikolti_data = "file:///rikolti_data" if os.environ.get('CONTENT_ROOT', '').startswith('s3'): content_root = os.environ.get('CONTENT_ROOT') @@ -205,8 +196,7 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type, "mounts": mounts, "mount_tmp_dir": False, 
"environment": { - "MAPPED_DATA": mapped_data, - "WITH_CONTENT_URL_DATA": with_content_url_data, + "RIKOLTI_DATA": rikolti_data, "CONTENT_ROOT": content_root, "CONTENT_COMPONENT_CACHE": os.environ.get("CONTENT_COMPONENT_CACHE"), "NUXEO_USER": os.environ.get("NUXEO_USER"), diff --git a/dags/shared_tasks/mapping_tasks.py b/dags/shared_tasks/mapping_tasks.py index 78383c3e5..e028f29dc 100644 --- a/dags/shared_tasks/mapping_tasks.py +++ b/dags/shared_tasks/mapping_tasks.py @@ -123,7 +123,7 @@ def get_mapping_status_task( def print_s3_link(version_page, mapped_version): # create a link to the file in the logs - data_root = os.environ.get("MAPPED_DATA", "file:///tmp").rstrip('/') + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') if data_root.startswith('s3'): s3_path = urlparse(f"{data_root}/{version_page}") bucket = s3_path.netloc diff --git a/env.example b/env.example index e93450581..96974e95b 100644 --- a/env.example +++ b/env.example @@ -2,10 +2,7 @@ export RIKOLTI_EVENTS_SNS_TOPIC= # ask for the topic ARN # metadata versions -export VERNACULAR_DATA=file:///usr/local/airflow/rikolti_data -export MAPPED_DATA=$VERNACULAR_DATA -export WITH_CONTENT_URL_DATA=$VERNACULAR_DATA -export MERGED_DATA=$VERNACULAR_DATA +export RIKOLTI_DATA=file:///usr/local/airflow/rikolti_data # metadata_fetcher export NUXEO= # ask for a key - required to run the NuxeoFetcher diff --git a/metadata_mapper/lambda_function.py b/metadata_mapper/lambda_function.py index 127f08bfc..4c3af3407 100644 --- a/metadata_mapper/lambda_function.py +++ b/metadata_mapper/lambda_function.py @@ -188,7 +188,7 @@ def map_page( print(f"{mapped_page_status.num_mapped_records} records mapped") print( - f"mapped page at {os.environ.get('MAPPED_DATA')}/" + f"mapped page at {os.environ.get('RIKOLTI_DATA')}/" f"{mapped_page_status.mapped_page_path}") for report, couch_ids in mapped_page_status.exceptions.items(): diff --git a/utils/versions.py b/utils/versions.py index c423755d7..bbe0eb5d1 100644 --- 
a/utils/versions.py +++ b/utils/versions.py @@ -114,11 +114,11 @@ def create_merged_version( def get_most_recent_vernacular_version(collection_id: Union[int, str]): """ - Sorts the contents of $VERNACULAR_DATA//, and returns the + Sorts the contents of $RIKOLTI_DATA//, and returns the version path of the first item - this presumes a sortable vernacular version suffix. """ - data_root = os.environ.get("VERNACULAR_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") versions = storage.list_dirs(f"{data_root.rstrip('/')}/{collection_id}/") if not versions: raise Exception( @@ -127,7 +127,7 @@ def get_most_recent_vernacular_version(collection_id: Union[int, str]): return f"{collection_id}/{recent_version}/" def get_most_recent_mapped_version(collection_id: Union[int, str]): - data_root = os.environ.get("MAPPED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") collection_path = f"{data_root.rstrip('/')}/{collection_id}/" vernacular_versions = storage.list_dirs(collection_path) if not vernacular_versions: @@ -143,22 +143,22 @@ def get_most_recent_mapped_version(collection_id: Union[int, str]): def get_vernacular_pages(version, **kwargs): """ - resolves a vernacular version to a data_uri at $VERNACULAR_DATA// + resolves a vernacular version to a data_uri at $RIKOLTI_DATA// returns a list of version pages. """ - data_root = os.environ.get('VERNACULAR_DATA', "file:///tmp") + data_root = os.environ.get('RIKOLTI_DATA', "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" page_list = storage.list_pages(data_path, recursive=True, **kwargs) return [path[len(data_root)+1:] for path in page_list] def get_mapped_pages(version, **kwargs): """ - resolves a mapped version to a data_uri at $MAPPED_DATA// + resolves a mapped version to a data_uri at $RIKOLTI_DATA// returns a list of version pages located at that data_uri. 
""" if not version: raise ValueError("versions.get_mapped_pages: No mapped version path provided") - data_root = os.environ.get("MAPPED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" page_list = storage.list_pages(data_path, recursive=True, **kwargs) return [path[len(data_root)+1:] for path in page_list] @@ -166,34 +166,34 @@ def get_mapped_pages(version, **kwargs): def get_with_content_urls_pages(version, **kwargs): """ resolves a with_content_urls version to a data_uri at - $WITH_CONTENT_URL_DATA// + $RIKOLTI_DATA// returns a list of version pages located at that data_uri. """ - data_root = os.environ.get("WITH_CONTENT_URL_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" page_list = storage.list_pages(data_path, recursive=True, **kwargs) return [path[len(data_root)+1:] for path in page_list] def get_merged_pages(version, **kwargs): """ - resolves a merged version to a data_uri at $MERGED_DATA// + resolves a merged version to a data_uri at $RIKOLTI_DATA// returns a list of version pages located at that data_uri. """ - data_root = os.environ.get("MERGED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" page_list = storage.list_pages(data_path, recursive=False, **kwargs) return [path[len(data_root)+1:] for path in page_list] def get_child_directories(version, **kwargs): """ - resolves a mapped version to a data_uri at $MAPPED_DATA//data/ + resolves a mapped version to a data_uri at $RIKOLTI_DATA//data/ returns a list of directories. complex objects are stored in a directory named "children" within the mapped version data directory. This function is used to check if any directory named "children" is inside the mapped version's data directory. 
""" - data_root = os.environ.get('MAPPED_DATA', "file:///tmp") + data_root = os.environ.get('RIKOLTI_DATA', "file:///tmp") child_directories = storage.list_dirs( f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/", recursive=False @@ -202,10 +202,10 @@ def get_child_directories(version, **kwargs): def get_child_pages(version, **kwargs): """ - resolves a mapped version to a data_uri at $MAPPED_DATA//data/children/ + resolves a mapped version to a data_uri at $RIKOLTI_DATA//data/children/ returns a list of version pages located at data_uri. """ - data_root = os.environ.get("MAPPED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/children/" try: page_list = storage.list_pages(data_path, recursive=False, **kwargs) @@ -217,79 +217,79 @@ def get_child_pages(version, **kwargs): def get_vernacular_page_content(version_page): """ - resolves a version page to a data_uri at $VERNACULAR_DATA// + resolves a version page to a data_uri at $RIKOLTI_DATA// returns the contents of the page. 
""" - data_root = os.environ.get("VERNACULAR_DATA", "file:///tmp").rstrip('/') + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') return storage.get_page_content(f"{data_root.rstrip('/')}/{version_page}") def get_mapped_page_content(version_page): """ - resolves a version page to a data_uri at $MAPPED_DATA// + resolves a version page to a data_uri at $RIKOLTI_DATA// returns the contents of the page loaded as json """ - data_root = os.environ.get("MAPPED_DATA", "file:///tmp").rstrip('/') + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') content = storage.get_page_content(f"{data_root.rstrip('/')}/{version_page}") return json.loads(content) def get_with_content_urls_page_content(version_page): - data_root = os.environ.get("WITH_CONTENT_URL_DATA", "file:///tmp").rstrip('/') + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') content = storage.get_page_content(f"{data_root}/{version_page}") return json.loads(content) def get_merged_page_content(version_page): - data_root = os.environ.get("MERGED_DATA", "file:///tmp").rstrip('/') + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') content = storage.get_page_content(f"{data_root}/{version_page}") return json.loads(content) def put_vernacular_page(content: str, page_name: Union[int, str], version: str): """ - resolves a version path to a page uri at $VERNACULAR_DATA//data/ + resolves a version path to a page uri at $RIKOLTI_DATA//data/ and writes content to that data uri. returns the version page. 
""" - data_root = os.environ.get("VERNACULAR_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" storage.put_page_content(content, path) return f"{version.rstrip('/')}/data/{page_name}" def put_mapped_page(content, page_name, version): """ - resolves a version path to a page uri at $MAPPED_DATA//data/.jsonl + resolves a version path to a page uri at $RIKOLTI_DATA//data/.jsonl and writes content to that data uri. returns the version page. content should be a json.dumped string of a list of dicts. """ - data_root = os.environ.get("MAPPED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}.jsonl" storage.put_page_content(content, path) return f"{version.rstrip('/')}/data/{page_name}.jsonl" def put_with_content_urls_page(content, page_name, version): """ - resolves a version path to a page uri at $WITH_CONTENT_URL_DATA//data/ + resolves a version path to a page uri at $RIKOLTI_DATA//data/ and writes content to that data uri. returns the version page. content should be a json.dumped string of a list of dicts. 
""" - data_root = os.environ.get("WITH_CONTENT_URL_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" storage.put_page_content(content, path) return f"{version.rstrip('/')}/data/{page_name}" def put_merged_page(content, page_name, version): - data_root = os.environ.get("MERGED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}.jsonl" storage.put_page_content(content, path) return f"{version.rstrip('/')}/data/{page_name}.jsonl" def put_validation_report(content, version_page): """ - resolves a version path to a page uri at $MAPPED_DATA/ + resolves a version path to a page uri at $RIKOLTI_DATA/ and writes content to that data uri. returns the version page. content should be a csv string. """ - data_root = os.environ.get("MAPPED_DATA", "file:///tmp") + data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version_page}" storage.put_page_content(content, path) return version_page From c12affac0e62300ee81e27efc0c513543cc49321 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:32:03 -0700 Subject: [PATCH 02/12] Simplify down to get_versioned_pages --- content_harvester/by_collection.py | 4 ++-- dags/dev_dags/mapper_dag.py | 7 +++--- dags/harvest_dag.py | 4 ++-- dags/shared_tasks/indexing_tasks.py | 4 ++-- metadata_mapper/lambda_shepherd.py | 4 ++-- metadata_mapper/map_registry_collections.py | 4 ++-- metadata_mapper/validate_mapping.py | 4 ++-- utils/versions.py | 25 ++------------------- 8 files changed, 17 insertions(+), 39 deletions(-) diff --git a/content_harvester/by_collection.py b/content_harvester/by_collection.py index 47560bed5..35e3bc810 100644 --- a/content_harvester/by_collection.py +++ b/content_harvester/by_collection.py @@ -1,6 +1,6 @@ from .by_page import harvest_page_content from . 
import settings -from rikolti.utils.versions import get_mapped_pages, create_with_content_urls_version +from rikolti.utils.versions import get_versioned_pages, create_with_content_urls_version def harvest_collection_content(collection_id, mapper_type, mapped_data_version: str): @@ -8,7 +8,7 @@ def harvest_collection_content(collection_id, mapper_type, mapped_data_version: print("Error: collection_id and mapped_data_version required") exit() - page_list = get_mapped_pages( + page_list = get_versioned_pages( mapped_data_version, **settings.AWS_CREDENTIALS) print(f"{collection_id:<6}: Harvesting content for {len(page_list)} pages") diff --git a/dags/dev_dags/mapper_dag.py b/dags/dev_dags/mapper_dag.py index f9d51c061..25e24cf1f 100644 --- a/dags/dev_dags/mapper_dag.py +++ b/dags/dev_dags/mapper_dag.py @@ -11,8 +11,7 @@ from rikolti.dags.shared_tasks.shared import batched from rikolti.utils.versions import get_most_recent_vernacular_version from rikolti.utils.versions import get_most_recent_mapped_version -from rikolti.utils.versions import get_vernacular_pages -from rikolti.utils.versions import get_mapped_pages +from rikolti.utils.versions import get_versioned_pages @task(task_id="get_vernacular_page_batches") @@ -22,7 +21,7 @@ def get_vernacular_page_batches_task( vernacular_version = params.get('vernacular_version') if params else None if not vernacular_version: vernacular_version = get_most_recent_vernacular_version(collection_id) - pages = get_vernacular_pages(vernacular_version) + pages = get_versioned_pages(vernacular_version) # TODO: split page_list into pages and children? 
# 1024 is the maximum number of fanout tasks allowed @@ -38,7 +37,7 @@ def get_mapped_pages_task(params: Optional[dict] = None): mapped_version = params.get('mapped_version') if params else None if not mapped_version: mapped_version = get_most_recent_mapped_version(collection_id) - pages = get_mapped_pages(mapped_version) + pages = get_versioned_pages(mapped_version) return pages # This is a functional duplicate of diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index 4231ec9b5..231f16235 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -15,7 +15,7 @@ from rikolti.dags.shared_tasks.mapping_tasks import mapping_tasks from rikolti.dags.shared_tasks.content_harvest_tasks import content_harvesting_tasks from rikolti.utils.versions import ( - get_child_directories, get_with_content_urls_pages, + get_child_directories, get_versioned_pages, get_with_content_urls_page_content, get_child_pages, create_merged_version, put_merged_page) from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task @@ -38,7 +38,7 @@ def get_child_thumbnail(child_records): @task(task_id="merge_any_child_records", on_failure_callback=notify_rikolti_failure) def merge_any_child_records_task(version, **context): - with_content_urls_pages = get_with_content_urls_pages(version) + with_content_urls_pages = get_versioned_pages(version) # Recurse through the record's children (if any) child_directories = get_child_directories(version) diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py index 4f8cbf0fd..4ae6b904c 100644 --- a/dags/shared_tasks/indexing_tasks.py +++ b/dags/shared_tasks/indexing_tasks.py @@ -8,7 +8,7 @@ from rikolti.record_indexer.index_collection import ( index_collection, delete_collection) from rikolti.utils.versions import ( - get_version, get_merged_pages, get_with_content_urls_pages) + get_version, get_merged_pages, get_versioned_pages) def index_collection_task(alias, collection, version_pages, context): 
collection_id = collection.get('id') @@ -108,7 +108,7 @@ def get_version_pages(params=None): if 'merged' in version: version_pages = get_merged_pages(version) else: - version_pages = get_with_content_urls_pages(version) + version_pages = get_versioned_pages(version) return version_pages diff --git a/metadata_mapper/lambda_shepherd.py b/metadata_mapper/lambda_shepherd.py index 4b1c59e7d..726214c06 100644 --- a/metadata_mapper/lambda_shepherd.py +++ b/metadata_mapper/lambda_shepherd.py @@ -10,7 +10,7 @@ from .lambda_function import map_page, MappedPageStatus from .mappers.mapper import Record from rikolti.utils.versions import ( - get_most_recent_vernacular_version, get_vernacular_pages, + get_most_recent_vernacular_version, get_versioned_pages, get_version, create_mapped_version ) @@ -173,7 +173,7 @@ def map_collection(collection_id, vernacular_version=None, validate=False): if not vernacular_version: vernacular_version = get_most_recent_vernacular_version(collection_id) - page_list = get_vernacular_pages(vernacular_version) + page_list = get_versioned_pages(vernacular_version) # TODO: split page_list into pages and children? 
vernacular_version = get_version(collection_id, page_list[0]) diff --git a/metadata_mapper/map_registry_collections.py b/metadata_mapper/map_registry_collections.py index 0027ab708..62632667d 100644 --- a/metadata_mapper/map_registry_collections.py +++ b/metadata_mapper/map_registry_collections.py @@ -10,7 +10,7 @@ from .lambda_shepherd import MappedCollectionStatus from .lambda_shepherd import map_collection from .validate_mapping import create_collection_validation_csv -from rikolti.utils.versions import get_mapped_pages +from rikolti.utils.versions import get_versioned_pages from rikolti.utils.registry_client import registry_endpoint logger = logging.getLogger(__name__) @@ -95,7 +95,7 @@ def validate_endpoint( mapped_version = mapped_versions.get(str(collection_id)) try: - mapped_pages = get_mapped_pages(mapped_version) + mapped_pages = get_versioned_pages(mapped_version) except (FileNotFoundError, ValueError) as e: print(f"{collection_id:<6}: not mapped yet", file=sys.stderr) status = ValidationReportStatus( diff --git a/metadata_mapper/validate_mapping.py b/metadata_mapper/validate_mapping.py index d0d97bca6..ed5794bf0 100644 --- a/metadata_mapper/validate_mapping.py +++ b/metadata_mapper/validate_mapping.py @@ -12,7 +12,7 @@ from .validator.validation_mode import ValidationMode from .validator.validator import Validator from rikolti.utils.versions import ( - get_mapped_page_content, get_version, get_mapped_pages) + get_mapped_page_content, get_version, get_versioned_pages) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -390,7 +390,7 @@ def get_validator_class(collection_id: int) -> Type[Validator]: print(f"Generating validations for collection {args.collection_id} with options:") print(kwargs) - mapped_page_paths = get_mapped_pages(args.mapped_data_version) + mapped_page_paths = get_versioned_pages(args.mapped_data_version) num_rows, file_location = create_collection_validation_csv( args.collection_id, mapped_page_paths, **kwargs) diff 
--git a/utils/versions.py b/utils/versions.py index bbe0eb5d1..97f47e484 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -141,34 +141,13 @@ def get_most_recent_mapped_version(collection_id: Union[int, str]): recent_version = sorted(mapped_versions)[-1] return f"{collection_id}/{vernacular_version}/{recent_version}/" -def get_vernacular_pages(version, **kwargs): +def get_versioned_pages(version, **kwargs): """ resolves a vernacular version to a data_uri at $RIKOLTI_DATA// returns a list of version pages. """ - data_root = os.environ.get('RIKOLTI_DATA', "file:///tmp") - data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" - page_list = storage.list_pages(data_path, recursive=True, **kwargs) - return [path[len(data_root)+1:] for path in page_list] - -def get_mapped_pages(version, **kwargs): - """ - resolves a mapped version to a data_uri at $RIKOLTI_DATA// - returns a list of version pages located at that data_uri. - """ if not version: - raise ValueError("versions.get_mapped_pages: No mapped version path provided") - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" - page_list = storage.list_pages(data_path, recursive=True, **kwargs) - return [path[len(data_root)+1:] for path in page_list] - -def get_with_content_urls_pages(version, **kwargs): - """ - resolves a with_content_urls version to a data_uri at - $RIKOLTI_DATA// - returns a list of version pages located at that data_uri. 
- """ + raise ValueError("versions.get_versioned_pages: No version path provided") data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" page_list = storage.list_pages(data_path, recursive=True, **kwargs) From baa8f49ff65f051da5dfbd5cdae2e8329bf66ff4 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:35:07 -0700 Subject: [PATCH 03/12] Add recursive as kwarg to get_versioned_pages --- dags/shared_tasks/indexing_tasks.py | 4 ++-- utils/versions.py | 15 ++++----------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py index 4ae6b904c..35e11e902 100644 --- a/dags/shared_tasks/indexing_tasks.py +++ b/dags/shared_tasks/indexing_tasks.py @@ -8,7 +8,7 @@ from rikolti.record_indexer.index_collection import ( index_collection, delete_collection) from rikolti.utils.versions import ( - get_version, get_merged_pages, get_versioned_pages) + get_version, get_versioned_pages) def index_collection_task(alias, collection, version_pages, context): collection_id = collection.get('id') @@ -106,7 +106,7 @@ def get_version_pages(params=None): version = params.get('version') if 'merged' in version: - version_pages = get_merged_pages(version) + version_pages = get_versioned_pages(version, recursive=False) else: version_pages = get_versioned_pages(version) diff --git a/utils/versions.py b/utils/versions.py index 97f47e484..9ddf3c701 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -148,19 +148,12 @@ def get_versioned_pages(version, **kwargs): """ if not version: raise ValueError("versions.get_versioned_pages: No version path provided") + recursive = True + if "recursive" in kwargs: + recursive = kwargs.pop("recursive") data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" - page_list = storage.list_pages(data_path, recursive=True, 
**kwargs) - return [path[len(data_root)+1:] for path in page_list] - -def get_merged_pages(version, **kwargs): - """ - resolves a merged version to a data_uri at $RIKOLTI_DATA// - returns a list of version pages located at that data_uri. - """ - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - data_path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/" - page_list = storage.list_pages(data_path, recursive=False, **kwargs) + page_list = storage.list_pages(data_path, recursive=recursive, **kwargs) return [path[len(data_root)+1:] for path in page_list] def get_child_directories(version, **kwargs): From 398e832d3fcb6d146e53b316b8f5ba1c744214c7 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:42:17 -0700 Subject: [PATCH 04/12] Simplify down to get_versioned_page_as_json --- content_harvester/by_page.py | 4 ++-- dags/harvest_dag.py | 6 +++--- metadata_mapper/validate_mapping.py | 4 ++-- record_indexer/index_page.py | 8 ++------ utils/versions.py | 17 ++--------------- 5 files changed, 11 insertions(+), 28 deletions(-) diff --git a/content_harvester/by_page.py b/content_harvester/by_page.py index 2d4491653..24264ea13 100644 --- a/content_harvester/by_page.py +++ b/content_harvester/by_page.py @@ -4,7 +4,7 @@ from .by_record import harvest_record_content from rikolti.utils.versions import ( - get_mapped_page_content, put_with_content_urls_page, get_version + get_versioned_page_as_json, put_with_content_urls_page, get_version ) @@ -18,7 +18,7 @@ def harvest_page_content( mapped_version = get_version(collection_id, mapped_page_path) page_filename = mapped_page_path.split(mapped_version + '/data/')[-1] - records = get_mapped_page_content(mapped_page_path) + records = get_versioned_page_as_json(mapped_page_path) print( f"Harvesting content for {len(records)} records at {mapped_page_path}") diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index 231f16235..f1c615698 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -16,7 
+16,7 @@ from rikolti.dags.shared_tasks.content_harvest_tasks import content_harvesting_tasks from rikolti.utils.versions import ( get_child_directories, get_versioned_pages, - get_with_content_urls_page_content, get_child_pages, + get_versioned_page_as_json, get_child_pages, create_merged_version, put_merged_page) from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task @@ -27,7 +27,7 @@ def get_child_records(version, parent_id) -> list: children = [page for page in children if (page.rsplit('/')[-1]).startswith(parent_id)] for child in children: - child_records.extend(get_with_content_urls_page_content(child)) + child_records.extend(get_versioned_page_as_json(child)) return child_records def get_child_thumbnail(child_records): @@ -52,7 +52,7 @@ def merge_any_child_records_task(version, **context): merged_pages = [] child_count_by_record = {} for page_path in parent_pages: - parent_records = get_with_content_urls_page_content(page_path) + parent_records = get_versioned_page_as_json(page_path) for record in parent_records: calisphere_id = record['calisphere-id'] child_records = get_child_records(version, calisphere_id) diff --git a/metadata_mapper/validate_mapping.py b/metadata_mapper/validate_mapping.py index ed5794bf0..0428c6776 100644 --- a/metadata_mapper/validate_mapping.py +++ b/metadata_mapper/validate_mapping.py @@ -12,7 +12,7 @@ from .validator.validation_mode import ValidationMode from .validator.validator import Validator from rikolti.utils.versions import ( - get_mapped_page_content, get_version, get_versioned_pages) + get_versioned_page_as_json, get_version, get_versioned_pages) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -165,7 +165,7 @@ def validate_page(collection_id: int, page_path: str, "collection_id": collection_id, "page_path": page_path } - collection = get_mapped_page_content(page_path) + collection = get_versioned_page_as_json(page_path) if len(collection) == 0: print( diff --git 
a/record_indexer/index_page.py b/record_indexer/index_page.py index e8be6ea09..823639106 100644 --- a/record_indexer/index_page.py +++ b/record_indexer/index_page.py @@ -7,8 +7,7 @@ from . import settings from .utils import print_opensearch_error -from rikolti.utils.versions import ( - get_merged_page_content, get_with_content_urls_page_content) +from rikolti.utils.versions import get_versioned_page_as_json def bulk_add(records: list, index: str): @@ -123,10 +122,7 @@ def get_opensearch_schema(index_alias: str): def index_page(version_page: str, index: str, rikolti_data: dict): - if 'merged' in version_page: - records = get_merged_page_content(version_page) - else: - records = get_with_content_urls_page_content(version_page) + records = get_versioned_page_as_json(version_page) schema = get_opensearch_schema(index) removed_fields_report = defaultdict(list) diff --git a/utils/versions.py b/utils/versions.py index 9ddf3c701..7c58a81aa 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -192,24 +192,11 @@ def get_vernacular_page_content(version_page): resolves a version page to a data_uri at $RIKOLTI_DATA// returns the contents of the page. 
""" - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') - return storage.get_page_content(f"{data_root.rstrip('/')}/{version_page}") - -def get_mapped_page_content(version_page): - """ - resolves a version page to a data_uri at $RIKOLTI_DATA// - returns the contents of the page loaded as json - """ - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') - content = storage.get_page_content(f"{data_root.rstrip('/')}/{version_page}") - return json.loads(content) - -def get_with_content_urls_page_content(version_page): data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') content = storage.get_page_content(f"{data_root}/{version_page}") - return json.loads(content) + return content -def get_merged_page_content(version_page): +def get_versioned_page_as_json(version_page): data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') content = storage.get_page_content(f"{data_root}/{version_page}") return json.loads(content) From a1a1d92d6a61834af2bc37c16c1583dc1f9b7104 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:44:18 -0700 Subject: [PATCH 05/12] Rename to get_versioned_page_content for consistency --- metadata_mapper/lambda_function.py | 4 ++-- utils/versions.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/metadata_mapper/lambda_function.py b/metadata_mapper/lambda_function.py index 4c3af3407..9e0a8311e 100644 --- a/metadata_mapper/lambda_function.py +++ b/metadata_mapper/lambda_function.py @@ -9,7 +9,7 @@ from . 
import settings from .mappers.mapper import Record, Vernacular -from rikolti.utils.versions import get_vernacular_page_content, put_mapped_page, get_version +from rikolti.utils.versions import get_versioned_page_content, put_mapped_page, get_version logger = logging.getLogger(__name__) @@ -119,7 +119,7 @@ def map_page( collection.get('rikolti_mapper_type')) vernacular_version = get_version(collection_id, vernacular_page_path) page_filename = vernacular_page_path.split(vernacular_version + '/data/')[-1] - api_resp = get_vernacular_page_content(vernacular_page_path) + api_resp = get_versioned_page_content(vernacular_page_path) source_vernacular = vernacular_reader(collection_id, page_filename) source_metadata_records = source_vernacular.parse(api_resp) diff --git a/utils/versions.py b/utils/versions.py index 7c58a81aa..d78c81872 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -187,7 +187,7 @@ def get_child_pages(version, **kwargs): return [] return [path[len(data_root)+1:] for path in page_list] -def get_vernacular_page_content(version_page): +def get_versioned_page_content(version_page): """ resolves a version page to a data_uri at $RIKOLTI_DATA// returns the contents of the page. 
@@ -197,8 +197,7 @@ def get_vernacular_page_content(version_page): return content def get_versioned_page_as_json(version_page): - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp").rstrip('/') - content = storage.get_page_content(f"{data_root}/{version_page}") + content = get_versioned_page_content(version_page) return json.loads(content) def put_vernacular_page(content: str, page_name: Union[int, str], version: str): From 6d0888f1e4ca4d4f815eb1a74acff963d62e90ef Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:51:18 -0700 Subject: [PATCH 06/12] Simplify down to put_versioned_page --- dags/harvest_dag.py | 4 ++-- metadata_mapper/lambda_function.py | 4 ++-- utils/versions.py | 18 ++++++------------ 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index f1c615698..e9520e355 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -17,7 +17,7 @@ from rikolti.utils.versions import ( get_child_directories, get_versioned_pages, get_versioned_page_as_json, get_child_pages, - create_merged_version, put_merged_page) + create_merged_version, put_versioned_page) from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task @@ -70,7 +70,7 @@ def merge_any_child_records_task(version, **context): if child_thumbnail: record['thumbnail'] = child_thumbnail merged_pages.append( - put_merged_page( + put_versioned_page( json.dumps(parent_records), os.path.basename(page_path), merged_version diff --git a/metadata_mapper/lambda_function.py b/metadata_mapper/lambda_function.py index 9e0a8311e..67376964c 100644 --- a/metadata_mapper/lambda_function.py +++ b/metadata_mapper/lambda_function.py @@ -9,7 +9,7 @@ from . 
import settings from .mappers.mapper import Record, Vernacular -from rikolti.utils.versions import get_versioned_page_content, put_mapped_page, get_version +from rikolti.utils.versions import get_versioned_page_content, put_versioned_page, get_version logger = logging.getLogger(__name__) @@ -155,7 +155,7 @@ def map_page( mapped_metadata = [record.to_dict() for record in mapped_records] - mapped_page_path = put_mapped_page( + mapped_page_path = put_versioned_page( json.dumps(mapped_metadata, ensure_ascii=False), page_filename, mapped_data_version) diff --git a/utils/versions.py b/utils/versions.py index d78c81872..cb0ee7c9c 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -210,31 +210,25 @@ def put_vernacular_page(content: str, page_name: Union[int, str], version: str): storage.put_page_content(content, path) return f"{version.rstrip('/')}/data/{page_name}" -def put_mapped_page(content, page_name, version): +def put_with_content_urls_page(content, page_name, version): """ - resolves a version path to a page uri at $RIKOLTI_DATA//data/.jsonl + resolves a version path to a page uri at $RIKOLTI_DATA//data/ and writes content to that data uri. returns the version page. content should be a json.dumped string of a list of dicts. """ data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}.jsonl" + path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" storage.put_page_content(content, path) - return f"{version.rstrip('/')}/data/{page_name}.jsonl" + return f"{version.rstrip('/')}/data/{page_name}" -def put_with_content_urls_page(content, page_name, version): +def put_versioned_page(content, page_name, version): """ - resolves a version path to a page uri at $RIKOLTI_DATA//data/ + resolves a version path to a page uri at $RIKOLTI_DATA//data/.jsonl and writes content to that data uri. returns the version page. content should be a json.dumped string of a list of dicts. 
""" - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" - storage.put_page_content(content, path) - return f"{version.rstrip('/')}/data/{page_name}" - -def put_merged_page(content, page_name, version): data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}.jsonl" storage.put_page_content(content, path) From cbda16e4bd76a0de3e04df1083f31046105bcef2 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:56:23 -0700 Subject: [PATCH 07/12] Move extension addition out of put_versioned_page --- dags/harvest_dag.py | 2 +- metadata_mapper/lambda_function.py | 2 +- utils/versions.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index e9520e355..627e37bc8 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -72,7 +72,7 @@ def merge_any_child_records_task(version, **context): merged_pages.append( put_versioned_page( json.dumps(parent_records), - os.path.basename(page_path), + f"{os.path.basename(page_path)}.jsonl", merged_version ) ) diff --git a/metadata_mapper/lambda_function.py b/metadata_mapper/lambda_function.py index 67376964c..5cf54eef5 100644 --- a/metadata_mapper/lambda_function.py +++ b/metadata_mapper/lambda_function.py @@ -157,7 +157,7 @@ def map_page( mapped_page_path = put_versioned_page( json.dumps(mapped_metadata, ensure_ascii=False), - page_filename, mapped_data_version) + f"{page_filename}.jsonl", mapped_data_version) return MappedPageStatus( 'success', diff --git a/utils/versions.py b/utils/versions.py index cb0ee7c9c..4cbcf9bff 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -230,9 +230,9 @@ def put_versioned_page(content, page_name, version): content should be a json.dumped string of a list of dicts. 
""" data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}.jsonl" + path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" storage.put_page_content(content, path) - return f"{version.rstrip('/')}/data/{page_name}.jsonl" + return f"{version.rstrip('/')}/data/{page_name}" def put_validation_report(content, version_page): """ From 873748bb6232bdd07fda0b07aad209c0b79d8871 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 18:57:35 -0700 Subject: [PATCH 08/12] Simplify down to put_versioned_page --- content_harvester/by_page.py | 4 ++-- metadata_fetcher/fetchers/Fetcher.py | 4 ++-- metadata_fetcher/fetchers/nuxeo_fetcher.py | 6 ++--- metadata_fetcher/fetchers/ucd_json_fetcher.py | 4 ++-- utils/versions.py | 24 +------------------ 5 files changed, 10 insertions(+), 32 deletions(-) diff --git a/content_harvester/by_page.py b/content_harvester/by_page.py index 24264ea13..44b5ad0f9 100644 --- a/content_harvester/by_page.py +++ b/content_harvester/by_page.py @@ -4,7 +4,7 @@ from .by_record import harvest_record_content from rikolti.utils.versions import ( - get_versioned_page_as_json, put_with_content_urls_page, get_version + get_versioned_page_as_json, put_versioned_page, get_version ) @@ -42,7 +42,7 @@ def harvest_page_content( f"record {record.get('calisphere-id')} in page {mapped_page_path}" ) - metadata_with_content_urls = put_with_content_urls_page( + metadata_with_content_urls = put_versioned_page( json.dumps(records), page_filename, with_content_urls_version) media_source = [r for r in records if r.get('media_source')] diff --git a/metadata_fetcher/fetchers/Fetcher.py b/metadata_fetcher/fetchers/Fetcher.py index d2c769603..bf591aab5 100644 --- a/metadata_fetcher/fetchers/Fetcher.py +++ b/metadata_fetcher/fetchers/Fetcher.py @@ -5,7 +5,7 @@ from typing import Optional from requests.adapters import HTTPAdapter, Retry -from rikolti.utils.versions import 
put_vernacular_page +from rikolti.utils.versions import put_versioned_page logger = logging.getLogger(__name__) @@ -84,7 +84,7 @@ def fetch_page(self) -> FetchedPageStatus: filepath = None content = self.aggregate_vernacular_content(response) try: - filepath = put_vernacular_page( + filepath = put_versioned_page( content, self.write_page, self.vernacular_version) except Exception as e: print(f"Metadata Fetcher: {e}") diff --git a/metadata_fetcher/fetchers/nuxeo_fetcher.py b/metadata_fetcher/fetchers/nuxeo_fetcher.py index 277e18dea..0730924e2 100644 --- a/metadata_fetcher/fetchers/nuxeo_fetcher.py +++ b/metadata_fetcher/fetchers/nuxeo_fetcher.py @@ -1,7 +1,7 @@ import json import logging from urllib.parse import quote as urllib_quote -from rikolti.utils.versions import put_vernacular_page +from rikolti.utils.versions import put_versioned_page import requests @@ -137,7 +137,7 @@ def get_pages_of_record_components(self, record: dict): more_component_pages = False continue - child_version_page = put_vernacular_page( + child_version_page = put_versioned_page( component_resp.text, f"children/{record['uid']}-{component_page_count}", self.vernacular_version @@ -191,7 +191,7 @@ def get_pages_of_records(self, folder: dict, page_prefix: list): more_pages_of_records = False continue - version_page = put_vernacular_page( + version_page = put_versioned_page( document_resp.text, f"{'-'.join(page_prefix)}-p{record_page_count}", self.vernacular_version diff --git a/metadata_fetcher/fetchers/ucd_json_fetcher.py b/metadata_fetcher/fetchers/ucd_json_fetcher.py index 4b51ecada..2bc94e311 100644 --- a/metadata_fetcher/fetchers/ucd_json_fetcher.py +++ b/metadata_fetcher/fetchers/ucd_json_fetcher.py @@ -10,7 +10,7 @@ from bs4 import BeautifulSoup from .Fetcher import Fetcher, FetchError, FetchedPageStatus -from rikolti.utils.versions import put_vernacular_page +from rikolti.utils.versions import put_versioned_page class UcdJsonFetcher(Fetcher): def __init__(self, params: dict[str]): @@ 
-70,7 +70,7 @@ def fetch_all_pages( records = [self.fetch_json_ld(url) for url in urls] document_count = len(records) try: - filepath = put_vernacular_page( + filepath = put_versioned_page( json.dumps(records), self.write_page, self.vernacular_version) fetch_status.append(FetchedPageStatus(document_count, filepath)) except Exception as e: diff --git a/utils/versions.py b/utils/versions.py index 4cbcf9bff..36d2c8d17 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -200,29 +200,7 @@ def get_versioned_page_as_json(version_page): content = get_versioned_page_content(version_page) return json.loads(content) -def put_vernacular_page(content: str, page_name: Union[int, str], version: str): - """ - resolves a version path to a page uri at $RIKOLTI_DATA//data/ - and writes content to that data uri. returns the version page. - """ - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" - storage.put_page_content(content, path) - return f"{version.rstrip('/')}/data/{page_name}" - -def put_with_content_urls_page(content, page_name, version): - """ - resolves a version path to a page uri at $RIKOLTI_DATA//data/ - and writes content to that data uri. returns the version page. - - content should be a json.dumped string of a list of dicts. - """ - data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") - path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" - storage.put_page_content(content, path) - return f"{version.rstrip('/')}/data/{page_name}" - -def put_versioned_page(content, page_name, version): +def put_versioned_page(content: str, page_name: Union[int, str], version: str): """ resolves a version path to a page uri at $RIKOLTI_DATA//data/.jsonl and writes content to that data uri. returns the version page. 
From 35a0e72779e6d4e658aa15bf047d372bbfc9ca98 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 19:21:44 -0700 Subject: [PATCH 09/12] Add create_version function --- utils/versions.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/utils/versions.py b/utils/versions.py index 36d2c8d17..8520138cb 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -1,7 +1,7 @@ import json import os from datetime import datetime -from typing import Union, Optional +from typing import Union, Optional, Literal from . import storage """ @@ -37,6 +37,28 @@ def get_version(collection_id: Union[int, str], uri: str) -> str: version = "/".join(path_list) return version + +prefixes = Literal[ + "vernacular_metadata_", + "mapped_metadata_", + "validation_", + "with_content_urls_", + "merged_" +] +def create_version(version: str, prefix: prefixes, suffix: Optional[str] = None) -> str: + """ + Given a version path, ex: 3433/vernacular_metadata_v1/ and a version prefix, + ex: mapped_metadata_, and a version suffix, ex: v2, creates a new version + path, ex: 3433/vernacular_metadata_v1/mapped_metadata_v2/ + + If no suffix is provided, uses the current datetime. 
+ """ + version = version.rstrip('/') + if not suffix: + suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + return f"{version}/{prefix}{suffix}/" + + def create_vernacular_version( collection_id: Union[int, str], suffix: Optional[str] = None From c6bc0f4b82526f53ec4a484ba0d6ed15db1eebfa Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 19:33:35 -0700 Subject: [PATCH 10/12] Simplify to create_version --- utils/versions.py | 86 +++++++++-------------------------------------- 1 file changed, 15 insertions(+), 71 deletions(-) diff --git a/utils/versions.py b/utils/versions.py index 8520138cb..1eb628d73 100644 --- a/utils/versions.py +++ b/utils/versions.py @@ -45,7 +45,8 @@ def get_version(collection_id: Union[int, str], uri: str) -> str: "with_content_urls_", "merged_" ] -def create_version(version: str, prefix: prefixes, suffix: Optional[str] = None) -> str: +def create_version( + version: str, prefix: prefixes, suffix: Optional[str] = None) -> str: """ Given a version path, ex: 3433/vernacular_metadata_v1/ and a version prefix, ex: mapped_metadata_, and a version suffix, ex: v2, creates a new version @@ -59,80 +60,23 @@ def create_version(version: str, prefix: prefixes, suffix: Optional[str] = None) return f"{version}/{prefix}{suffix}/" -def create_vernacular_version( - collection_id: Union[int, str], - suffix: Optional[str] = None - ) -> str: - """ - Given a collection id, ex: 3433, and version suffix, ex: v1, creates a new - vernacular version, ex: 3433/vernacular_metadata_v1/ +def create_vernacular_version(collection_id: Union[int, str], **kwargs) -> str: + return create_version(f"{collection_id}", "vernacular_metadata_", **kwargs) - If no suffix is provided, uses the current datetime. 
- """ - version_path = f"{collection_id}" - if not suffix: - suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - return f"{version_path}/vernacular_metadata_{suffix}/" +def create_mapped_version(vernacular_version: str, **kwargs) -> str: + return create_version(vernacular_version, "mapped_metadata_", **kwargs) -def create_mapped_version( - vernacular_version: str, suffix: Optional[str] = None) -> str: - """ - Given a vernacular version, ex: 3433/vernacular_metadata_v1/ and version - suffix, ex: v2, creates a new mapped version, ex: - 3433/vernacular_metadata_v1/mapped_metadata_v2/ +def create_validation_version(mapped_version: str, **kwargs) -> str: + version = create_version(mapped_version, "validation_", **kwargs) + versioned_file = f"{version[:-1]}.csv" + return versioned_file - If no suffix is provided, uses the current datetime. - """ - vernacular_version = vernacular_version.rstrip('/') - if not suffix: - suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - return f"{vernacular_version}/mapped_metadata_{suffix}/" +def create_with_content_urls_version(mapped_version: str, **kwargs) -> str: + return create_version(mapped_version, "with_content_urls_", **kwargs) -def create_validation_version( - mapped_version: str, - suffix: Optional[str] = None -): - """ - Given a mapped version, ex: 3433/vernacular_metadata_v1/mapped_metadata_v2/ - and a version suffix, ex: v2, creates a new validation version, ex: - 3433/vernacular_metadata_v1/mapped_metadata_v2/validation_v2.csv - Validation versions paths are also version pages. +def create_merged_version(with_content_urls_version: str, **kwargs) -> str: + return create_version(with_content_urls_version, "merged_", **kwargs) - If no suffix is provided, uses the current datetime. 
- """ - mapped_version = mapped_version.rstrip('/') - if not suffix: - suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - return f"{mapped_version}/validation_{suffix}.csv" - -def create_with_content_urls_version( - mapped_version: str, suffix: Optional[str] = None) -> str: - """ - Given a mapped version, ex: 3433/vernacular_metadata_v1/mapped_metadata_v2/ - and a version suffix, ex: v2, creates a new with content urls version, ex: - 3433/vernacular_metadata_v1/mapped_metadata_v2/with_content_urls_v2/ - - If no suffix is provided, uses the current datetime. - """ - mapped_version = mapped_version.rstrip('/') - if not suffix: - suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - return f"{mapped_version}/with_content_urls_{suffix}/" - -def create_merged_version( - with_content_urls_version: str, suffix: Optional[str] = None) -> str: - """ - Given a with content urls version, ex: - 3433/vernacular_metadata_v1/mapped_metadata_v2/with_content_urls_v2/ and a - version suffix, ex: v1, creates a new merged version, ex: - 3433/vernacular_metadata_v1/mapped_metadata_v2/with_content_urls_v2/merged_v1/ - - If no suffix is provided, uses the current datetime. - """ - with_content_urls_version = with_content_urls_version.rstrip('/') - if not suffix: - suffix = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - return f"{with_content_urls_version}/merged_{suffix}/" def get_most_recent_vernacular_version(collection_id: Union[int, str]): """ @@ -227,7 +171,7 @@ def put_versioned_page(content: str, page_name: Union[int, str], version: str): resolves a version path to a page uri at $RIKOLTI_DATA//data/.jsonl and writes content to that data uri. returns the version page. - content should be a json.dumped string of a list of dicts. + content is a string or a json.dumped string of a list of dicts. 
""" data_root = os.environ.get("RIKOLTI_DATA", "file:///tmp") path = f"{data_root.rstrip('/')}/{version.rstrip('/')}/data/{page_name}" From 37ed3dab84cdb19d526c0a61255a53f91fca7629 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Tue, 1 Oct 2024 19:39:07 -0700 Subject: [PATCH 11/12] change CONTENT_ROOT to RIKOLTI_CONTENT --- README.md | 4 ++-- content_harvester/README.md | 2 +- content_harvester/by_record.py | 4 ++-- content_harvester/docker-compose.yml | 2 +- dags/shared_tasks/content_harvest_operators.py | 10 +++++----- env.example | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index a707e6902..eb738bbe6 100644 --- a/README.md +++ b/README.md @@ -48,13 +48,13 @@ vi env.local Currently, I only use one virtual environment, even though each folder located at the root of this repository represents an isolated component. If dependency conflicts are encountered, I'll wind up creating separate environments. -Similarly, I also only use one env.local as well. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). Set `RIKOLTI_DATA` to the URI where you would like Rikolti to store and retrieve data - Rikolti will create a folder (or s3 prefix) `/vernacular_metadata` at this location. Set `CONTENT_ROOT` to the URI where you would like Rikolti to store content files. +Similarly, I also only use one env.local as well. Rikolti fetches data to your local system, maps that data, and then fetches relevant content files (media files, previews, and thumbnails). Set `RIKOLTI_DATA` to the URI where you would like Rikolti to store and retrieve data - Rikolti will create a folder (or s3 prefix) `/vernacular_metadata` at this location. Set `RIKOLTI_CONTENT` to the URI where you would like Rikolti to store content files. 
For example, one way to configure `env.local` is: ``` RIKOLTI_DATA=file:///Users/awieliczka/Projects/rikolti/rikolti_data -CONTENT_ROOT=file:///Users/awieliczka/Projects/rikolti/rikolti_content +RIKOLTI_CONTENT=file:///Users/awieliczka/Projects/rikolti/rikolti_content ``` Each of these can be different locations, however. For example, if you're attempting to re-run a mapper locally off of previously fetched data stored on s3, you might set `RIKOLTI_DATA=s3://rikolti_data`. diff --git a/content_harvester/README.md b/content_harvester/README.md index 9302a333c..e918b84ec 100644 --- a/content_harvester/README.md +++ b/content_harvester/README.md @@ -34,7 +34,7 @@ The above media and thumbnail fetching processes are enacted upon child metadata # Settings -You can bypass uploading to s3 by setting `RIKOLTI_DATA = "file://"` and `CONTENT_ROOT = "file://"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing). +You can bypass uploading to s3 by setting `RIKOLTI_DATA = "file://"` and `RIKOLTI_CONTENT = "file://"`. This is useful for local development and testing. This will, however, set the metadata records' `media['media_filepath']` and `thumbnail['thumbnail_filepath']` to a local filepath (thus rendering the output useless for publishing). 
# Local Development diff --git a/content_harvester/by_record.py b/content_harvester/by_record.py index 6674ec75f..da82ad4c0 100644 --- a/content_harvester/by_record.py +++ b/content_harvester/by_record.py @@ -400,9 +400,9 @@ def create_thumbnail_component( def upload_content(filepath: str, destination: str) -> str: ''' - upload file to CONTENT_ROOT + upload file to RIKOLTI_CONTENT ''' - content_root = os.environ.get("CONTENT_ROOT", 'file:///tmp') + content_root = os.environ.get("RIKOLTI_CONTENT", 'file:///tmp') content_path = f"{content_root.rstrip('/')}/{destination}" upload_file(filepath, content_path) return content_path \ No newline at end of file diff --git a/content_harvester/docker-compose.yml b/content_harvester/docker-compose.yml index e4a9fd3ef..fe8659da2 100644 --- a/content_harvester/docker-compose.yml +++ b/content_harvester/docker-compose.yml @@ -19,7 +19,7 @@ services: - ../utils:/rikolti/utils environment: - RIKOLTI_DATA=file:///rikolti_data - - CONTENT_ROOT=file:///rikolti_content + - RIKOLTI_CONTENT=file:///rikolti_content - NUXEO_USER=${NUXEO_USER} - NUXEO_PASS=${NUXEO_PASS} - CONTENT_COMPONENT_CACHE=${CONTENT_COMPONENT_CACHE} diff --git a/dags/shared_tasks/content_harvest_operators.py b/dags/shared_tasks/content_harvest_operators.py index b27b9d660..69a23fc20 100644 --- a/dags/shared_tasks/content_harvest_operators.py +++ b/dags/shared_tasks/content_harvest_operators.py @@ -74,7 +74,7 @@ def __init__(self, collection_id=None, with_content_urls_version=None, pages=Non "value": "s3://rikolti-data" }, { - "name": "CONTENT_ROOT", + "name": "RIKOLTI_CONTENT", "value": "s3://rikolti-content" }, { @@ -175,10 +175,10 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type, else: rikolti_data = "file:///rikolti_data" - if os.environ.get('CONTENT_ROOT', '').startswith('s3'): - content_root = os.environ.get('CONTENT_ROOT') + if os.environ.get('RIKOLTI_CONTENT', '').startswith('s3'): + rikolti_content = 
os.environ.get('RIKOLTI_CONTENT') else: - content_root = "file:///rikolti_content" + rikolti_content = "file:///rikolti_content" prefix, pages = extract_prefix_from_pages(pages) args = { @@ -197,7 +197,7 @@ def __init__(self, collection_id, with_content_urls_version, pages, mapper_type, "mount_tmp_dir": False, "environment": { "RIKOLTI_DATA": rikolti_data, - "CONTENT_ROOT": content_root, + "RIKOLTI_CONTENT": rikolti_content, "CONTENT_COMPONENT_CACHE": os.environ.get("CONTENT_COMPONENT_CACHE"), "NUXEO_USER": os.environ.get("NUXEO_USER"), "NUXEO_PASS": os.environ.get("NUXEO_PASS") diff --git a/env.example b/env.example index 96974e95b..d9530bf68 100644 --- a/env.example +++ b/env.example @@ -29,7 +29,7 @@ export UCLDC_COUCH_URL="https://harvest-prd.cdlib.org/" # this is co # content_harvester export NUXEO_USER= # ask for a user/pass - required to fetch Nuxeo Content export NUXEO_PASS= -export CONTENT_ROOT=file:///usr/local/airflow/rikolti_content +export RIKOLTI_CONTENT=file:///usr/local/airflow/rikolti_content export CONTENT_COMPONENT_CACHE= # s3:/// # indexer From 6953902e0b424ce0f9ab4ef791583fd39f71bba4 Mon Sep 17 00:00:00 2001 From: amy wieliczka Date: Wed, 2 Oct 2024 10:41:45 -0700 Subject: [PATCH 12/12] Add utils.s3copy - maintains s3 directory structure --- utils/s3copy.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++++ utils/storage.py | 12 ++++++--- 2 files changed, 73 insertions(+), 4 deletions(-) create mode 100644 utils/s3copy.py diff --git a/utils/s3copy.py b/utils/s3copy.py new file mode 100644 index 000000000..5fe9c6035 --- /dev/null +++ b/utils/s3copy.py @@ -0,0 +1,65 @@ +import os +import boto3 +from .storage import DataStorage, parse_data_uri + + +def copy_s3_to_local(data: DataStorage, destination: str, **kwargs): + """ + Copy the object(s) located at data.path to the local filesystem at destination + """ + s3 = boto3.client('s3', **kwargs) + # list all objects at data.path + prefix = data.path.lstrip('/') + s3_objects = s3.list_objects_v2( + 
Bucket=data.bucket, + Prefix=prefix + ) + if 'Contents' not in s3_objects: + raise FileNotFoundError(f"Error: {data.uri} not found") + + for obj in s3_objects['Contents']: + target_path = os.path.join(destination, obj['Key'][len(prefix):]) + if not os.path.exists(os.path.dirname(target_path)): + os.makedirs(os.path.dirname(target_path), exist_ok=True) + s3.download_file( + data.bucket, + obj['Key'], + target_path + ) + + return destination + + +def copy_to_local(data_uri: str, destination: str, **kwargs): + """ + Copies the file located at data_uri to the local filesystem at destination. + """ + data = parse_data_uri(data_uri) + if data.store == 's3': + return copy_s3_to_local(data, destination, **kwargs) + else: + raise Exception(f"Unknown data store: {data.store}") + + +if __name__ == "__main__": + """ + Copy content from s3 to local filesystem, preserving directory + structure. Overwrites files of the same name at the same location, + but otherwise does not clean out directories prior to copying. + + Usage: + python s3copy.py s3://rikolti-data/26147/ ../rikolti_data/26147 + """ + import argparse + parser = argparse.ArgumentParser( + description="Copy content from s3 to local filesystem") + parser.add_argument( + 'data_uri', + help="s3://bucket-name/path/to/file" + ) + parser.add_argument( + 'destination_path', + help="path/to/destination" + ) + args = parser.parse_args() + print(copy_to_local(args.data_uri, args.destination_path)) \ No newline at end of file diff --git a/utils/storage.py b/utils/storage.py index dacf3eac8..413b98ed5 100644 --- a/utils/storage.py +++ b/utils/storage.py @@ -5,7 +5,7 @@ import shutil from urllib.parse import urlparse -from collections import namedtuple +from dataclasses import dataclass """ @@ -16,9 +16,13 @@ """ -DataStorage = namedtuple( - "DateStorage", "uri, store, bucket, path" -) +@dataclass +class DataStorage: + uri: str + store: str + bucket: str + path: str + def parse_data_uri(data_uri: str): data_loc = urlparse(data_uri)