[ENH] Handle partial nodes success (#55)
* add comment explaining FEDERATION_NODES format

* implement custom HTTP response for partial success federated query
- use custom HTTP success status code
* return both node-specific errors and combined successful query results in response body (see the sketch after this commit message)
* log node errors and overall federated query success status to the console

* test API path response for partial success federated query

* mock individual node request to assert over combined query response

* add new response model returning errors and node query results

* do not use exception object for partial success responses

* update response returned when all nodes succeed or fail

* handle network errors in federated query

* test federated response given unreachable nodes, create fixture for single matching dataset result

* test response when queries to all nodes either fail or succeed

* turn status of node responses into enum

* use model for node error in federated query response

* switch to sending requests to nodes asynchronously

* update mocked get function in tests to be async

* make code a bit cleaner

Co-authored-by: Sebastian Urchs <[email protected]>

* rename fixture for mocked data

---------

Co-authored-by: Sebastian Urchs <[email protected]>
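
For illustration, here is a minimal sketch of the response body a client might receive under partial success (HTTP 207 Multi-Status). The node names and field values are hypothetical, and any dataset-level fields of CohortQueryResponse not visible in this diff are omitted:

partial_success_body = {
    "errors": [
        {
            "node_name": "OfflineNode",
            "error": "Request failed due to a network error or because the node API cannot be reached: ...",
        }
    ],
    "responses": [
        {
            "node_name": "OnlineNode",
            "num_matching_subjects": 2,
            # Illustrative only; the model allows either a string or a list of dicts here.
            "subject_data": "protected",
            "image_modals": ["http://purl.org/nidash/nidm#T1Weighted"],
        }
    ],
    "nodes_response_status": "partial success",
}
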
alyssadai and surchs authored Jan 14, 2024
1 parent 06c3c4f commit 3e76fd0
Showing 5 changed files with 315 additions and 29 deletions.
66 changes: 58 additions & 8 deletions app/api/crud.py
@@ -1,5 +1,11 @@
"""CRUD functions called by path operations."""

import asyncio
import warnings

from fastapi import HTTPException, status
from fastapi.responses import JSONResponse

from . import utility as util


@@ -13,7 +19,7 @@ async def get(
assessment: str,
image_modal: str,
node_urls: list[str],
):
) -> dict:
"""
Makes GET requests to one or more Neurobagel node APIs using send_get_request utility function where the parameters are Neurobagel query parameters.
@@ -45,8 +51,10 @@
"""
cross_node_results = []
node_errors = []

node_urls = util.validate_query_node_url_list(node_urls)
total_nodes = len(node_urls)

# Node API query parameters
params = {}
@@ -67,19 +75,61 @@
if image_modal:
params["image_modal"] = image_modal

for node_url in node_urls:
node_name = util.FEDERATION_NODES[node_url]
response = util.send_get_request(node_url + "query/", params)
tasks = [
util.send_get_request(node_url + "query/", params)
for node_url in node_urls
]
responses = await asyncio.gather(*tasks, return_exceptions=True)

for result in response:
result["node_name"] = node_name
for node_url, response in zip(node_urls, responses):
node_name = util.FEDERATION_NODES[node_url]
if isinstance(response, HTTPException):
node_errors.append(
{"node_name": node_name, "error": response.detail}
)
warnings.warn(
f"Query to node {node_name} ({node_url}) did not succeed: {response.detail}"
)
else:
for result in response:
result["node_name"] = node_name
cross_node_results.extend(response)

if node_errors:
# TODO: Use logger instead of print, see https://github.com/tiangolo/fastapi/issues/5003
print(
f"Queries to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}."
)

cross_node_results += response
if len(node_errors) == total_nodes:
# See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "fail",
},
)
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "partial success",
},
)

return cross_node_results
print(f"All nodes queried successfully ({total_nodes/total_nodes}).")
return {
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "success",
}


async def get_terms(data_element_URI: str):
# TODO: Make this path able to handle partial successes as well
"""
Makes a GET request to one or more Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI.
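
The rewritten get() fans out one coroutine per node and relies on asyncio.gather(..., return_exceptions=True), which returns results and raised exceptions positionally aligned with the node URLs. A standalone sketch of that pattern, using a stand-in coroutine and hypothetical URLs:

import asyncio


async def fake_node_query(url: str) -> list:
    # Stand-in for util.send_get_request; one URL is made to fail to mimic an unreachable node.
    if "offline" in url:
        raise RuntimeError(f"{url} cannot be reached")
    return [{"node_name": "stub", "num_matching_subjects": 1}]


async def main() -> None:
    urls = ["https://online-node.example/query/", "https://offline-node.example/query/"]
    responses = await asyncio.gather(*(fake_node_query(u) for u in urls), return_exceptions=True)
    for url, response in zip(urls, responses):
        if isinstance(response, Exception):
            print(f"Query to {url} did not succeed: {response}")
        else:
            print(f"Query to {url} returned {len(response)} matching dataset(s)")


asyncio.run(main())
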
24 changes: 24 additions & 0 deletions app/api/models.py
@@ -1,4 +1,5 @@
"""Data models."""
from enum import Enum
from typing import Optional, Union

from fastapi import Query
@@ -36,3 +37,26 @@ class CohortQueryResponse(BaseModel):
num_matching_subjects: int
subject_data: Union[list[dict], str]
image_modals: list


class NodesResponseStatus(str, Enum):
"""Possible values for the status of the responses from the queried nodes."""

SUCCESS = "success"
PARTIAL_SUCCESS = "partial success"
FAIL = "fail"


class NodeError(BaseModel):
"""Data model for an error encountered when querying a node."""

node_name: str
error: str


class CombinedQueryResponse(BaseModel):
"""Data model for the combined query results of all matching datasets across all queried nodes."""

errors: list[NodeError]
responses: list[CohortQueryResponse]
nodes_response_status: NodesResponseStatus
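
A small sketch of how the new models might be exercised, e.g. in a test; the values are illustrative, and the import path simply follows the repository layout shown in this diff:

from app.api.models import NodeError, NodesResponseStatus

# Build a node error like the ones crud.get() collects for failed nodes.
error = NodeError(node_name="OfflineNode", error="503: node API cannot be reached")

# The enum round-trips the plain strings used in the response body.
assert NodesResponseStatus("partial success") is NodesResponseStatus.PARTIAL_SUCCESS
print(error.node_name, "-", error.error)
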
6 changes: 2 additions & 4 deletions app/api/routers/query.py
@@ -1,16 +1,14 @@
"""Router for query path operations."""

from typing import List

from fastapi import APIRouter, Depends

from .. import crud
from ..models import CohortQueryResponse, QueryModel
from ..models import CombinedQueryResponse, QueryModel

router = APIRouter(prefix="/query", tags=["query"])


@router.get("/", response_model=List[CohortQueryResponse])
@router.get("/", response_model=CombinedQueryResponse)
async def get_query(query: QueryModel = Depends(QueryModel)):
"""When a GET request is sent, return list of dicts corresponding to subject-level metadata aggregated by dataset."""
response = await crud.get(
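
Because the route now returns a CombinedQueryResponse, a client can branch on the status code to tell a clean run (200) from a partial or total failure (207). A client-side sketch against a hypothetical local deployment of the federation API:

import httpx

# Base URL is hypothetical; an unfiltered query is sent for simplicity.
response = httpx.get("http://localhost:8000/query/", timeout=30.0, follow_redirects=True)
body = response.json()
if response.status_code == 207:
    failed = [node_error["node_name"] for node_error in body["errors"]]
    print(f"Nodes that failed: {failed} (status: {body['nodes_response_status']})")
else:
    print(f"{len(body['responses'])} matching dataset results across all queried nodes")
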
43 changes: 26 additions & 17 deletions app/api/utility.py
@@ -6,10 +6,12 @@

import httpx
import jsonschema
from fastapi import HTTPException
from fastapi import HTTPException, status
from jsonschema import validate

LOCAL_NODE_INDEX_PATH = Path(__file__).parents[2] / "local_nb_nodes.json"

# Stores the names and URLs of all Neurobagel nodes known to the API instance, in the form of {node_url: node_name, ...}
FEDERATION_NODES = {}

# We use this schema to validate the local_nb_nodes.json file
@@ -196,7 +198,7 @@ def validate_query_node_url_list(node_urls: list) -> list:
return node_urls


def send_get_request(url: str, params: list):
async def send_get_request(url: str, params: list) -> dict:
"""
Makes a GET request to one or more Neurobagel nodes.
@@ -218,19 +220,26 @@ def send_get_request(url: str, params: list):
HTTPException
_description_
"""
response = httpx.get(
url=url,
params=params,
# TODO: Revisit timeout value when query performance is improved
timeout=30.0,
# Enable redirect following (off by default) so
# APIs behind a proxy can be reached
follow_redirects=True,
)
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url=url,
params=params,
# TODO: Revisit timeout value when query performance is improved
timeout=30.0,
# Enable redirect following (off by default) so
# APIs behind a proxy can be reached
follow_redirects=True,
)

if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"{response.reason_phrase}: {response.text}",
)
return response.json()
if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"{response.reason_phrase}: {response.text}",
)
return response.json()
except httpx.NetworkError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Request failed due to a network error or because the node API cannot be reached: {exc}",
) from exc
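
Since send_get_request is now a coroutine, calling it outside of FastAPI (e.g. in a script or an interactive session) requires an event loop. A sketch, assuming a node API is running at a hypothetical local URL:

import asyncio

from fastapi import HTTPException

from app.api import utility as util


async def main() -> None:
    try:
        # Hypothetical node URL; unreachable hosts surface as an HTTPException with a 503 status code.
        results = await util.send_get_request("http://localhost:8001/query/", {})
        print(f"{len(results)} matching dataset result(s)")
    except HTTPException as exc:
        print(f"Query did not succeed: {exc.detail}")


asyncio.run(main())
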
