Skip to content

Commit

Permalink
* added missing indicator functions
Browse files Browse the repository at this point in the history
TODO: make it run all functions + testing
  • Loading branch information
Tom Gebhardt committed Jul 24, 2024
1 parent a83aa08 commit 41e7f58
Showing 1 changed file with 124 additions and 30 deletions.
154 changes: 124 additions & 30 deletions app/celery/automated_tasks/csh_fair.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ def is_doi(identifier: str):
# Use the re.match function to check if the string matches the pattern
print("identifier: ", identifier)
match = re.match(doi_pattern, identifier)
print("?? match")
print("doi match: ", match)
return False
#return bool(re.match(doi_pattern, identifier))
Expand Down Expand Up @@ -72,6 +71,20 @@ def incoperate_results(task_dict: dict, result: 'app.models.TaskStatus', test: b
# Works, but does not trigger updating of children
# redis_app.json().set(f"session:{session_id}", f".tasks.{task_id}.status", obj=result)

@app.task
def csh_f1_1_persistent_identifier(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the metadata carries a resource identifier.

    Looks up ``resource > resource_identifier`` in ``data`` and reports
    "success" when a truthy value is found, "failed" otherwise.

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    identifier = check_route(data, ["resource", "resource_identifier"])

    # Report with the same status strings the sibling tasks use
    # ("success" / "failed") instead of a bare bool.
    result = "success" if identifier else "failed"

    incoperate_results(task_dict, result, test)


@app.task
def csh_f1_2_globally_unique_identifier(task_dict: dict, data: dict, test: bool = False):
"""
Expand All @@ -95,54 +108,86 @@ def assessment_task(task_dict: dict, data: dict) -> None:
:param data: (Meta)Data to evaluate
:return: None
"""

print("CHECK!! globally uni ")
identifier = check_route(data, ["resource", "resource_identifier"])
print("grabbed identifier: ", identifier)
#could also retrive "type" from data instead of using .startswith
if(identifier is False):
result = "failed"
elif(is_doi(identifier)):
elif(is_doi(identifier)): #TODO: extend by a DOI check when CSH starts supporting DOIs
result = "success"
elif(identifier.startswith("DRKS")):
elif(identifier.startswith("DRKS")) and is_url_reachable("https://drks.de/search/de/trial/" + identifier):
result = "success"
else:
result = "failed"

incoperate_results(task_dict, result, test)


def is_url_reachable(url):
    """
    Tell whether ``url`` answers a HEAD request with a non-error status.

    :param url: address to probe
    :return: True when the response status code is below 400; False when the
             status is 400 or above or when requests raises a RequestException
             (e.g. a timeout after 5 seconds)
    """
    try:
        # HEAD is enough here: only the status line is needed, not the body
        status = requests.head(url, timeout=5).status_code
    except requests.RequestException:
        # Any request failure counts as "not reachable"
        return False
    return status < 400

# Example usage. Guarded so that merely importing this module (e.g. when the
# task module is loaded) does not fire a network request and print to stdout.
if __name__ == "__main__":
    url = "https://www.example.com"
    if is_url_reachable(url):
        print("URL is reachable")
    else:
        print("URL is not reachable")

# @app.task
# def csh_f1_1_persistent_identifier(task_dict: dict, data: dict, test: bool = False):

# """
# Task to test whether an identifier is persistent.
# Since the identifier is either unique for CSH, it is persistent
# """

# result = "success"
@app.task
def csh_f2_rich_metadata_provided(task_dict: dict, data: dict, test: bool = False):
    """
    Check that the mandatory descriptive metadata fields are present.

    mandatory:
        resource.identifier
        resource.keywords
        resource.classification
        resource.descriptions.language
        resource.descriptions.text
        resource.contributors
        resource.provenance
    conditional:
        resource.nonStudyDetails
        design

    Reports "success" when every checked route resolves to a truthy value and
    "failed" as soon as one of them is missing.

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    # NOTE(review): resource.classification is listed as mandatory above but
    # was never checked; its route is left out until the schema path is confirmed.
    # NOTE(review): the f1 tasks read "resource_identifier", this one reads
    # "identifier" - confirm which key the metadata actually uses.
    mandatory_routes = (
        ["resource", "identifier"],
        ["resource", "keywords"],
        ["resource", "descriptions", "language"],
        ["resource", "descriptions", "text"],
        ["resource", "contributors"],
        ["resource", "provenance"],
    )

    # A route that resolves to a falsy value counts as missing. The previous
    # version had this test inverted and flagged a failure when the field existed.
    all_present = all(check_route(data, route) for route in mandatory_routes)
    result = "success" if all_present else "failed"

    #TODO: handle conditional attributes; waiting for info
    incoperate_results(task_dict, result, test)

# result = "success"
@app.task
def csh_f3_id_of_data_included(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the metadata references the data it describes.

    Every entry of ``resource.ids`` is expected to provide
        resource.ids.identifier
        resource.ids.scheme
        resource.ids.relationType
    The indicator is reported as "success" when at least one entry has the
    relationType "A describes B" or "A is metadata for B". It is reported as
    "failed" when there are no ids, when no entry has such a relation, or when
    an entry lacks one of the three fields.

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    describing_relations = ("A describes B", "A is metadata for B")
    required_keys = ("identifier", "scheme", "relationType")

    # check_route appears to return a falsy value for a missing route; fall
    # back to an empty list so the loop below is simply skipped.
    resource_ids = check_route(data, ["resource", "ids"]) or []

    result = "failed"
    for entry in resource_ids:
        # assumes entries are dicts (JSON-derived metadata) - TODO confirm
        if not isinstance(entry, dict) or any(key not in entry for key in required_keys):
            # An incomplete entry fails the indicator; stop here so the result
            # is reported exactly once below.
            result = "failed"
            break
        #TODO: check if its enough to have one relation out of all resources
        if entry["relationType"] in describing_relations:
            result = "success"

    incoperate_results(task_dict, result, test)


# @app.task
Expand Down Expand Up @@ -256,12 +301,61 @@ def csh_r1_1_01_has_reuse_license(task_dict: dict, data: dict, test: bool = Fals
@app.task #TODO: verify if this automated task really works since it depends on a parent task
def csh_r1_1_02_has_standard_reuse_license(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the use-rights label names one of the standard reuse licenses.

    Reports "success" for one of the Creative Commons labels below, "warning"
    for the label "Other" and "failed" for anything else (including a missing
    label).

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    standard_licenses = (
        "CC0 1.0 (Creative Commons Zero v1.0 Universal)",
        "CC BY 4.0 (Creative Commons Attribution 4.0 International)",
        "CC BY-NC 4.0 (Creative Commons Attribution Non Commercial 4.0 International)",
        "CC BY-SA 4.0 (Creative Commons Attribution Share Alike 4.0 International)",
        "CC BY-NC-SA 4.0 (Creative Commons Attribution Non Commercial Share Alike 4.0 International)",
    )

    # The license name lives in the "label" field of useRights
    license_label = check_route(data, ["resource", "nonStudyDetails", "nonStudyDetails", "useRights", "label"])

    if license_label in standard_licenses:
        result = "success"
    elif license_label == "Other":
        result = "warning"
    else:
        result = "failed"
    incoperate_results(task_dict, result, test)

# Fitting licenses according to indicators: CC0 1.0, CC BY 4.0, CC BY-NC 4.0, CC BY-SA 4.0, CC BY-NC-SA 4.0
""" Allowed values in CSH:
CC0 1.0 (Creative Commons Zero v1.0 Universal)
CC BY 4.0 (Creative Commons Attribution 4.0 International)
CC BY-NC 4.0 (Creative Commons Attribution Non Commercial 4.0 International)
CC BY-SA 4.0 (Creative Commons Attribution Share Alike 4.0 International)
CC BY-NC-SA 4.0 (Creative Commons Attribution Non Commercial Share Alike 4.0 International)
All rights reserved
Other
Not applicable
Not assigned
Unknown
"""
@app.task #TODO: verify if this automated task really works since it depends on a parent task
def csh_r1_1_03_has_machine_readable_reuse_license(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the use-rights label is one of the fitting Creative Commons licenses.

    Reports "success" when the label matches one of the licenses below and
    "failed" otherwise (including a missing label).

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    # Labels are spelled the way they are compared in
    # csh_r1_1_02_has_standard_reuse_license ("CC0 1.0 (...)"); the short names
    # without the "CC ..." prefix used before could never match.
    fitting_licenses = (
        "CC0 1.0 (Creative Commons Zero v1.0 Universal)",
        "CC BY 4.0 (Creative Commons Attribution 4.0 International)",
        "CC BY-NC 4.0 (Creative Commons Attribution Non Commercial 4.0 International)",
        "CC BY-SA 4.0 (Creative Commons Attribution Share Alike 4.0 International)",
        "CC BY-NC-SA 4.0 (Creative Commons Attribution Non Commercial Share Alike 4.0 International)",
    )

    license_label = check_route(data, ["resource", "nonStudyDetails", "nonStudyDetails", "useRights", "label"])

    result = "success" if license_label in fitting_licenses else "failed"
    incoperate_results(task_dict, result, test)

@app.task
def csh_r1_2_01_has_provenance_information(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the provenance section carries all expected fields.

    Reports "success" when ``resource.provenance`` exists and has truthy values
    for verificationDate, dataSource, firstSubmittedDate and
    lastUpdatePostedDate; reports "failed" otherwise.

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    required_fields = (
        "verificationDate",
        "dataSource",
        "firstSubmittedDate",
        "lastUpdatePostedDate",
    )
    provenance_info = check_route(data, ["resource", "provenance"])

    # A single condition assigns result on every path; previously a missing
    # or incomplete provenance section left it unbound.
    if provenance_info and all(provenance_info.get(field) for field in required_fields):
        result = "success"
    else:
        result = "failed"
    incoperate_results(task_dict, result, test)

# currently the same as the attribute before according to the indicators doc
@app.task
def csh_r1_2_02_has_standardized_provenance_information(task_dict: dict, data: dict, test: bool = False):
    """
    Check whether the provenance section carries all expected fields.

    Reports "success" when ``resource.provenance`` exists and has truthy values
    for verificationDate, dataSource, firstSubmittedDate and
    lastUpdatePostedDate; reports "failed" otherwise.

    :param task_dict: task description, forwarded unchanged to incoperate_results
    :param data: (Meta)Data to evaluate
    :param test: forwarded unchanged to incoperate_results
    :return: None
    """
    required_fields = (
        "verificationDate",
        "dataSource",
        "firstSubmittedDate",
        "lastUpdatePostedDate",
    )
    provenance_info = check_route(data, ["resource", "provenance"])

    # A single condition assigns result on every path; previously a missing
    # or incomplete provenance section left it unbound.
    if provenance_info and all(provenance_info.get(field) for field in required_fields):
        result = "success"
    else:
        result = "failed"
    incoperate_results(task_dict, result, test)


#@app.task ## implicit pass according to indicators doc
#def csh_r1_3_01_metadata_standardized(task_dict: dict, data: dict, test: bool = False):

#@app.task ## implicit pass according to indicators doc
#def csh_r1_3_02_metadata_stadardized_machine_readable(task_dict: dict, data: dict, test: bool = False):

0 comments on commit 41e7f58

Please sign in to comment.