Skip to content

Commit

Permalink
Enable convenient, shorter ingest definitions for single-asset items
Browse files Browse the repository at this point in the history
  • Loading branch information
moradology committed Aug 16, 2023
1 parent d1d0207 commit ed2d7d5
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion dags/veda_data_pipeline/utils/s3_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ def group_by_item(discovered_files: List[str], id_regex: str, assets: dict) -> d
items_with_assets.append(item)
return items_with_assets

def construct_single_asset_items(discovered_files: List[str]) -> dict:
items_with_assets = []
for uri in discovered_files:
# Each file gets its matched asset type and id
filename = uri.split("/")[-1]
prefix = "/".join(uri.split("/")[:-1])
item = { "item_id": filename, "assets": { "cog_default": { "href": f"{prefix}/{filename}" } } }
items_with_assets.append(item)
return items_with_assets

def generate_payload(s3_prefix_key: str, payload: dict):
"""Generate a payload and write it to an S3 file.
Expand Down Expand Up @@ -155,6 +164,8 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
id_template = event.get("id_template", collection + "-{}")
date_fields = propagate_forward_datetime_args(event)
dry_run = event.get("dry_run", False)
if dry_run:
print("Running discovery in dry run mode")

payload = {**event, "objects": []}
slice = event.get("slice")
Expand All @@ -174,11 +185,24 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
f"s3://{bucket}/{obj['Key']}"
for obj in discover_from_s3(s3_iterator, filename_regex)
]
items_with_assets = group_by_item(file_uris, id_regex, assets)

if len(file_uris) == 0:
raise ValueError(f"No files discovered at bucket: {bucket}, prefix: {prefix}")

# out of convenience, we might not always want to explicitly define assets
if assets is not None:
items_with_assets = group_by_item(file_uris, id_regex, assets)
else:
items_with_assets = construct_single_asset_items(file_uris)

if len(items_with_assets) == 0:
raise ValueError(f"No items could be constructed for files at bucket: {bucket}, prefix: {prefix}")

# Update IDs using id_template
for item in items_with_assets:
item["item_id"] = id_template.format(item["item_id"])


if dry_run:
print(f"-DRYRUN- Discovered {len(items_with_assets)} items")
for idx in range(0, min(10, len(items_with_assets))):
Expand Down

0 comments on commit ed2d7d5

Please sign in to comment.