From 41e7f58f6133372ffe8653a75db0527e42908bb5 Mon Sep 17 00:00:00 2001 From: Tom Gebhardt Date: Wed, 24 Jul 2024 21:33:49 +0200 Subject: [PATCH] * added missing indicator functions TODO: make it run all functions + testing --- app/celery/automated_tasks/csh_fair.py | 154 ++++++++++++++++++++----- 1 file changed, 124 insertions(+), 30 deletions(-) diff --git a/app/celery/automated_tasks/csh_fair.py b/app/celery/automated_tasks/csh_fair.py index 9421d35..f1c92c7 100644 --- a/app/celery/automated_tasks/csh_fair.py +++ b/app/celery/automated_tasks/csh_fair.py @@ -16,7 +16,6 @@ def is_doi(identifier: str): # Use the re.match function to check if the string matches the pattern print("identifier: ", identifier) match = re.match(doi_pattern, identifier) - print("?? match") print("doi match: ", match) return False #return bool(re.match(doi_pattern, identifier)) @@ -72,6 +71,20 @@ def incoperate_results(task_dict: dict, result: 'app.models.TaskStatus', test: b # Works, but does not trigger updating of children # redis_app.json().set(f"session:{session_id}", f".tasks.{task_id}.status", obj=result) +@app.task +def csh_f1_1_persistent_identifier(task_dict: dict, data: dict, test: bool = False): + """ + check if metadata has: resource > identifier attribute + """ + identifier = check_route(data, ["resource", "resource_identifier"]) + if identifier: + result = True + else: + result = False + + incoperate_results(task_dict, result, test) + + @app.task def csh_f1_2_globally_unique_identifier(task_dict: dict, data: dict, test: bool = False): """ @@ -95,54 +108,86 @@ def assessment_task(task_dict: dict, data: dict) -> None: :param data: (Meta)Data to evaluate :return: None """ - - print("CHECK!! globally uni ") identifier = check_route(data, ["resource", "resource_identifier"]) print("grabbed identifier: ", identifier) #could also retrive "type" from data instead of using .startswith if(identifier is False): result = "failed" - elif(is_doi(identifier)): + elif(is_doi(identifier)): #TODO: extend by a DOI check when CSH starts supporting DOIs result = "success" - elif(identifier.startswith("DRKS")): + elif(identifier.startswith("DRKS")) and is_url_reachable("https://drks.de/search/de/trial/" + identifier): result = "success" else: result = "failed" incoperate_results(task_dict, result, test) + + def is_url_reachable(url): + try: + response = requests.head(url, timeout=5) # Send a HEAD request to check if the server is reachable + return response.status_code < 400 # If the status code is less than 400, the URL is reachable + except requests.RequestException: + return False # If any exception occurs (e.g., timeout), consider the URL unreachable + + # Example usage: + url = "https://www.example.com" + if is_url_reachable(url): + print("URL is reachable") + else: + print("URL is not reachable") -# @app.task -# def csh_f1_1_persistent_identifier(task_dict: dict, data: dict, test: bool = False): - -# """ -# Task to test weather an identifier is persistent. -# Since the identifier is either unique for CSH, it is persistent -# """ -# result = "success" +@app.task +def csh_f2_rich_metadata_provided(task_dict: dict, data: dict, test: bool = False): + """ + mandatory: + resource.identifier + resource.keywords + resource.classification + resource.descriptions.language + resource.descriptions.text + resource.contributors + resource.provenance + + conditional: + resource.nonStudyDetails + design + """ -# incoperate_results(task_dict, result, test) + result = "success" + if check_route(data, ["resource", "identifier"]): result = False + if check_route(data, ["resource", "keywords"]): result = False + if check_route(data, ["resource", "descriptions", "language"]): result = False + if check_route(data, ["resource", "descriptions", "text"]): result = False + if check_route(data, ["resource", "contributors"]): result = False + if check_route(data, ["resource", "provenance"]): result = False -# @app.task -# def csh_f2_rich_metadata_provided(task_dict: dict, data: dict, test: bool = False): -# """ -# The nature of the CSH with all its mandatory fields implies a success -# """ + #TODO: handle conditional attributes; waiting for info + incoperate_results(task_dict, result, test) -# result = "success" +@app.task +def csh_f3_id_of_data_included(task_dict: dict, data: dict, test: bool = False): + """ + resource.ids.identifier + resource.ids.scheme + resource.ids.relationType -# incoperate_results(task_dict, result, test) + for each ressource id in resource.ids -> check if relationType is “A describes B” or “A is metadata for B” + """ + #print(check_route(["resource", "ids"])) -# @app.task -# def csh_f3_id_of_do_included(task_dict: dict, data: dict, test: bool = False): -# """ -# we are unsure about this indicator. At the moment we consider it as a fail -# """ + result = "failed" -# result = "success" + for el in check_route(["resource", "ids"]): + if not hasattr(el, 'identifier') or not hasattr(el, 'scheme') or not hasattr(el, 'relationType'): + incoperate_results(task_dict, "failed", test) + else: + #TODO: check if its enough to have one relation out of all resources + if el.relationType == 'A describes B' or el.relationType == 'A is metadata for B': + result = "success" -# incoperate_results(task_dict, result, test) + incoperate_results(task_dict, result, test) # @app.task @@ -256,8 +301,8 @@ def csh_r1_1_01_has_reuse_license(task_dict: dict, data: dict, test: bool = Fals @app.task #TODO: verify if this automated task really works since it depends on a parent task def csh_r1_1_02_has_standard_reuse_license(task_dict: dict, data: dict, test: bool = False): #check if userights label is a fitting license - license_label = check_route(data,["resource", "nonStudyDetails", "nonStudyDetails", "useRights"]) - if(license_label in ["Creative Commons Zero v1.0 Universal", "Creative Commons Attribution 4.0 International", "Creative Commons Attribution Non Commercial 4.0 International", "Creative Commons Attribution Share Alike 4.0 International", "Creative Commons Attribution Non Commercial Share Alike 4.0 International"]): + license_label = check_route(data,["resource", "nonStudyDetails", "nonStudyDetails", "useRights", "label"]) + if(license_label in ["CC0 1.0 (Creative Commons Zero v1.0 Universal)", "CC BY 4.0 (Creative Commons Attribution 4.0 International)", "CC BY-NC 4.0 (Creative Commons Attribution Non Commercial 4.0 International)", "CC BY-SA 4.0 (Creative Commons Attribution Share Alike 4.0 International)", "CC BY-NC-SA 4.0 (Creative Commons Attribution Non Commercial Share Alike 4.0 International)"]): result = "success" elif(license_label == "Other"): result = "warning" @@ -265,3 +310,52 @@ def csh_r1_1_02_has_standard_reuse_license(task_dict: dict, data: dict, test: bo result = "failed" incoperate_results(task_dict, result, test) +# Fitting licenses according to indicators: CC0 1.0, CC BY 4.0, CC BY-NC 4.0, CC BY-SA 4.0, CC BY-NC-SA 4.0 +""" Allowed values in CSH: +CC0 1.0 (Creative Commons Zero v1.0 Universal) +CC BY 4.0 (Creative Commons Attribution 4.0 International) +CC BY-NC 4.0 (Creative Commons Attribution Non Commercial 4.0 International) +CC BY-SA 4.0 (Creative Commons Attribution Share Alike 4.0 International) +CC BY-NC-SA 4.0 (Creative Commons Attribution Non Commercial Share Alike 4.0 International) +All rights reserved +Other +Not applicable +Not assigned +Unknown +""" +@app.task #TODO: verify if this automated task really works since it depends on a parent task +def csh_r1_1_03_has_machine_readable_reuse_license(task_dict: dict, data: dict, test: bool = False): + license_label = check_route(data,["resource", "nonStudyDetails", "nonStudyDetails", "useRights", "label"]) + if(license_label in ["Creative Commons Zero v1.0 Universal", "Creative Commons Attribution 4.0 International", "Creative Commons Attribution Non Commercial 4.0 International", "Creative Commons Attribution Share Alike 4.0 International", "Creative Commons Attribution Non Commercial Share Alike 4.0 International"]): + result = "success" + else: + result = "failed" + incoperate_results(task_dict, result, test) + +@app.task +def csh_r1_2_01_has_provenance_information(task_dict: dict, data: dict, test: bool = False): + provenance_info = check_route(data, ["resource", "provenance"]) + if(provenance_info): + if(provenance_info.get("verificationDate") and provenance_info.get("dataSource") and provenance_info.get("firstSubmittedDate") and provenance_info.get("lastUpdatePostedDate")): + result = "success" + else: + result = "failed" + incoperate_results(task_dict, result, test) + +# currently the same as the attribute before according to the indicators doc +@app.task +def csh_r1_2_02_has_standardized_provenance_information(task_dict: dict, data: dict, test: bool = False): + provenance_info = check_route(data, ["resource", "provenance"]) + if(provenance_info): + if(provenance_info.get("verificationDate") and provenance_info.get("dataSource") and provenance_info.get("firstSubmittedDate") and provenance_info.get("lastUpdatePostedDate")): + result = "success" + else: + result = "failed" + incoperate_results(task_dict, result, test) + + +#@app.task ## implicit pass according to indicators doc +#def csh_r1_3_01_metadata_standardized(task_dict: dict, data: dict, test: bool = False): + +#@app.task ## implicit pass according to indicators doc +#def csh_r1_3_02_metadata_stadardized_machine_readable(task_dict: dict, data: dict, test: bool = False):