[ENH] Support partial term fetching failures #65

Merged · 17 commits · Feb 6, 2024
Changes from 8 commits
116 changes: 70 additions & 46 deletions app/api/crud.py
@@ -9,6 +9,42 @@
from . import utility as util


def build_combined_response(
total_nodes: int, cross_node_results: list | dict, node_errors: list
) -> dict:
"""Return a combined response containing all the nodes' responses and errors. Logs to console a summary of the federated request."""
if node_errors:
# TODO: Use logger instead of print, see https://github.com/tiangolo/fastapi/issues/5003
print(
f"Requests to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}."
)
if len(node_errors) == total_nodes:
# See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "fail",
},
)
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "partial success",
},
)

print(f"Requests to all nodes succeeded ({total_nodes}/{total_nodes}).")
return {
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "success",
}


async def get(
min_age: float,
max_age: float,
@@ -57,7 +93,6 @@ async def get(
node_errors = []

node_urls = util.validate_query_node_url_list(node_urls)
total_nodes = len(node_urls)

# Node API query parameters
params = {}
@@ -93,50 +128,23 @@
{"node_name": node_name, "error": response.detail}
)
warnings.warn(
f"Query to node {node_name} ({node_url}) did not succeed: {response.detail}"
f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}"
)
else:
for result in response:
result["node_name"] = node_name
cross_node_results.extend(response)

if node_errors:
# TODO: Use logger instead of print, see https://github.com/tiangolo/fastapi/issues/5003
print(
f"Queries to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}."
)

if len(node_errors) == total_nodes:
# See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "fail",
},
)
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "partial success",
},
)

print(f"All nodes queried successfully ({total_nodes}/{total_nodes}).")
return {
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "success",
}
return build_combined_response(
total_nodes=len(node_urls),
cross_node_results=cross_node_results,
node_errors=node_errors,
)


async def get_terms(data_element_URI: str):
# TODO: Make this path able to handle partial successes as well
"""
Makes a GET request to one or more Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI.
Makes a GET request to all available Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI.

Parameters
----------
@@ -148,20 +156,36 @@ async def get_terms(data_element_URI):
dict
Dictionary where the key is the Neurobagel class and values correspond to all the unique terms representing available (i.e. used) instances of that class.
"""
cross_node_results = []
params = {data_element_URI: data_element_URI}
node_errors = []
unique_terms_dict = {}

for node_url in util.FEDERATION_NODES:
response = util.send_get_request(
params = {data_element_URI: data_element_URI}
tasks = [
util.send_get_request(
node_url + "attributes/" + data_element_URI, params
)
for node_url in util.FEDERATION_NODES
]
responses = await asyncio.gather(*tasks, return_exceptions=True)

cross_node_results.append(response)

unique_terms_dict = {}
for node_url, response in zip(util.FEDERATION_NODES, responses):
node_name = util.FEDERATION_NODES[node_url]
if isinstance(response, HTTPException):
node_errors.append(
{"node_name": node_name, "error": response.detail}
)
warnings.warn(
f"Request to node {node_name} ({node_url}) did not succeed: {response.detail}"
)
else:
# Build the dictionary of unique term-label pairings from all nodes
for term_dict in response[data_element_URI]:
unique_terms_dict[term_dict["TermURL"]] = term_dict

for list_of_terms in cross_node_results:
for term in list_of_terms[data_element_URI]:
unique_terms_dict[term["TermURL"]] = term
cross_node_results = {data_element_URI: list(unique_terms_dict.values())}

return {data_element_URI: list(unique_terms_dict.values())}
return build_combined_response(
total_nodes=len(util.FEDERATION_NODES),
cross_node_results=cross_node_results,
node_errors=node_errors,
)
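
For context, a minimal, self-contained sketch of the fan-out pattern used in the reworked get_terms: asyncio.gather(..., return_exceptions=True) records a node's failure instead of aborting the remaining requests. The node registry, helper name, and error type below are illustrative stand-ins, not the PR's code (the real helpers live in app/api/utility.py).

import asyncio

# Illustrative stand-in for util.FEDERATION_NODES (URL -> display name).
NODES = {
    "https://firstpublicnode.org/": "First Public Node",
    "https://secondpublicnode.org/": "Second Public Node",
}

async def fake_send_get_request(url: str) -> dict:
    # Pretend the second node is unreachable.
    if "second" in url:
        raise RuntimeError("Some connection error")
    return {"nb:Assessment": [{"TermURL": "cogatlas:trm_x", "Label": "example term"}]}

async def main() -> None:
    tasks = [fake_send_get_request(url) for url in NODES]
    # return_exceptions=True turns per-task exceptions into entries in the
    # results list instead of cancelling the whole gather.
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    node_errors, results = [], []
    for (url, name), response in zip(NODES.items(), responses):
        if isinstance(response, Exception):
            node_errors.append({"node_name": name, "error": str(response)})
        else:
            results.append(response)

    print(f"{len(node_errors)}/{len(NODES)} nodes failed:", node_errors)
    print("Usable responses:", results)

asyncio.run(main())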
8 changes: 8 additions & 0 deletions app/api/models.py
@@ -61,3 +61,11 @@ class CombinedQueryResponse(BaseModel):
errors: list[NodeError]
responses: list[CohortQueryResponse]
nodes_response_status: NodesResponseStatus


class CombinedAttributeResponse(BaseModel):
"""Data model for the combined available terms for a given Neurobagel attribute/variable across all available nodes."""

errors: list[NodeError]
responses: dict
nodes_response_status: NodesResponseStatus
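
As a rough illustration of the payload shape the new model accepts: the simplified classes below are stand-ins, not the repo's models — the NodeError fields and the status string are inferred from how the response is assembled in crud.py.

from pydantic import BaseModel

class NodeError(BaseModel):
    # Simplified stand-in; fields mirror the error dicts built in crud.py.
    node_name: str
    error: str

class CombinedAttributeResponse(BaseModel):
    # In the real models.py, nodes_response_status is a NodesResponseStatus;
    # a plain str keeps this sketch self-contained.
    errors: list[NodeError]
    responses: dict
    nodes_response_status: str

payload = {
    "errors": [{"node_name": "Second Public Node", "error": "Internal Server Error"}],
    "responses": {"nb:Assessment": []},
    "nodes_response_status": "partial success",
}
print(CombinedAttributeResponse(**payload))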
4 changes: 2 additions & 2 deletions app/api/routers/attributes.py
@@ -2,12 +2,12 @@
from pydantic import constr

from .. import crud
from ..models import CONTROLLED_TERM_REGEX
from ..models import CONTROLLED_TERM_REGEX, CombinedAttributeResponse

router = APIRouter(prefix="/attributes", tags=["attributes"])


@router.get("/{data_element_URI}")
@router.get("/{data_element_URI}", response_model=CombinedAttributeResponse)
async def get_terms(data_element_URI: constr(regex=CONTROLLED_TERM_REGEX)):
"""When a GET request is sent, return a list dicts with the only key corresponding to controlled term of a neurobagel class and value corresponding to all the available terms."""
response = await crud.get_terms(data_element_URI)
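
From a client's perspective, the endpoint now returns 207 with per-node errors whenever any node fails, while still exposing whatever terms were fetched. A sketch of consuming it (the base URL is hypothetical and assumes a federation API running locally):

import httpx

BASE_URL = "http://localhost:8000"  # hypothetical local federation API

resp = httpx.get(f"{BASE_URL}/attributes/nb:Assessment")
body = resp.json()

if body["errors"]:
    # Partial or total failure: some nodes did not return terms.
    failed = [err["node_name"] for err in body["errors"]]
    print(f"Status {resp.status_code} ({body['nodes_response_status']}); failed nodes: {failed}")

# Terms that were fetched successfully are still available.
for term in body["responses"].get("nb:Assessment", []):
    print(term["TermURL"], term["Label"])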
9 changes: 9 additions & 0 deletions tests/conftest.py
@@ -1,3 +1,4 @@
import httpx
import pytest
from starlette.testclient import TestClient

@@ -8,3 +9,11 @@
def test_app():
client = TestClient(app)
yield client


@pytest.fixture()
def mock_failed_connection_httpx_get():
async def _mock_httpx_get_with_connect_error(self, **kwargs):
raise httpx.ConnectError("Some connection error")

return _mock_httpx_get_with_connect_error
96 changes: 96 additions & 0 deletions tests/test_attributes.py
@@ -0,0 +1,96 @@
import httpx
import pytest
from fastapi import status

from app.api import utility as util


def test_partially_failed_terms_fetching_handled_gracefully(
test_app, monkeypatch
):
"""
    Test that when getting term instances for an attribute (/attributes/{data_element_URI}) fails for some nodes due to an error,
the overall API get request still succeeds, and the response includes a list of the encountered errors along with the successfully fetched terms.
"""
monkeypatch.setattr(
util,
"FEDERATION_NODES",
{
"https://firstpublicnode.org/": "First Public Node",
"https://secondpublicnode.org/": "Second Public Node",
},
)

mocked_node_attribute_response = {
"nb:Assessment": [
{
"TermURL": "cogatlas:trm_56a9137d9dce1",
"Label": "behavioral approach/inhibition systems",
},
{
"TermURL": "cogatlas:trm_55a6a8e81b7f4",
"Label": "Barratt Impulsiveness Scale",
},
]
}

async def mock_httpx_get(self, **kwargs):
if (
kwargs["url"]
== "https://firstpublicnode.org/attributes/nb:Assessment"
):
return httpx.Response(
status_code=200,
json=mocked_node_attribute_response,
)
return httpx.Response(
status_code=500, json={}, text="Some internal server error"
)

monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get)

with pytest.warns(UserWarning):
response = test_app.get("/attributes/nb:Assessment")

assert response.status_code == status.HTTP_207_MULTI_STATUS

assert response.json() == {
"errors": [
{
"node_name": "Second Public Node",
"error": "Internal Server Error: Some internal server error",
},
],
"responses": mocked_node_attribute_response,
"nodes_response_status": "partial success",
}


def test_fully_failed_terms_fetching_handled_gracefully(
test_app, monkeypatch, mock_failed_connection_httpx_get
):
"""
    Test that when getting term instances for an attribute (/attributes/{data_element_URI}) fails for *all* nodes,
the overall API get request still succeeds, but includes an overall failure status and all encountered errors in the response.
"""
monkeypatch.setattr(
util,
"FEDERATION_NODES",
{
"https://firstpublicnode.org/": "First Public Node",
"https://secondpublicnode.org/": "Second Public Node",
},
)
monkeypatch.setattr(
httpx.AsyncClient, "get", mock_failed_connection_httpx_get
)

with pytest.warns(UserWarning):
response = test_app.get("/attributes/nb:Assessment")

assert response.status_code == status.HTTP_207_MULTI_STATUS

response = response.json()
assert response["nodes_response_status"] == "fail"
assert len(response["errors"]) == 2
assert response["responses"] == {"nb:Assessment": []}
20 changes: 10 additions & 10 deletions tests/test_query.py
Expand Up @@ -75,7 +75,7 @@ async def mock_httpx_get(self, **kwargs):
"nodes_response_status": "partial success",
}
assert (
"Queries to 1/2 nodes failed: ['Second Public Node']" in captured.out
"Requests to 1/2 nodes failed: ['Second Public Node']" in captured.out
)


@@ -129,11 +129,13 @@ async def mock_httpx_get(self, **kwargs):
"nodes_response_status": "partial success",
}
assert (
"Queries to 1/2 nodes failed: ['Second Public Node']" in captured.out
"Requests to 1/2 nodes failed: ['Second Public Node']" in captured.out
)


def test_all_nodes_failure_handled_gracefully(monkeypatch, test_app, capsys):
def test_all_nodes_failure_handled_gracefully(
monkeypatch, test_app, mock_failed_connection_httpx_get, capsys
):
"""
Test that when queries sent to all nodes fail, the federation API get request still succeeds,
but includes an overall failure status and all encountered errors in the response.
@@ -146,11 +148,9 @@ def test_all_nodes_failure_handled_gracefully(monkeypatch, test_app, capsys):
"https://secondpublicnode.org/": "Second Public Node",
},
)

async def mock_httpx_get(self, **kwargs):
raise httpx.ConnectError("Some connection error")

monkeypatch.setattr(httpx.AsyncClient, "get", mock_httpx_get)
monkeypatch.setattr(
httpx.AsyncClient, "get", mock_failed_connection_httpx_get
)

with pytest.warns(
UserWarning,
@@ -166,7 +166,7 @@ async def mock_httpx_get(self, **kwargs):
assert len(response["errors"]) == 2
assert response["responses"] == []
assert (
"Queries to 2/2 nodes failed: ['First Public Node', 'Second Public Node']"
"Requests to 2/2 nodes failed: ['First Public Node', 'Second Public Node']"
in captured.out
)

@@ -202,4 +202,4 @@ async def mock_httpx_get(self, **kwargs):
assert response["nodes_response_status"] == "success"
assert response["errors"] == []
assert len(response["responses"]) == 2
assert "All nodes queried successfully (2/2)" in captured.out
assert "Requests to all nodes succeeded (2/2)" in captured.out