Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Handle partial nodes success #55

Merged
merged 21 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8de7904
add comment explaining FEDERATION_NODES format
alyssadai Jan 9, 2024
780b615
implement custom HTTP response for partial success federated query
alyssadai Jan 9, 2024
3643130
test API path response for partial success federated query
alyssadai Jan 9, 2024
566e7e1
mock individual node request to assert over combined query response
alyssadai Jan 9, 2024
5502a49
add new response model returning errors and node query results
alyssadai Jan 10, 2024
16114da
do not use exception object for partial success responses
alyssadai Jan 10, 2024
fe9703f
rename status key in combined query response
alyssadai Jan 10, 2024
9fa13ea
update response returned when all nodes succeed or fail
alyssadai Jan 10, 2024
05be17f
handle network errors in federated query
alyssadai Jan 10, 2024
1ea377a
test federated response given unreachable nodes, create fixture for s…
alyssadai Jan 10, 2024
e338599
refactor tests of federated query response into separate module
alyssadai Jan 10, 2024
caa3933
test response when queries to all nodes fail
alyssadai Jan 10, 2024
d0f858b
test response when queries to all nodes succeed
alyssadai Jan 10, 2024
4b9e926
update comments and type hints
alyssadai Jan 10, 2024
e1eafdc
turn status of node responses into enum
alyssadai Jan 10, 2024
b7fbb60
use model for node error in federated query response
alyssadai Jan 10, 2024
cf8d392
switch to sending requests to nodes asynchronously
alyssadai Jan 11, 2024
83a50cd
update mocked get function in tests to be async
alyssadai Jan 11, 2024
ab50b37
update return type hint
alyssadai Jan 11, 2024
f65aff8
make code a bit cleaner
alyssadai Jan 14, 2024
f3640e6
rename fixture for mocked data
alyssadai Jan 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 58 additions & 8 deletions app/api/crud.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
"""CRUD functions called by path operations."""

import asyncio
import warnings

from fastapi import HTTPException, status
from fastapi.responses import JSONResponse

from . import utility as util


Expand All @@ -13,7 +19,7 @@ async def get(
assessment: str,
image_modal: str,
node_urls: list[str],
):
) -> dict:
"""
Makes GET requests to one or more Neurobagel node APIs using send_get_request utility function where the parameters are Neurobagel query parameters.

Expand Down Expand Up @@ -45,8 +51,10 @@ async def get(

"""
cross_node_results = []
node_errors = []

node_urls = util.validate_query_node_url_list(node_urls)
total_nodes = len(node_urls)

# Node API query parameters
params = {}
Expand All @@ -67,19 +75,61 @@ async def get(
if image_modal:
params["image_modal"] = image_modal

for node_url in node_urls:
node_name = util.FEDERATION_NODES[node_url]
response = util.send_get_request(node_url + "query/", params)
tasks = [
util.send_get_request(node_url + "query/", params)
for node_url in node_urls
]
responses = await asyncio.gather(*tasks, return_exceptions=True)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved

for result in response:
result["node_name"] = node_name
for node_url, response in zip(node_urls, responses):
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
node_name = util.FEDERATION_NODES[node_url]
if isinstance(response, HTTPException):
node_errors.append(
{"node_name": node_name, "error": response.detail}
)
warnings.warn(
f"Query to node {node_name} ({node_url}) did not succeed: {response.detail}"
)
else:
for result in response:
result["node_name"] = node_name
cross_node_results.extend(response)

if node_errors:
# TODO: Use logger instead of print, see https://github.com/tiangolo/fastapi/issues/5003
print(
f"Queries to {len(node_errors)}/{total_nodes} nodes failed: {[node_error['node_name'] for node_error in node_errors]}."
)

cross_node_results += response
if len(node_errors) == total_nodes:
# See https://fastapi.tiangolo.com/advanced/additional-responses/ for more info
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "fail",
},
)
return JSONResponse(
status_code=status.HTTP_207_MULTI_STATUS,
content={
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "partial success",
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
},
)

return cross_node_results
print(f"All nodes queried successfully ({total_nodes/total_nodes}).")
return {
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"errors": node_errors,
"responses": cross_node_results,
"nodes_response_status": "success",
}


async def get_terms(data_element_URI: str):
# TODO: Make this path able to handle partial successes as well
"""
Makes a GET request to one or more Neurobagel node APIs using send_get_request utility function where the only parameter is a data element URI.

Expand Down
24 changes: 24 additions & 0 deletions app/api/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Data models."""
from enum import Enum
from typing import Optional, Union

from fastapi import Query
Expand Down Expand Up @@ -36,3 +37,26 @@ class CohortQueryResponse(BaseModel):
num_matching_subjects: int
subject_data: Union[list[dict], str]
image_modals: list


class NodesResponseStatus(str, Enum):
"""Possible values for the status of the responses from the queried nodes."""

SUCCESS = "success"
PARTIAL_SUCCESS = "partial success"
FAIL = "fail"


class NodeError(BaseModel):
"""Data model for an error encountered when querying a node."""

node_name: str
error: str


class CombinedQueryResponse(BaseModel):
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"""Data model for the combined query results of all matching datasets across all queried nodes."""

errors: list[NodeError]
responses: list[CohortQueryResponse]
nodes_response_status: NodesResponseStatus
6 changes: 2 additions & 4 deletions app/api/routers/query.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
"""Router for query path operations."""

from typing import List

from fastapi import APIRouter, Depends

from .. import crud
from ..models import CohortQueryResponse, QueryModel
from ..models import CombinedQueryResponse, QueryModel

router = APIRouter(prefix="/query", tags=["query"])


@router.get("/", response_model=List[CohortQueryResponse])
@router.get("/", response_model=CombinedQueryResponse)
async def get_query(query: QueryModel = Depends(QueryModel)):
"""When a GET request is sent, return list of dicts corresponding to subject-level metadata aggregated by dataset."""
response = await crud.get(
Expand Down
43 changes: 26 additions & 17 deletions app/api/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import httpx
import jsonschema
from fastapi import HTTPException
from fastapi import HTTPException, status
from jsonschema import validate

LOCAL_NODE_INDEX_PATH = Path(__file__).parents[2] / "local_nb_nodes.json"

# Stores the names and URLs of all Neurobagel nodes known to the API instance, in the form of {node_url: node_name, ...}
FEDERATION_NODES = {}

# We use this schema to validate the local_nb_nodes.json file
Expand Down Expand Up @@ -196,7 +198,7 @@ def validate_query_node_url_list(node_urls: list) -> list:
return node_urls


def send_get_request(url: str, params: list):
async def send_get_request(url: str, params: list) -> dict:
"""
Makes a GET request to one or more Neurobagel nodes.

Expand All @@ -218,19 +220,26 @@ def send_get_request(url: str, params: list):
HTTPException
_description_
"""
response = httpx.get(
url=url,
params=params,
# TODO: Revisit timeout value when query performance is improved
timeout=30.0,
# Enable redirect following (off by default) so
# APIs behind a proxy can be reached
follow_redirects=True,
)
async with httpx.AsyncClient() as client:
try:
response = await client.get(
url=url,
params=params,
# TODO: Revisit timeout value when query performance is improved
timeout=30.0,
# Enable redirect following (off by default) so
# APIs behind a proxy can be reached
follow_redirects=True,
)

if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"{response.reason_phrase}: {response.text}",
)
return response.json()
if not response.is_success:
raise HTTPException(
status_code=response.status_code,
detail=f"{response.reason_phrase}: {response.text}",
)
return response.json()
except httpx.NetworkError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Request failed due to a network error or because the node API cannot be reached: {exc}",
) from exc
Loading