From 5684a84479ee38e138ccebc213763bbdb2ff2daf Mon Sep 17 00:00:00 2001
From: Vincent Emonet
Date: Thu, 11 Apr 2024 18:38:28 +0200
Subject: [PATCH] fix dcr participants

---
 README.md                |  10 ++--
 backend/src/auth.py      |   9 +++
 backend/src/decentriq.py |  40 +++++++++----
 backend/src/explore.py   | 114 +------------------------------------
 backend/src/main.py      |   2 +
 backend/src/mapping.py   | 120 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 168 insertions(+), 127 deletions(-)
 create mode 100644 backend/src/mapping.py

diff --git a/README.md b/README.md
index c200fb6..e75fc57 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,8 @@
 # 🫀 iCARE4CVD Cohort Explorer
 
-Webapp enabling to upload and explore cohorts metadata built for the [iCARE4CVD project](https://icare4cvd.eu).
+Web app for uploading and exploring cohort metadata, built for the [iCARE4CVD project](https://icare4cvd.eu).
 
-It interacts with a privacy computing platform ([Decentriq](https://www.decentriq.com/)) to create secure workspace where data scientists can run analysis on the selected cohorts (the cohorts data is uploaded only to Decentriq, the explorer only uses cohorts metadata).
+It interacts with a privacy computing platform ([Decentriq](https://www.decentriq.com/)) to create secure workspaces where data scientists can run analyses on the selected cohorts. The cohort data is uploaded only to Decentriq; the explorer only uses cohort metadata.
 
 It aims to enable *data custodians* and *data scientists* to:
 
@@ -32,7 +32,7 @@ This platform is composed of 3 main components:
 * **[Oxigraph](https://github.com/oxigraph/oxigraph) triplestore database** containing the cohorts and their variables metadata, exposing a SPARQL endpoint only available to the backend API.
   * The data stored in the triplestore complies with the custom **[iCARE4CVD OWL ontology](https://maastrichtu-ids.github.io/cohort-explorer/)**. It contains 3 classes: Cohort, Variable, and Variable category. You can explore the ontology classes and properties [here](https://maastrichtu-ids.github.io/cohort-explorer/browse).
   * Oxigraph has not yet reached release 1.0, but it is already stable enough for our currently expected use. It has the advantages of being open source (important for accountability and trust), and developed in Europe. If missing features appears to be blocking, consider migrating to [OpenLink Virtuoso](https://github.com/openlink/virtuoso-opensource), you'll only need to update the function that upload a RDFLib graph as file.
- 
+
 * **`backend/` server**, built with python, FastAPI and RDFLib.
 * **`frontend/` web app** running in the client browser, built with TypeScript, NextJS, ReactJS, TailwindCSS, and DaisyUI.
@@ -52,6 +52,8 @@ This platform is composed of 3 main components:
 
 ## 🧑‍💻 Development
 
+[![Update ontology documentation](https://github.com/MaastrichtU-IDS/cohort-explorer/actions/workflows/docs.yml/badge.svg)](https://github.com/MaastrichtU-IDS/cohort-explorer/actions/workflows/docs.yml)
+
 ### 📥 Install dependencies
 
 1. Install [hatch](https://hatch.pypa.io/latest/) for managing python projects, and [pnpm](https://pnpm.io/installation) for TS/JS projects
@@ -154,7 +156,7 @@ Put the excel spreadsheet with all cohorts metadata in `data/iCARE4CVD_Cohorts.x
 
 We currently use [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy) for routing through environment variables in the `docker-compose.yml` file, you can change for the proxy of your liking.
 
-## 🪄 Administration
+## 🪄 Database administration
 
 ### 🗑️ Reset database
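The README above notes that migrating from Oxigraph to another triplestore would only require updating the function that uploads an RDFLib graph as a file. A minimal sketch of what such an upload can look like, assuming Oxigraph's SPARQL Graph Store endpoint is exposed at `/store` on the default port 7878; the function name, store URL, and named-graph URI below are illustrative, not taken from the repository:

```python
import requests
from rdflib import Graph


def publish_graph_to_triplestore(g: Graph, graph_uri: str, store_url: str = "http://localhost:7878/store") -> None:
    """Serialize an RDFLib graph to Turtle and load it into a named graph of the triplestore."""
    resp = requests.put(
        store_url,
        params={"graph": graph_uri},  # target named graph in the SPARQL Graph Store protocol
        data=g.serialize(format="turtle"),
        headers={"Content-Type": "text/turtle"},
        timeout=120,
    )
    resp.raise_for_status()
```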
diff --git a/backend/src/auth.py b/backend/src/auth.py
index 8385c5c..320dc9b 100644
--- a/backend/src/auth.py
+++ b/backend/src/auth.py
@@ -121,6 +121,15 @@ async def auth_callback(code: str) -> RedirectResponse:
         "https://explorer.icare4cvd.eu" in access_payload["aud"]
         and "read:icare4cvd-dataset-descriptions" in access_payload["permissions"]
     ):
+        # TODO: for LUCE blockchain: check if user email has a blockchain address
+        # Where? Either a JSON file on the server, or in the triplestore
+        # blockchain_addrs = json.load(settings.data_folder / "blockchain_addresses.json")
+        # if id_payload["email"] not in blockchain_addrs:
+        #     blockchain_addrs[id_payload["email"]] = "0x1234567890"
+        # json.dump(blockchain_addrs, settings.data_folder / "blockchain_addresses.json")
+
+
+
         # Reuse expiration time from decentriq Auth0 access token
         exp_timestamp = access_payload["exp"]
         jwt_token = create_access_token(
diff --git a/backend/src/decentriq.py b/backend/src/decentriq.py
index 49c45c9..9add12a 100644
--- a/backend/src/decentriq.py
+++ b/backend/src/decentriq.py
@@ -159,6 +159,8 @@ async def create_compute_dcr(
         .with_description("A data clean room to run computations on cohorts for the iCARE4CVD project")
     )
 
+    participants = {}
+    participants[user["email"]] = {"data_owner_of": set(), "analyst_of": set()}
     preview_nodes = []
     # Convert cohort variables to decentriq schema
    for cohort_id, cohort in selected_cohorts.items():
@@ -168,8 +170,7 @@ async def create_compute_dcr(
         data_nodes.append(data_node_id)
 
         # TODO: made airlock always True for testing
-        # if cohort.airlock:
-        if True:
+        if cohort.airlock:
             # Add airlock node to make it easy to access small part of the dataset
             preview_node_id = f"preview-{data_node_id}"
             builder.add_node_definition(PreviewComputeNodeDefinition(
@@ -181,21 +182,24 @@ async def create_compute_dcr(
 
         # Add data owners to provision the data
         for owner in cohort.cohort_email:
-            builder.add_participant(owner, data_owner_of=[data_node_id])
+            if owner not in participants:
+                participants[owner] = {"data_owner_of": set(), "analyst_of": set()}
+            participants[owner]["data_owner_of"].add(data_node_id)
 
         # Add pandas preparation script
         pandas_script = "import pandas as pd\nimport decentriq_util\n\n"
         df_var = f"df_{cohort_id.replace('-', '_')}"
         requested_vars = cohorts_request["cohorts"][cohort_id]
+
+        # Direct cohort variables list
         if isinstance(requested_vars, list):
-            # Direct cohort variables list
             pandas_script += f'{df_var} = decentriq_util.read_tabular_data("/input/{cohort_id}")\n'
-
             if len(requested_vars) <= len(cohort.variables):
                 # Add filter variables to pandas script
                 pandas_script += f"{df_var} = {df_var}[{requested_vars}]\n"
+
+        # TODO: Merge operation, need to be implemented on the frontend
         elif isinstance(requested_vars, dict):
-            # Merge operation, need to be implemented on the frontend
             pandas_script += pandas_script_merge_cohorts(requested_vars, all_cohorts)
             # TODO: add merged cohorts schema to selected_cohorts
         else:
@@ -206,12 +210,26 @@ async def create_compute_dcr(
         builder.add_node_definition(
             PythonComputeNodeDefinition(name=f"prepare-{cohort_id}", script=pandas_script, dependencies=[data_node_id])
         )
-        builder.add_participant(user["email"], analyst_of=[f"prepare-{cohort_id}"])
+        # builder.add_participant(user["email"], analyst_of=[f"prepare-{cohort_id}"])
 
-    # Add users permissions
-    if airlock:
-        builder.add_participant(user["email"], analyst_of=preview_nodes)
-    builder.add_participant(settings.decentriq_email, data_owner_of=data_nodes)
+        # Add the requester as analyst of the prepare script
+        participants[user["email"]]["analyst_of"].add(f"prepare-{cohort_id}")
+
+    # Add users permissions for previews
+    for prev_node in preview_nodes:
+        participants[user["email"]]["analyst_of"].add(prev_node)
+
+    # TODO: for now we also add the requesting user as data owner of all data nodes for testing
+    # Will need to be removed when the workflow has been tested
+    for data_node in data_nodes:
+        participants[user["email"]]["data_owner_of"].add(data_node)
+
+    for p_email, p_perm in participants.items():
+        builder.add_participant(
+            p_email,
+            data_owner_of=list(p_perm["data_owner_of"]),
+            analyst_of=list(p_perm["analyst_of"]),
+        )
 
     # Build and publish DCR
     dcr_definition = builder.build()
diff --git a/backend/src/explore.py b/backend/src/explore.py
index 2b6ea01..89eca22 100644
--- a/backend/src/explore.py
+++ b/backend/src/explore.py
@@ -2,13 +2,13 @@
 from typing import Any
 
 import requests
-from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi import APIRouter, Depends, HTTPException
 from fastapi.responses import FileResponse
 
 from src.auth import get_current_user
 from src.config import settings
 from src.models import Cohort
-from src.utils import converter, retrieve_cohorts_metadata, run_query
+from src.utils import retrieve_cohorts_metadata
 
 router = APIRouter()
@@ -33,113 +33,3 @@ async def download_cohort_spreasheet(cohort_id: str, user: Any = Depends(get_cur
 
     # If no file is found, return an error response
     raise HTTPException(status_code=404, detail=f"No data dictionary found for cohort ID '{cohort_id}'")
-
-
-@router.get("/search-concepts")
-async def search_concepts(
-    query: str, domain: list[str] | None = Query(default=None), user: Any = Depends(get_current_user)
-):
-    """Search for concepts in the Athena API and check how many time those concepts are use in our KG."""
-    if not domain:
-        domain = []
-    vocabs = ["LOINC", "ATC", "SNOMED"]  # "RxNorm"
-    try:
-        response = requests.get(
-            "https://athena.ohdsi.org/api/v1/concepts",
-            params={
-                "query": query,
-                "domain": domain,
-                "vocabulary": vocabs,
-                "standardConcept": ["Standard", "Classification"],
-                "pageSize": 15,
-                "page": 1,
-            },
-            headers={
-                # We need to fake the user agent to avoid a 403 error. What a bunch of douchebags, and incompetent with this! Try again losers!
-                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
-            },
-            timeout=60,
-        )
-        response.raise_for_status()
-        search_res = response.json().get("content", [])
-    except Exception:
-        raise HTTPException(status_code=response.status_code, detail="Error fetching data from OHDSI Athena API")
-
-    # print(search_res)
-    found_concepts = []
-    for res in search_res:
-        # Convert snomed CURIE to snomedct
-        concept_id = f"{'snomedct' if res.get('vocabulary').lower() == 'snomed' else res.get('vocabulary').lower()}:{res.get('id')}"
-        found_concepts.append(
-            {
-                "id": concept_id,
-                "uri": converter.expand(concept_id),
-                "label": res.get("name"),
-                "domain": res.get("domain"),
-                "vocabulary": res.get("vocabulary"),
-                "used_by": [],
-            }
-        )
-
-    found_concepts_filter = " ".join([f"<{concept['uri']}>" for concept in found_concepts])
-    sparql_query = f"""
-    PREFIX icare:
-    PREFIX rdf:
-    PREFIX rdfs:
-    PREFIX dc:
-    PREFIX dcterms:
-
-    SELECT DISTINCT ?cohortId ?varName ?varLabel ?omopDomain ?mappedId
-    WHERE {{
-        VALUES ?mappedId {{ {found_concepts_filter} }}
-        GRAPH ?cohortMetadataGraph {{
-            ?cohort a icare:Cohort ;
-                dc:identifier ?cohortId .
-        }}
-
-        GRAPH ?cohortVarGraph {{
-            ?cohort icare:hasVariable ?variable .
-            ?variable a icare:Variable ;
-                dc:identifier ?varName ;
-                rdfs:label ?varLabel ;
-                icare:varType ?varType ;
-                icare:index ?index .
-            OPTIONAL {{ ?variable icare:omop ?omopDomain }}
-        }}
-
-        {{
-            GRAPH ?cohortMappingsGraph {{
-                ?variable icare:mappedId ?mappedId .
-            }}
-        }} UNION {{
-            GRAPH ?cohortVarGraph {{
-                ?variable icare:categories ?category.
-            }}
-            GRAPH ?cohortMappingsGraph {{
-                ?category icare:mappedId ?mappedId .
-            }}
-        }}
-        OPTIONAL {{ ?mappedId rdfs:label ?mappedLabel }}
-    }}
-    ORDER BY ?cohort ?index
-    """
-    # TODO also get mappings from categories?
-    # print(sparql_query)
-    for row in run_query(sparql_query)["results"]["bindings"]:
-        # print(row)
-        # Find the concept in the list and add the cohort and variable to the used_by list
-        for concept in found_concepts:
-            if concept["uri"] == row["mappedId"]["value"]:
-                used_by_entry = {
-                    "cohort_id": row["cohortId"]["value"],
-                    "var_name": row["varName"]["value"],
-                    "var_label": row["varLabel"]["value"],
-                    "omop_domain": row["omopDomain"]["value"] if "omopDomain" in row else None,
-                }
-                # NOTE: Normally the SPARQL query should note return duplicates, but in case it does:
-                # existing_entries = [entry for entry in concept["used_by"] if entry["cohort_id"] == used_by_entry["cohort_id"] and entry["var_name"] == used_by_entry["var_name"]]
-                # if not existing_entries:
-                concept["used_by"].append(used_by_entry)
-                break
-    found_concepts.sort(key=lambda x: len(x["used_by"]), reverse=True)
-    return found_concepts
diff --git a/backend/src/main.py b/backend/src/main.py
index 04250a7..b5400b1 100644
--- a/backend/src/main.py
+++ b/backend/src/main.py
@@ -6,6 +6,7 @@
 from src.config import settings
 from src.decentriq import router as decentriq_router
 from src.explore import router as explore_router
+from src.mapping import router as mapping_router
 from src.upload import init_triplestore
 from src.upload import router as upload_router
@@ -17,6 +18,7 @@
 )
 
 app.include_router(explore_router, tags=["explore"])
+app.include_router(mapping_router, tags=["mapping"])
 app.include_router(upload_router, tags=["upload"])
 app.include_router(decentriq_router, tags=["upload"])
 app.include_router(auth_router, tags=["authentication"])
diff --git a/backend/src/mapping.py b/backend/src/mapping.py
new file mode 100644
index 0000000..486b8b5
--- /dev/null
+++ b/backend/src/mapping.py
@@ -0,0 +1,120 @@
+from typing import Any
+
+import requests
+from fastapi import APIRouter, Depends, HTTPException, Query
+
+from src.auth import get_current_user
+from src.utils import converter, run_query
+
+router = APIRouter()
+
+
+@router.get("/search-concepts")
+async def search_concepts(
+    query: str, domain: list[str] | None = Query(default=None), user: Any = Depends(get_current_user)
+):
+    """Search for concepts in the Athena API and check how many times those concepts are used in our KG."""
+    if not domain:
+        domain = []
+    vocabs = ["LOINC", "ATC", "SNOMED"]  # "RxNorm"
+    # TODO: Komal to implement the search from her model
+    try:
+        response = requests.get(
+            "https://athena.ohdsi.org/api/v1/concepts",
+            params={
+                "query": query,
+                "domain": domain,
+                "vocabulary": vocabs,
+                "standardConcept": ["Standard", "Classification"],
+                "pageSize": 15,
+                "page": 1,
+            },
+            headers={
+                # Use a browser User-Agent, otherwise the Athena API returns a 403 error
+                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
+            },
+            timeout=60,
+        )
+        response.raise_for_status()
+        search_res = response.json().get("content", [])
+    except Exception:
+        raise HTTPException(status_code=response.status_code, detail="Error fetching data from OHDSI Athena API")
+
+    # print(search_res)
+    found_concepts = []
+    for res in search_res:
+        # Convert snomed CURIE to snomedct
+        concept_id = f"{'snomedct' if res.get('vocabulary').lower() == 'snomed' else res.get('vocabulary').lower()}:{res.get('id')}"
+        found_concepts.append(
+            {
+                "id": concept_id,
+                "uri": converter.expand(concept_id),
+                "label": res.get("name"),
+                "domain": res.get("domain"),
+                "vocabulary": res.get("vocabulary"),
+                "used_by": [],
+            }
+        )
+
+    found_concepts_filter = " ".join([f"<{concept['uri']}>" for concept in found_concepts])
+    sparql_query = f"""
+    PREFIX icare:
+    PREFIX rdf:
+    PREFIX rdfs:
+    PREFIX dc:
+    PREFIX dcterms:
+
+    SELECT DISTINCT ?cohortId ?varName ?varLabel ?omopDomain ?mappedId
+    WHERE {{
+        VALUES ?mappedId {{ {found_concepts_filter} }}
+        GRAPH ?cohortMetadataGraph {{
+            ?cohort a icare:Cohort ;
+                dc:identifier ?cohortId .
+        }}
+
+        GRAPH ?cohortVarGraph {{
+            ?cohort icare:hasVariable ?variable .
+            ?variable a icare:Variable ;
+                dc:identifier ?varName ;
+                rdfs:label ?varLabel ;
+                icare:varType ?varType ;
+                icare:index ?index .
+            OPTIONAL {{ ?variable icare:omop ?omopDomain }}
+        }}
+
+        {{
+            GRAPH ?cohortMappingsGraph {{
+                ?variable icare:mappedId ?mappedId .
+            }}
+        }} UNION {{
+            GRAPH ?cohortVarGraph {{
+                ?variable icare:categories ?category.
+            }}
+            GRAPH ?cohortMappingsGraph {{
+                ?category icare:mappedId ?mappedId .
+            }}
+        }}
+        OPTIONAL {{ ?mappedId rdfs:label ?mappedLabel }}
+    }}
+    ORDER BY ?cohort ?index
+    """
+    # TODO also get mappings from categories?
+    # print(sparql_query)
+    for row in run_query(sparql_query)["results"]["bindings"]:
+        # print(row)
+        # Find the concept in the list and add the cohort and variable to the used_by list
+        for concept in found_concepts:
+            if concept["uri"] == row["mappedId"]["value"]:
+                used_by_entry = {
+                    "cohort_id": row["cohortId"]["value"],
+                    "var_name": row["varName"]["value"],
+                    "var_label": row["varLabel"]["value"],
+                    "omop_domain": row["omopDomain"]["value"] if "omopDomain" in row else None,
+                }
+                # NOTE: Normally the SPARQL query should not return duplicates, but in case it does:
+                # existing_entries = [entry for entry in concept["used_by"] if entry["cohort_id"] == used_by_entry["cohort_id"] and entry["var_name"] == used_by_entry["var_name"]]
+                # if not existing_entries:
+                concept["used_by"].append(used_by_entry)
+                break
+    found_concepts.sort(key=lambda x: len(x["used_by"]), reverse=True)
+    return found_concepts
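For reference, a sketch of how the new `/search-concepts` route in `backend/src/mapping.py` could be called once the backend is running. The base URL and bearer token are placeholders (authentication goes through `get_current_user`, so use whatever token handling your deployment expects), and the printed fields follow the `found_concepts` structure built above:

```python
import requests

API_URL = "http://localhost:8000"  # placeholder: adjust to where the backend is deployed
TOKEN = "<jwt-obtained-from-the-auth-callback>"  # placeholder: a token accepted by get_current_user

resp = requests.get(
    f"{API_URL}/search-concepts",
    params={"query": "blood pressure", "domain": ["Measurement"]},
    headers={"Authorization": f"Bearer {TOKEN}"},
    timeout=60,
)
resp.raise_for_status()

# Concepts come back sorted by how many cohort variables are already mapped to them
for concept in resp.json():
    print(concept["id"], concept["label"], f"used by {len(concept['used_by'])} variable(s)")
```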