Skip to content

Commit

Permalink
[ENH] Add route for fetching available versions of a pipeline across …
Browse files Browse the repository at this point in the history
…n-APIs (#125)

* make query params optional in n-API request, & create CRUD func for fetching pipeline vers

* add pipelines router with endpoint for fetching versions

* test new endpoint

* add TODO
  • Loading branch information
alyssadai authored Oct 18, 2024
1 parent eacdaa7 commit 1c879d7
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 7 deletions.
49 changes: 47 additions & 2 deletions app/api/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,9 @@ async def get_terms(data_element_URI: str):
node_errors = []
unique_terms_dict = {}

params = {data_element_URI: data_element_URI}
tasks = [
util.send_get_request(
node_url + "attributes/" + data_element_URI, params
node_url + "attributes/" + data_element_URI,
)
for node_url in util.FEDERATION_NODES
]
Expand Down Expand Up @@ -187,3 +186,49 @@ async def get_terms(data_element_URI: str):
cross_node_results=cross_node_results,
node_errors=node_errors,
)


async def get_pipeline_versions(pipeline_term: str):
"""
Make a GET request to all available node APIs for available versions of a specified pipeline.
Parameters
----------
pipeline_term : str
Controlled term of pipeline for which all the available terms should be retrieved.
Returns
-------
dict
Dictionary where the key is the pipeline term and the value is the list of unique available (i.e. used) versions of the pipeline.
"""
# TODO: The logic in this function is very similar to get_terms. Consider refactoring to reduce code duplication.
node_errors = []
all_pipe_versions = []

tasks = [
util.send_get_request(f"{node_url}pipelines/{pipeline_term}/versions")
for node_url in util.FEDERATION_NODES
]
responses = await asyncio.gather(*tasks, return_exceptions=True)

for (node_url, node_name), response in zip(
util.FEDERATION_NODES.items(), responses
):
if isinstance(response, HTTPException):
node_errors.append(
{"node_name": node_name, "error": response.detail}
)
logging.warning(
f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}"
)
else:
all_pipe_versions.extend(response[pipeline_term])

cross_node_results = {pipeline_term: sorted(list(set(all_pipe_versions)))}

return build_combined_response(
total_nodes=len(util.FEDERATION_NODES),
cross_node_results=cross_node_results,
node_errors=node_errors,
)
25 changes: 25 additions & 0 deletions app/api/routers/pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from fastapi import APIRouter, Response, status
from pydantic import constr

from .. import crud
from ..models import CONTROLLED_TERM_REGEX, CombinedAttributeResponse

router = APIRouter(prefix="/pipelines", tags=["pipelines"])


@router.get(
"/{pipeline_term}/versions", response_model=CombinedAttributeResponse
)
async def get_pipeline_versions(
pipeline_term: constr(regex=CONTROLLED_TERM_REGEX), response: Response
):
"""
When a GET request is sent, return a dict where the key is the pipeline term and the value
is a list containing all available versions of a pipeline, across all nodes known to the f-API.
"""
response_dict = await crud.get_pipeline_versions(pipeline_term)

if response_dict["errors"]:
response.status_code = status.HTTP_207_MULTI_STATUS

return response_dict
9 changes: 5 additions & 4 deletions app/api/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def validate_query_node_url_list(node_urls: list) -> list:


async def send_get_request(
url: str, params: list, timeout: float = None
url: str, params: list = None, timeout: float = None
) -> dict:
"""
Makes a GET request to one or more Neurobagel nodes.
Expand All @@ -229,8 +229,10 @@ async def send_get_request(
----------
url : str
URL of Neurobagel node API.
params : list
Neurobagel query parameters.
params : list, optional
Neurobagel query parameters, by default None.
timeout : float, optional
Timeout for the request, by default None.
Returns
-------
Expand All @@ -253,7 +255,6 @@ async def send_get_request(
# APIs behind a proxy can be reached
follow_redirects=True,
)

if not response.is_success:
raise HTTPException(
status_code=response.status_code,
Expand Down
3 changes: 2 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fastapi.responses import HTMLResponse, ORJSONResponse, RedirectResponse

from .api import utility as util
from .api.routers import attributes, nodes, query
from .api.routers import attributes, nodes, pipelines, query
from .api.security import check_client_id

logger = logging.getLogger("nb-f-API")
Expand Down Expand Up @@ -99,6 +99,7 @@ def overridden_redoc():

app.include_router(query.router)
app.include_router(attributes.router)
app.include_router(pipelines.router)
app.include_router(nodes.router)

# Automatically start uvicorn server on execution of main.py
Expand Down
34 changes: 34 additions & 0 deletions tests/test_pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import httpx
from fastapi import status


def test_unique_pipeline_versions_returned_from_nodes(
test_app, monkeypatch, set_valid_test_federation_nodes
):
"""
Test that given a successful request to two nodes for versions of a specific pipeline term,
the API correctly returns a list of unique versions across both nodes.
"""

# Predefine the responses from the mocked n-APIs set using the fixture set_valid_test_federation_nodes
async def mock_httpx_get(self, **kwargs):
if "https://firstpublicnode.org/" in kwargs["url"]:
mocked_response_json = {"np:pipeline1": ["1.0.0", "1.0.1"]}
else:
mocked_response_json = {"np:pipeline1": ["1.0.1", "1.0.2"]}
return httpx.Response(
status_code=200,
json=mocked_response_json,
)

monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get)

response = test_app.get("/pipelines/np:pipeline1/versions")
assert response.status_code == status.HTTP_200_OK

response_object = response.json()
assert len(response_object["errors"]) == 0
assert response_object["responses"] == {
"np:pipeline1": ["1.0.0", "1.0.1", "1.0.2"]
}
assert response_object["nodes_response_status"] == "success"

0 comments on commit 1c879d7

Please sign in to comment.