Migrate metadata (#1192)
* dataset metadata is working

* register migration extractor and successfully migrate machine metadata
longshuicy authored Aug 29, 2024
1 parent 86c8493 commit 2a52c7c
Showing 3 changed files with 230 additions and 56 deletions.
1 change: 1 addition & 0 deletions backend/app/models/metadata.py
@@ -20,6 +20,7 @@
    "int": int,
    "float": float,
    "str": str,
+   "string": str,
    "TextField": str,
    "bool": bool,
    # TODO figure out how to parse "yyyymmdd hh:mm:ssssssz" into datetime object
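The mapping above translates v1 type-name strings into Python casters; the new "string" entry lets v1 metadata definitions that spell the type as "string" resolve to str, just like "str" and "TextField". A minimal sketch of how such a mapping might be applied during migration (coerce_value is a hypothetical helper, not part of this commit):

    FIELD_TYPES = {
        "int": int,
        "float": float,
        "str": str,
        "string": str,  # alias added in this commit
        "TextField": str,
        "bool": bool,
    }

    def coerce_value(type_name: str, raw: str):
        # Hypothetical helper: unknown type names fall back to str.
        caster = FIELD_TYPES.get(type_name, str)
        return caster(raw)

    coerce_value("int", "42")     # -> 42
    coerce_value("string", "42")  # -> "42"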
260 changes: 208 additions & 52 deletions scripts/migration/migrate.py
@@ -4,6 +4,12 @@
import requests
from dotenv import dotenv_values

+from scripts.migration.migrate_metadata_definitions import (
+    check_metadata_definition_exists,
+    get_clowder_v1_metadata_definitions,
+    post_metadata_definition,
+)

# Configuration and Constants
DEFAULT_PASSWORD = "Password123&"

@@ -63,33 +69,33 @@ def generate_user_api_key(user, password=DEFAULT_PASSWORD):

def get_clowder_v1_users():
    """Retrieve all users from Clowder v1."""
-   endpoint = f"{CLOWDER_V1}/api/users"
+   endpoint = f"{CLOWDER_V1}/api/users?superAdmin=true"
    response = requests.get(endpoint, headers=base_headers_v1, verify=False)
    return response.json()


def get_clowder_v1_user_datasets(user_id):
    """Retrieve datasets created by a specific user in Clowder v1."""
    # TODO what about pagination
-   endpoint = f"{CLOWDER_V1}/api/datasets?limit=0"
+   endpoint = f"{CLOWDER_V1}/api/datasets?limit=0&superAdmin=true"
    response = requests.get(endpoint, headers=clowder_headers_v1, verify=False)
    return [dataset for dataset in response.json() if dataset["authorId"] == user_id]


def get_clowder_v1_user_spaces(user_v1):
-   endpoint = f"{CLOWDER_V1}/api/spaces"
+   endpoint = f"{CLOWDER_V1}/api/spaces?superAdmin=true"
    response = requests.get(endpoint, headers=clowder_headers_v1, verify=False)
    return [space for space in response.json() if space["creator"] == user_v1["id"]]


def get_clowder_v1_user_spaces_members(space_id):
-   endpoint = f"{CLOWDER_V1}/api/spaces/{space_id}/users"
+   endpoint = f"{CLOWDER_V1}/api/spaces/{space_id}/users?superAdmin=true"
    response = requests.get(endpoint, headers=clowder_headers_v1, verify=False)
    return response.json()


def get_clowder_v2_space_datasets(space_id):
-   endpoint = f"{CLOWDER_V1}/api/spaces/{space_id}/datasets"
+   endpoint = f"{CLOWDER_V1}/api/spaces/{space_id}/datasets?superAdmin=true"
    response = requests.get(endpoint, headers=clowder_headers_v1, verify=False)
    return response.json()
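All of the v1 helpers above now opt into admin-wide visibility by appending superAdmin=true to each URL (the exact semantics of the flag are assumed from its usage here). An equivalent sketch that passes the flag through requests' params argument instead of string concatenation:

    def v1_get(path, headers):
        # Sketch only: same endpoints, flag supplied as a query parameter.
        response = requests.get(
            f"{CLOWDER_V1}{path}",
            headers=headers,
            params={"superAdmin": "true"},
            verify=False,
        )
        return response.json()

    users = v1_get("/api/users", base_headers_v1)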

@@ -265,32 +271,180 @@ def download_and_upload_file(file, all_dataset_folders, dataset_v2_id, headers_v
    dataset_file_upload_endpoint = f"{CLOWDER_V2}/api/v2/datasets/{dataset_v2_id}/files"
    if matching_folder:
        dataset_file_upload_endpoint += f"Multiple?folder_id={matching_folder['id']}"
-   file_exists = os.path.exists(filename)
-   # with open(filename, "rb") as file_data:
    response = requests.post(
-       dataset_file_upload_endpoint, headers=headers_v2, files={"file": open(filename, "rb")}
+       dataset_file_upload_endpoint,
+       headers=headers_v2,
+       files={"file": open(filename, "rb")},
    )

-   if response.status_code == 200:
-       print(f"Uploaded file: {filename} to dataset {dataset_v2_id}")
-
    # Clean up the local file after upload
    try:
        os.remove(filename)
    except Exception as e:
        print(f"Could not delete locally downloaded file: {filename}")
        print(e)
    print(f"Completed upload for file: {filename}")

+   if response.status_code == 200:
+       print(f"Uploaded file: {filename} to dataset {dataset_v2_id}")
+       return response.json().get("id")
+   else:
+       print(f"Failed to upload file: {filename} to dataset {dataset_v2_id}")
+
+   return None


+def add_file_metadata(file_v1, file_v2_id, headers_v1, headers_v2):
+    # Get metadata from Clowder V1
+    endpoint = f"{CLOWDER_V1}/api/files/{file_v1['id']}/metadata.jsonld?superAdmin=true"
+    metadata_v1 = requests.get(endpoint, headers=headers_v1).json()
+
+    # Iterate through the metadata and post it to Clowder V2
+    for metadata in metadata_v1:
+        # Extract and map each key-value pair from the metadata's content
+        if "content" in metadata:
+            for key, value in metadata["content"].items():
+                # Define the payload to send to V2
+                metadata_payload_v2 = {
+                    "definition": key,
+                    "content": metadata["content"],
+                }
+
+                # Check if the metadata definition exists;
+                # if it exists, post as user metadata; otherwise, post as machine metadata
+                v2_metadata_endpoint = (
+                    f"{CLOWDER_V2}/api/v2/files/{file_v2_id}/metadata"
+                )
+                if check_metadata_definition_exists(
+                    CLOWDER_V2, key, headers=headers_v2
+                ):
+                    response = requests.post(
+                        v2_metadata_endpoint,
+                        json=metadata_payload_v2,
+                        headers=headers_v2,
+                    )
+
+                    if response.status_code != 200:
+                        print(f"Failed to post file metadata to V2: {response.text}")
+                    else:
+                        print(f"Successfully posted file metadata to V2: {key}")
+                else:
+                    if "agent" in metadata and "listener" not in metadata:
+                        metadata["listener"] = {
+                            "name": "migration",
+                            "version": "1",
+                            "description": "Migration of metadata from Clowder v1 to Clowder v2",
+                        }
+                    response = requests.post(
+                        v2_metadata_endpoint, json=metadata, headers=headers_v2
+                    )
+
+                    if response.status_code != 200:
+                        print(f"Failed to post file metadata to V2: {response.text}")
+                    else:
+                        print("Successfully posted file machine metadata to V2")
+                    break  # machine metadata: no need to iterate over the remaining keys


+def add_dataset_metadata(dataset_v1, dataset_v2_id, headers_v1, headers_v2):
+    # Get metadata from Clowder V1
+    endpoint = (
+        f"{CLOWDER_V1}/api/datasets/{dataset_v1['id']}/metadata.jsonld?superAdmin=true"
+    )
+    metadata_v1 = requests.get(endpoint, headers=headers_v1).json()
+
+    # Iterate through the metadata and post it to Clowder V2
+    for metadata in metadata_v1:
+        # Extract and map each key-value pair from the metadata's content
+        if "content" in metadata:
+            for key, value in metadata["content"].items():
+                # Define the payload to send to V2
+                metadata_payload_v2 = {
+                    "definition": key,
+                    "content": metadata["content"],
+                }
+
+                # Check if the metadata definition exists;
+                # if it exists, post as user metadata; otherwise, post as machine metadata
+                v2_metadata_endpoint = (
+                    f"{CLOWDER_V2}/api/v2/datasets/{dataset_v2_id}/metadata"
+                )
+                if check_metadata_definition_exists(
+                    CLOWDER_V2, key, headers=headers_v2
+                ):
+                    response = requests.post(
+                        v2_metadata_endpoint,
+                        json=metadata_payload_v2,
+                        headers=headers_v2,
+                    )
+
+                    if response.status_code != 200:
+                        print(f"Failed to post dataset metadata to V2: {response.text}")
+                    else:
+                        print(f"Successfully posted dataset metadata to V2: {key}")
+                else:
+                    if "agent" in metadata and "listener" not in metadata:
+                        metadata["listener"] = {
+                            "name": "migration",
+                            "version": "1",
+                            "description": "Migration of metadata from Clowder v1 to Clowder v2",
+                        }
+                    response = requests.post(
+                        v2_metadata_endpoint, json=metadata, headers=headers_v2
+                    )
+
+                    if response.status_code != 200:
+                        print(f"Failed to post dataset metadata to V2: {response.text}")
+                    else:
+                        print("Successfully posted dataset machine metadata to V2")
+                    break  # machine metadata: no need to iterate over the remaining keys
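To make the branching in add_file_metadata and add_dataset_metadata concrete, here is a hypothetical v1 metadata.jsonld entry (field names and values invented for illustration) and the two payload shapes derived from it:

    # Hypothetical v1 entry, shaped the way the functions above consume it.
    metadata = {
        "agent": {"name": "some-v1-extractor"},
        "content": {"Funding Institution": "NSF", "Grant Number": "123456"},
    }

    # Branch 1: a v2 definition named "Funding Institution" exists, so the
    # entry is posted as user metadata under that definition:
    payload_user = {
        "definition": "Funding Institution",
        "content": metadata["content"],
    }

    # Branch 2: no matching definition, so the whole v1 entry is posted once
    # as machine metadata, attributed to the "migration" listener:
    metadata["listener"] = {
        "name": "migration",
        "version": "1",
        "description": "Migration of metadata from Clowder v1 to Clowder v2",
    }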


+def register_migration_extractor():
+    """Register the migration extractor in Clowder v2."""
+    migration_extractor = {
+        "name": "migration",
+        "description": "Migration of metadata from Clowder v1 to Clowder v2",
+        "version": "1",
+        "author": "Clowder Devs",
+    }
+
+    # Check whether the migration extractor is already registered
+    search_endpoint = f"{CLOWDER_V2}/api/v2/listeners/search"
+    search_params = {"text": migration_extractor["name"]}
+    search_response = requests.get(
+        search_endpoint, headers=clowder_headers_v2, params=search_params
+    )
+
+    # If it already exists, skip registration
+    if search_response.status_code == 200:
+        search_data = search_response.json()
+        if search_data.get("metadata", {}).get("total_count", 0) > 0:
+            for existing_extractor in search_data.get("data", []):
+                if existing_extractor.get("name") == migration_extractor["name"]:
+                    print(
+                        f"Extractor {migration_extractor['name']} already exists in Clowder v2."
+                    )
+                    return
+
+    endpoint = f"{CLOWDER_V2}/api/v2/extractors"
+    response = requests.post(
+        endpoint, json=migration_extractor, headers=clowder_headers_v2
+    )
+
+    if response.status_code == 200:
+        print("Successfully registered migration extractor in Clowder v2.")
+    else:
+        print(
+            f"Failed to register migration extractor in Clowder v2. Status code: {response.status_code}"
+        )


def process_user_and_resources(user_v1, USER_MAP, DATASET_MAP):
    """Process user resources from Clowder v1 to Clowder v2."""
    user_v1_datasets = get_clowder_v1_user_datasets(user_id=user_v1["id"])
    user_v2_api_key = create_local_user(user_v1)
    USER_MAP[user_v1["id"]] = user_v2_api_key
-   base_user_headers_v2 = {
-       "x-api-key": user_v2_api_key
-   }
+   base_user_headers_v2 = {"x-api-key": user_v2_api_key}
    user_headers_v2 = {
        "x-api-key": user_v2_api_key,
        "content-type": "application/json",
@@ -301,6 +455,7 @@ def process_user_and_resources(user_v1, USER_MAP, DATASET_MAP):
print(f"Creating dataset in v2: {dataset['id']} - {dataset['name']}")
dataset_v2_id = create_v2_dataset(dataset, user_headers_v2)
DATASET_MAP[dataset["id"]] = dataset_v2_id
add_dataset_metadata(dataset, dataset_v2_id, base_headers_v1, user_headers_v2)
add_dataset_folders(dataset, dataset_v2_id, user_headers_v2)
print("Created folders in the new dataset")

@@ -316,34 +471,33 @@ def process_user_and_resources(user_v1, USER_MAP, DATASET_MAP):
        files_result = files_response.json()

        for file in files_result:
-           download_and_upload_file(
+           file_v2_id = download_and_upload_file(
                file, all_dataset_folders, dataset_v2_id, base_user_headers_v2
            )
+           if file_v2_id is not None:
+               add_file_metadata(file, file_v2_id, clowder_headers_v1, user_headers_v2)

    return [USER_MAP, DATASET_MAP]


if __name__ == "__main__":
-   # users_v1 = get_clowder_v1_users()
+   ##############################################################################################################
+   # migrate metadata definitions
+   v1_md_definitions = get_clowder_v1_metadata_definitions(CLOWDER_V1, base_headers_v1)
+   posted_ids = []
+   for v1_md in v1_md_definitions:
+       definition_id = post_metadata_definition(v1_md, CLOWDER_V2, clowder_headers_v2)
+       if definition_id:
+           posted_ids.append(definition_id)
+
+   ##############################################################################################################
+   # Register the migration extractor in Clowder v2
+   register_migration_extractor()
+
+   ##############################################################################################################
+   # migrate users and resources
    USER_MAP = {}
    DATASET_MAP = {}
-   users_v1 = [
-       {
-           "@context": {
-               "firstName": "http://schema.org/Person/givenName",
-               "lastName": "http://schema.org/Person/familyName",
-               "email": "http://schema.org/Person/email",
-               "affiliation": "http://schema.org/Person/affiliation",
-           },
-           "id": "576313ce1407b25fe19fc381",
-           "firstName": "Chen",
-           "lastName": "Wang",
-           "fullName": "Chen Wang",
-           "email": "[email protected]",
-           "avatar": "http://www.gravatar.com/avatar/2f97a52f2214949c4172d7fb796f173e?d=404",
-           "profile": {},
-           "identityProvider": "Chen Wang ([email protected]) [Local Account]",
-       }
-   ]
+   users_v1 = get_clowder_v1_users()
    for user_v1 in users_v1:
        if (
@@ -357,21 +511,23 @@ def process_user_and_resources(user_v1, USER_MAP, DATASET_MAP):
        else:
            print(f"Skipping user {user_v1['email']} as it is not a local account.")

print("Now migrating spaces.")
for user_v1 in users_v1:
print(f"Migrating spaces of user {user_v1['email']}")
user_v1_spaces = get_clowder_v1_user_spaces(user_v1)
user_v2_api_key = USER_MAP[user_v1["id"]]
for space in user_v1_spaces:
group_id = create_v2_group(space, headers={"X-API-key": user_v2_api_key})
add_v1_space_members_to_v2_group(
space, group_id, headers={"X-API-key": user_v2_api_key}
)
space_datasets = get_clowder_v2_space_datasets(space["id"])
for space_dataset in space_datasets:
dataset_v2_id = DATASET_MAP[space_dataset["id"]]
share_dataset_with_group(
group_id, space, headers={"X-API-key": user_v2_api_key}
)
print(f"Migrated spaces of user {user_v1['email']}")
##############################################################################################################
# migrate spaces
# print("Now migrating spaces.")
# for user_v1 in users_v1:
# print(f"Migrating spaces of user {user_v1['email']}")
# user_v1_spaces = get_clowder_v1_user_spaces(user_v1)
# user_v2_api_key = USER_MAP[user_v1["id"]]
# for space in user_v1_spaces:
# group_id = create_v2_group(space, headers={"X-API-key": user_v2_api_key})
# add_v1_space_members_to_v2_group(
# space, group_id, headers={"X-API-key": user_v2_api_key}
# )
# space_datasets = get_clowder_v2_space_datasets(space["id"])
# for space_dataset in space_datasets:
# dataset_v2_id = DATASET_MAP[space_dataset["id"]]
# share_dataset_with_group(
# group_id, space, headers={"X-API-key": user_v2_api_key}
# )
# print(f"Migrated spaces of user {user_v1['email']}")
print("Migration complete.")