diff --git a/app/api/crud.py b/app/api/crud.py index f492ec4..4adb306 100644 --- a/app/api/crud.py +++ b/app/api/crud.py @@ -3,12 +3,36 @@ import asyncio import warnings -from fastapi import HTTPException, status -from fastapi.responses import JSONResponse +from fastapi import HTTPException from . import utility as util +def build_combined_response( + total_nodes: int, cross_node_results: list | dict, node_errors: list +) -> dict: + """Return a combined response containing all the nodes' responses and errors. Logs to console a summary of the federated request.""" + content = {"errors": node_errors, "responses": cross_node_results} + + if node_errors: + # TODO: Use logger instead of print. For example of how to set this up for FastAPI, see https://github.com/tiangolo/fastapi/discussions/8517 + print( + f"Requests to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}." + ) + if len(node_errors) == total_nodes: + # See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info + content["nodes_response_status"] = "fail" + else: + content["nodes_response_status"] = "partial success" + else: + print( + f"Requests to all nodes succeeded ({total_nodes}/{total_nodes})." + ) + content["nodes_response_status"] = "success" + + return content + + async def get( min_age: float, max_age: float, @@ -57,7 +81,6 @@ async def get( node_errors = [] node_urls = util.validate_query_node_url_list(node_urls) - total_nodes = len(node_urls) # Node API query parameters params = {} @@ -92,51 +115,25 @@ async def get( node_errors.append( {"node_name": node_name, "error": response.detail} ) + # TODO: Replace with logger warnings.warn( - f"Query to node {node_name} ({node_url}) did not succeed: {response.detail}" + f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}" ) else: for result in response: result["node_name"] = node_name cross_node_results.extend(response) - if node_errors: - # TODO: Use logger instead of print, see https://github.com/tiangolo/fastapi/issues/5003 - print( - f"Queries to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}." - ) - - if len(node_errors) == total_nodes: - # See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info - return JSONResponse( - status_code=status.HTTP_207_MULTI_STATUS, - content={ - "errors": node_errors, - "responses": cross_node_results, - "nodes_response_status": "fail", - }, - ) - return JSONResponse( - status_code=status.HTTP_207_MULTI_STATUS, - content={ - "errors": node_errors, - "responses": cross_node_results, - "nodes_response_status": "partial success", - }, - ) - - print(f"All nodes queried successfully ({total_nodes}/{total_nodes}).") - return { - "errors": node_errors, - "responses": cross_node_results, - "nodes_response_status": "success", - } + return build_combined_response( + total_nodes=len(node_urls), + cross_node_results=cross_node_results, + node_errors=node_errors, + ) async def get_terms(data_element_URI: str): - # TODO: Make this path able to handle partial successes as well """ - Makes a GET request to one or more Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI. + Makes a GET request to all available Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI. Parameters ---------- @@ -148,20 +145,38 @@ async def get_terms(data_element_URI: str): dict Dictionary where the key is the Neurobagel class and values correspond to all the unique terms representing available (i.e. used) instances of that class. """ - cross_node_results = [] - params = {data_element_URI: data_element_URI} + node_errors = [] + unique_terms_dict = {} - for node_url in util.FEDERATION_NODES: - response = util.send_get_request( + params = {data_element_URI: data_element_URI} + tasks = [ + util.send_get_request( node_url + "attributes/" + data_element_URI, params ) + for node_url in util.FEDERATION_NODES + ] + responses = await asyncio.gather(*tasks, return_exceptions=True) - cross_node_results.append(response) - - unique_terms_dict = {} + for (node_url, node_name), response in zip( + util.FEDERATION_NODES.items(), responses + ): + if isinstance(response, HTTPException): + node_errors.append( + {"node_name": node_name, "error": response.detail} + ) + # TODO: Replace with logger + warnings.warn( + f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}" + ) + else: + # Build the dictionary of unique term-label pairings from all nodes + for term_dict in response[data_element_URI]: + unique_terms_dict[term_dict["TermURL"]] = term_dict - for list_of_terms in cross_node_results: - for term in list_of_terms[data_element_URI]: - unique_terms_dict[term["TermURL"]] = term + cross_node_results = {data_element_URI: list(unique_terms_dict.values())} - return {data_element_URI: list(unique_terms_dict.values())} + return build_combined_response( + total_nodes=len(util.FEDERATION_NODES), + cross_node_results=cross_node_results, + node_errors=node_errors, + ) diff --git a/app/api/models.py b/app/api/models.py index 59ffc18..0627f72 100644 --- a/app/api/models.py +++ b/app/api/models.py @@ -62,3 +62,11 @@ class CombinedQueryResponse(BaseModel): errors: list[NodeError] responses: list[CohortQueryResponse] nodes_response_status: NodesResponseStatus + + +class CombinedAttributeResponse(BaseModel): + """Data model for the combined available terms for a given Neurobagel attribute/variable across all available nodes.""" + + errors: list[NodeError] + responses: dict + nodes_response_status: NodesResponseStatus diff --git a/app/api/routers/attributes.py b/app/api/routers/attributes.py index 1c711f9..5de6061 100644 --- a/app/api/routers/attributes.py +++ b/app/api/routers/attributes.py @@ -1,15 +1,26 @@ -from fastapi import APIRouter +from fastapi import APIRouter, Response, status from pydantic import constr from .. import crud -from ..models import CONTROLLED_TERM_REGEX +from ..models import CONTROLLED_TERM_REGEX, CombinedAttributeResponse router = APIRouter(prefix="/attributes", tags=["attributes"]) -@router.get("/{data_element_URI}") -async def get_terms(data_element_URI: constr(regex=CONTROLLED_TERM_REGEX)): +# We use the Response parameter below to change the status code of the response while still being able to validate the returned data using the response model. +# (see https://fastapi.tiangolo.com/advanced/response-change-status-code/ for more info). +# +# TODO: if our response model for fully successful vs. not fully successful responses grows more complex in the future, +# consider additionally using https://fastapi.tiangolo.com/advanced/additional-responses/#additional-response-with-model to document +# example responses for different status codes in the OpenAPI docs (less relevant for now since there is only one response model). +@router.get("/{data_element_URI}", response_model=CombinedAttributeResponse) +async def get_terms( + data_element_URI: constr(regex=CONTROLLED_TERM_REGEX), response: Response +): """When a GET request is sent, return a list dicts with the only key corresponding to controlled term of a neurobagel class and value corresponding to all the available terms.""" - response = await crud.get_terms(data_element_URI) + response_dict = await crud.get_terms(data_element_URI) - return response + if response_dict["errors"]: + response.status_code = status.HTTP_207_MULTI_STATUS + + return response_dict diff --git a/app/api/routers/query.py b/app/api/routers/query.py index 6c70472..c5a526a 100644 --- a/app/api/routers/query.py +++ b/app/api/routers/query.py @@ -1,6 +1,6 @@ """Router for query path operations.""" -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, Response, status from .. import crud from ..models import CombinedQueryResponse, QueryModel @@ -8,10 +8,18 @@ router = APIRouter(prefix="/query", tags=["query"]) +# We use the Response parameter below to change the status code of the response while still being able to validate the returned data using the response model. +# (see https://fastapi.tiangolo.com/advanced/response-change-status-code/ for more info). +# +# TODO: if our response model for fully successful vs. not fully successful responses grows more complex in the future, +# consider additionally using https://fastapi.tiangolo.com/advanced/additional-responses/#additional-response-with-model to document +# example responses for different status codes in the OpenAPI docs (less relevant for now since there is only one response model). @router.get("/", response_model=CombinedQueryResponse) -async def get_query(query: QueryModel = Depends(QueryModel)): +async def get_query( + response: Response, query: QueryModel = Depends(QueryModel) +): """When a GET request is sent, return list of dicts corresponding to subject-level metadata aggregated by dataset.""" - response = await crud.get( + response_dict = await crud.get( query.min_age, query.max_age, query.sex, @@ -24,4 +32,7 @@ async def get_query(query: QueryModel = Depends(QueryModel)): query.node_url, ) - return response + if response_dict["errors"]: + response.status_code = status.HTTP_207_MULTI_STATUS + + return response_dict diff --git a/tests/conftest.py b/tests/conftest.py index 3d9516b..a8b4b5f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,8 @@ +import httpx import pytest from starlette.testclient import TestClient +from app.api import utility as util from app.main import app @@ -8,3 +10,28 @@ def test_app(): client = TestClient(app) yield client + + +@pytest.fixture(scope="function") +def set_valid_test_federation_nodes(monkeypatch): + """Set two correctly formatted federation nodes for a test function (mocks the result of reading/parsing available public and local nodes on startup).""" + monkeypatch.setattr( + util, + "FEDERATION_NODES", + { + "https://firstpublicnode.org/": "First Public Node", + "https://secondpublicnode.org/": "Second Public Node", + }, + ) + + +@pytest.fixture() +def mock_failed_connection_httpx_get(): + """Return a mock for the httpx.AsyncClient.get method that raises a ConnectError when called.""" + + async def _mock_httpx_get_with_connect_error(self, **kwargs): + # The self parameter is necessary to match the signature of the method being mocked, + # which is a class method of the httpx.AsyncClient class (see https://www.python-httpx.org/api/#asyncclient). + raise httpx.ConnectError("Some connection error") + + return _mock_httpx_get_with_connect_error diff --git a/tests/test_attributes.py b/tests/test_attributes.py new file mode 100644 index 0000000..6dc6c7d --- /dev/null +++ b/tests/test_attributes.py @@ -0,0 +1,83 @@ +import httpx +import pytest +from fastapi import status + + +def test_partially_failed_terms_fetching_handled_gracefully( + test_app, + monkeypatch, + set_valid_test_federation_nodes, +): + """ + When some nodes fail while getting term instances for an attribute (/attribute/{data_element_URI}), + the overall API get request still succeeds, and the response includes a list of the encountered errors along with the successfully fetched terms. + """ + mocked_node_attribute_response = { + "nb:Assessment": [ + { + "TermURL": "cogatlas:trm_56a9137d9dce1", + "Label": "behavioral approach/inhibition systems", + }, + { + "TermURL": "cogatlas:trm_55a6a8e81b7f4", + "Label": "Barratt Impulsiveness Scale", + }, + ] + } + + async def mock_httpx_get(self, **kwargs): + # The self parameter is necessary to match the signature of the method being mocked, + # which is a class method of the httpx.AsyncClient class (see https://www.python-httpx.org/api/#asyncclient). + if ( + kwargs["url"] + == "https://secondpublicnode.org/attributes/nb:Assessment" + ): + return httpx.Response( + status_code=500, json={}, text="Some internal server error" + ) + return httpx.Response( + status_code=200, + json=mocked_node_attribute_response, + ) + + monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get) + + with pytest.warns(UserWarning): + response = test_app.get("/attributes/nb:Assessment") + + assert response.status_code == status.HTTP_207_MULTI_STATUS + + response_object = response.json() + assert response_object["errors"] == [ + { + "node_name": "Second Public Node", + "error": "Internal Server Error: Some internal server error", + } + ] + assert response_object["responses"] == mocked_node_attribute_response + assert response_object["nodes_response_status"] == "partial success" + + +def test_fully_failed_terms_fetching_handled_gracefully( + test_app, + monkeypatch, + mock_failed_connection_httpx_get, + set_valid_test_federation_nodes, +): + """ + When *all* nodes fail while getting term instances for an attribute (/attribute/{data_element_URI}), + the overall API get request still succeeds, but includes an overall failure status and all encountered errors in the response. + """ + monkeypatch.setattr( + httpx.AsyncClient, "get", mock_failed_connection_httpx_get + ) + + with pytest.warns(UserWarning): + response = test_app.get("/attributes/nb:Assessment") + + assert response.status_code == status.HTTP_207_MULTI_STATUS + + response = response.json() + assert response["nodes_response_status"] == "fail" + assert len(response["errors"]) == 2 + assert response["responses"] == {"nb:Assessment": []} diff --git a/tests/test_query.py b/tests/test_query.py index 997118b..41f5a3f 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -2,8 +2,6 @@ import pytest from fastapi import status -from app.api import utility as util - @pytest.fixture() def mocked_single_matching_dataset_result(): @@ -24,22 +22,20 @@ def mocked_single_matching_dataset_result(): def test_partial_node_failure_responses_handled_gracefully( - monkeypatch, test_app, capsys, mocked_single_matching_dataset_result + monkeypatch, + test_app, + capsys, + set_valid_test_federation_nodes, + mocked_single_matching_dataset_result, ): """ Test that when queries to some nodes return errors, the overall API get request still succeeds, the successful responses are returned along with a list of the encountered errors, and the failed nodes are logged to the console. """ - monkeypatch.setattr( - util, - "FEDERATION_NODES", - { - "https://firstpublicnode.org/": "First Public Node", - "https://secondpublicnode.org/": "Second Public Node", - }, - ) async def mock_httpx_get(self, **kwargs): + # The self parameter is necessary to match the signature of the method being mocked, + # which is a class method of the httpx.AsyncClient class (see https://www.python-httpx.org/api/#asyncclient). if kwargs["url"] == "https://firstpublicnode.org/query/": return httpx.Response( status_code=200, json=[mocked_single_matching_dataset_result] @@ -75,25 +71,21 @@ async def mock_httpx_get(self, **kwargs): "nodes_response_status": "partial success", } assert ( - "Queries to 1/2 nodes failed: ['Second Public Node']" in captured.out + "Requests to 1/2 nodes failed: ['Second Public Node']" in captured.out ) def test_partial_node_connection_failures_handled_gracefully( - monkeypatch, test_app, capsys, mocked_single_matching_dataset_result + monkeypatch, + test_app, + capsys, + set_valid_test_federation_nodes, + mocked_single_matching_dataset_result, ): """ Test that when requests to some nodes fail (e.g., if API is unreachable), the overall API get request still succeeds, the successful responses are returned along with a list of the encountered errors, and the failed nodes are logged to the console. """ - monkeypatch.setattr( - util, - "FEDERATION_NODES", - { - "https://firstpublicnode.org/": "First Public Node", - "https://secondpublicnode.org/": "Second Public Node", - }, - ) async def mock_httpx_get(self, **kwargs): if kwargs["url"] == "https://firstpublicnode.org/query/": @@ -129,29 +121,25 @@ async def mock_httpx_get(self, **kwargs): "nodes_response_status": "partial success", } assert ( - "Queries to 1/2 nodes failed: ['Second Public Node']" in captured.out + "Requests to 1/2 nodes failed: ['Second Public Node']" in captured.out ) -def test_all_nodes_failure_handled_gracefully(monkeypatch, test_app, capsys): +def test_all_nodes_failure_handled_gracefully( + monkeypatch, + test_app, + mock_failed_connection_httpx_get, + set_valid_test_federation_nodes, + capsys, +): """ Test that when queries sent to all nodes fail, the federation API get request still succeeds, but includes an overall failure status and all encountered errors in the response. """ monkeypatch.setattr( - util, - "FEDERATION_NODES", - { - "https://firstpublicnode.org/": "First Public Node", - "https://secondpublicnode.org/": "Second Public Node", - }, + httpx.AsyncClient, "get", mock_failed_connection_httpx_get ) - async def mock_httpx_get(self, **kwargs): - raise httpx.ConnectError("Some connection error") - - monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get) - with pytest.warns( UserWarning, ) as w: @@ -166,25 +154,21 @@ async def mock_httpx_get(self, **kwargs): assert len(response["errors"]) == 2 assert response["responses"] == [] assert ( - "Queries to 2/2 nodes failed: ['First Public Node', 'Second Public Node']" + "Requests to 2/2 nodes failed: ['First Public Node', 'Second Public Node']" in captured.out ) def test_all_nodes_success_handled_gracefully( - monkeypatch, test_app, capsys, mocked_single_matching_dataset_result + monkeypatch, + test_app, + capsys, + set_valid_test_federation_nodes, + mocked_single_matching_dataset_result, ): """ Test that when queries sent to all nodes succeed, the federation API response includes an overall success status and no errors. """ - monkeypatch.setattr( - util, - "FEDERATION_NODES", - { - "https://firstpublicnode.org/": "First Public Node", - "https://secondpublicnode.org/": "Second Public Node", - }, - ) async def mock_httpx_get(self, **kwargs): return httpx.Response( @@ -202,4 +186,4 @@ async def mock_httpx_get(self, **kwargs): assert response["nodes_response_status"] == "success" assert response["errors"] == [] assert len(response["responses"]) == 2 - assert "All nodes queried successfully (2/2)" in captured.out + assert "Requests to all nodes succeeded (2/2)" in captured.out