Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collection and item ingest notebook #125

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
394 changes: 394 additions & 0 deletions transformation-scripts/collection-and-item-workflows-ingest.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,394 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Notebook to Publish Collections and Start Discovery Workflow"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook publishes the collections in `/ingestion-data/collections` excluding:\n",
"- 'hls-l30-002-ej-reprocessed'\n",
"- 'hls-s30-002-ej-reprocessed'\n",
"- 'ls8-covid-19-example-data'\n",
"- 'landsat-c2l2-sr-antarctic-glaciers-pine-island'\n",
"- 'landsat-c2l2-sr-lakes-aral-sea'\n",
"- 'landsat-c2l2-sr-lakes-tonle-sap'\n",
"- 'landsat-c2l2-sr-lakes-lake-balaton'\n",
"- 'landsat-c2l2-sr-lakes-vanern'\n",
"- 'landsat-c2l2-sr-antarctic-glaciers-thwaites'\n",
"- 'landsat-c2l2-sr-lakes-lake-biwa'\n",
"- 'combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk_DEMO'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import glob\n",
"import os\n",
"import json\n",
"import requests"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell retrieves collection JSON files from `/ingestion-data/collections/` and save collectionIds to a list."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"excluded_collections = [\n",
" \"hls-l30-002-ej-reprocessed\",\n",
" \"hls-s30-002-ej-reprocessed\",\n",
" \"ls8-covid-19-example-data\",\n",
" \"landsat-c2l2-sr-antarctic-glaciers-pine-island\",\n",
" \"landsat-c2l2-sr-lakes-aral-sea\",\n",
" \"landsat-c2l2-sr-lakes-tonle-sap\",\n",
" \"landsat-c2l2-sr-lakes-lake-balaton\",\n",
" \"landsat-c2l2-sr-lakes-vanern\",\n",
" \"landsat-c2l2-sr-antarctic-glaciers-thwaites\",\n",
" \"landsat-c2l2-sr-lakes-lake-biwa\",\n",
" \"combined_CMIP6_daily_GISS-E2-1-G_tas_kerchunk_DEMO\",\n",
"]\n",
"\n",
"collection_json_file_paths = glob.glob(\"../ingestion-data/collections/*.json\")\n",
"filtered_list = [\n",
" item\n",
" for item in json_file_paths\n",
" if all(\n",
" excluded_collections not in item\n",
" for excluded_collections in excluded_collections\n",
" )\n",
"]\n",
"\n",
"file_paths_and_collection_ids = [\n",
" {\"filePath\": file_path, \"collectionId\": data[\"id\"]}\n",
" for file_path in filtered_list\n",
" if \"id\" in (data := json.load(open(file_path, \"r\")))\n",
"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set the testing mode to `True` when testing and `False` otherwise. When the testing mode is `True`, the notebook will be set to run against `dev` endpoints."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"testing_mode = True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell compares files in '/ingestion/collections' with those in 'ingestion/staging/discovery-items' or 'ingestion/production/discovery-items' and returns a list of all the discovery-items that have a corresponding collection."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def find_matching_file_names(collections_list, discovery_items_list):\n",
" matching_file_names = []\n",
" for collection_filename in collections_list:\n",
" collection_json = load_json_file(collection_filename)\n",
" id1 = collection_json.get(\"id\")\n",
" if id1 is not None:\n",
" for discovery_items_filename in discovery_items_list:\n",
" item_json = load_json_file(discovery_items_filename)\n",
" if isinstance(item_json, list):\n",
" if len(item_json) > 0:\n",
" collection2 = item_json[0].get(\"collection\")\n",
" else:\n",
" collection2 = item_json.get(\"collection\")\n",
"\n",
" if collection2 is not None:\n",
" if collection2 == id1:\n",
" # Found a match\n",
" matching_file_names.append(discovery_items_filename)\n",
" break\n",
" return matching_file_names\n",
"\n",
"\n",
"def load_json_file(file_path):\n",
" with open(file_path, \"r\") as file:\n",
" return json.load(file)\n",
"\n",
"\n",
"collections_files = \"../ingestion-data/collections/\"\n",
"discovery_items_files = (\n",
" \"../ingestion-data/staging/discovery-items/\"\n",
" if testing_mode\n",
" else \"../ingestion-data/production/discovery-items/\"\n",
")\n",
"\n",
"discovery_items_json_file_paths = (\n",
" glob.glob(\"../ingestion-data/staging/discovery-items//*.json\")\n",
" if testing_mode\n",
" else glob.glob(\"../ingestion-data/production/discovery-items//*.json\")\n",
")\n",
"# Find matching file names\n",
"matching_file_names = find_matching_file_names(\n",
" collections_json_file_paths, discovery_items_json_file_paths\n",
")\n",
"\n",
"# for file_pair in matching_file_names:\n",
"# print(\"Match found:\")\n",
"# print(\"File 1:\", file_pair[0])\n",
"# print(\"File 2:\", file_pair[1])\n",
"discovery_items_to_process = matching_file_names\n",
"print(discovery_items_to_process)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Have your Cognito `username` and `password` ready to set up Cognito Client to retrieve a token that will be used to access the STAC Ingestor API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_endpoint = \"https://test.openveda.cloud\"\n",
"test_client_id = \"CHANGE ME\"\n",
"test_user_pool_id = \"CHANGE ME\"\n",
"test_identity_pool_id = \"CHANGE ME\"\n",
"\n",
"mcp_prod_endpoint = \"https://openveda.cloud\"\n",
"mcp_prod_client_id = \"CHANGE ME\"\n",
"mcp_prod_user_pool_id = \"CHANGE ME\"\n",
"mcp_prod_identity_pool_id = \"CHANGE ME\"\n",
"\n",
"print(f\"TESTING MODE? {testing_mode}\")\n",
"if testing_mode:\n",
" STAC_INGESTOR_API = f\"{test_endpoint}/api/ingest/\"\n",
" VEDA_STAC_API = f\"{test_endpoint}/api/stac/\"\n",
" WORKFLOWS_API = \"https://4hrks0hk0b.execute-api.us-west-2.amazonaws.com/docs/\"\n",
"else:\n",
" STAC_INGESTOR_API = f\"{mcp_prod_endpoint}/api/ingest/\"\n",
" VEDA_STAC_API = f\"{mcp_prod_endpoint}/api/stac/\"\n",
" WORKFLOWS_API = \"https://bct2n8in53.execute-api.us-west-2.amazonaws.com/docs/\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell sets up headers for requests."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TOKEN = \"REPLACE ME\"\n",
"\n",
"authorization_header = f\"Bearer {TOKEN}\"\n",
"headers = {\n",
" \"Authorization\": authorization_header,\n",
" \"content-type\": \"application/json\",\n",
" \"accept\": \"application/json\",\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell defines the function that will post the collection."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def post_collection(collection, collection_id):\n",
" collection_url = f\"{VEDA_STAC_API}collections/{collection_id}\"\n",
" ingest_url = f\"{STAC_INGESTOR_API}collections\"\n",
"\n",
" try:\n",
" response = requests.post(ingest_url, json=collection, headers=headers)\n",
" response.raise_for_status()\n",
" if response.status_code == 201:\n",
" print(\n",
" f\"Request was successful. Find the updated collection at {collection_url}\"\n",
" )\n",
" else:\n",
" print(\n",
" f\"Updating {collection_id} failed. Request failed with status code: {response.status_code}\"\n",
" )\n",
" except requests.RequestException as e:\n",
" print(\n",
" f\"Updating {collection_id} failed. An error occurred during the request: {e}\"\n",
" )\n",
" except Exception as e:\n",
" print(\n",
" f\"An unexpected error occurred while trying to update {collection_id}: {e}\"\n",
" )\n",
"\n",
"\n",
"def ingest_discovery_item(discovery_item):\n",
" discovery_url = f\"{WORKFLOWS_API}/discovery\"\n",
" try:\n",
" response = requests.post(\n",
" discovery_url, json=ingest_discovery_item, headers=headers\n",
" )\n",
" response.raise_for_status()\n",
" if response.status_code == 201:\n",
" print(f\"Request was successful. \")\n",
" else:\n",
" print(\n",
" f\"Kicking off discovery for {ingest_discovery_item} failed. Request failed with status code: {response.status_code}\"\n",
" )\n",
" except requests.RequestException as e:\n",
" print(\n",
" f\"Kicking off discovery for {ingest_discovery_item} failed. An error occurred during the request: {e}\"\n",
" )\n",
" except Exception as e:\n",
" print(\n",
" f\"An unexpected error occurred while trying to kick off discovery for {item} failed: {e}\"\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If testing_mode is enabled, use a test list:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_file_paths_and_collection_ids = [file_paths_and_collection_ids[0]]\n",
"test_discovery_item = [f\"../ingestion-data/staging/discovery-items/{file_paths_and_collection_ids[0].get(\"collectionId\")}.json\"]\n",
"\n",
"print(test_discovery_item)\n",
"print(test_file_paths_and_collection_ids)\n",
"print(VEDA_STAC_API)\n",
"\n",
"file_paths_and_collection_ids = (\n",
" test_file_paths_and_collection_ids\n",
" if testing_mode\n",
" else file_paths_and_collection_ids\n",
")\n",
"discovery_items_to_process = (\n",
" test_discovery_item\n",
" if testing_mode\n",
" else discovery_items_to_process\n",
")\n",
"\n",
"print(file_paths_and_collection_ids)\n",
"print(discovery_items_to_process)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following cell publishes the collection to the target ingestion `api/collections` endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for collection in file_paths_and_collection_ids:\n",
" collection_id = collection[\"collectionId\"]\n",
" file_path = collection[\"filePath\"]\n",
"\n",
" try:\n",
" with open(file_path, \"r\", encoding=\"utf-8\") as file:\n",
" collection = json.load(file)\n",
"\n",
" # Publish the updated collection to the target ingestion `api/collections` endpoint\n",
" post_collection(collection, collection_id)\n",
"\n",
" except requests.RequestException as e:\n",
" print(f\"An error occurred for collectionId {collection_id}: {e}\")\n",
" except Exception as e:\n",
" print(f\"An unexpected error occurred for collectionId {collection_id}: {e}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for discovery_item in discovery_items_to_process:\n",
" try:\n",
" with open(discovery_item, \"r\", encoding=\"utf-8\") as file:\n",
" discovery_item_json = json.load(file)\n",
"\n",
" # Publish the updated collection to the target ingestion `api/collections` endpoint\n",
" if isinstance(discovery_item_json, list):\n",
" for single_discovery_item in discovery_item_json:\n",
" ingest_discovery_item(single_discovery_item)\n",
" else:\n",
" ingest_discovery_item(discovery_item_json)\n",
"\n",
" except requests.RequestException as e:\n",
" print(f\"An error occurred for discovery item {discovery_item}: {e}\")\n",
" except Exception as e:\n",
" print(f\"An unexpected error occurred for discovery item {discovery_item}: {e}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Loading