Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async & concurrent network startup #116

Merged
merged 27 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7acb813
refact: rewrite Beacon network calls as async
davidlougheed Sep 10, 2024
8f20a23
Merge branch 'master' into refact/async-network-calls
gsfk Nov 12, 2024
cf9e5ef
patch aiohttp params
gsfk Nov 13, 2024
b540d66
make reference service call async
gsfk Nov 13, 2024
f8ac3f5
async code fixes, mostly lots of awaits
gsfk Nov 13, 2024
d5d2a3b
async dependencies
gsfk Nov 13, 2024
a271fcc
make beacon network async
gsfk Nov 13, 2024
45ae30b
remove network init from app startup
gsfk Nov 18, 2024
2f74b74
move network init to first network request
gsfk Nov 18, 2024
a0d3079
move reverse domain id to a function, we'll need it elsewhere
gsfk Nov 19, 2024
1731085
first pass at async network startup
gsfk Nov 19, 2024
bafdb4c
refresh network init
gsfk Nov 19, 2024
28c91b2
Merge branch 'master' into features/async-calls-with-auth
gsfk Nov 22, 2024
20f64c1
update poetry.lock
gsfk Nov 22, 2024
7006ebe
unused import
gsfk Nov 22, 2024
b5f33c0
add aiocache
gsfk Nov 25, 2024
010aaa2
change access token calls to async
gsfk Nov 25, 2024
d889b95
async fixes
gsfk Nov 25, 2024
9ec200d
async censorship fixes
gsfk Nov 26, 2024
b286367
change all tests to async
gsfk Nov 26, 2024
44a4559
Merge branch 'master' into features/async-calls-with-auth
gsfk Nov 26, 2024
8186610
fix gohan args in test
gsfk Nov 26, 2024
2c3fb34
smoother beacon network startup
gsfk Nov 27, 2024
b1d6104
rm orphaned config values
gsfk Nov 27, 2024
15e7be9
cleanup
gsfk Nov 27, 2024
ff02ba5
lint
gsfk Nov 27, 2024
716f776
add simple network test
gsfk Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions bento_beacon/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import os
from flask import Flask, current_app, request
Expand All @@ -10,7 +11,6 @@
from .endpoints.cohorts import cohorts
from .endpoints.datasets import datasets
from .network.network import network
from .network.utils import init_network_service_registry
from .utils.exceptions import APIException
from werkzeug.exceptions import HTTPException
from .authz.middleware import authz_middleware
Expand Down Expand Up @@ -65,11 +65,6 @@
# load blueprint for network
if current_app.config["USE_BEACON_NETWORK"]:
app.register_blueprint(network)
try:
init_network_service_registry()
except APIException:
# trouble setting up network, swallow for now
current_app.logger.error("API Error when initializing beacon network")

# get censorship settings from katsu
max_filters = None
Expand All @@ -79,7 +74,7 @@
for tries in range(max_retries + 1):
current_app.logger.info(f"calling katsu for censorship parameters (try={tries})")
try:
max_filters, count_threshold = katsu_censorship_settings()
max_filters, count_threshold = asyncio.run(katsu_censorship_settings())

Check warning on line 77 in bento_beacon/app.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/app.py#L77

Added line #L77 was not covered by tests
# If we got values successfully, without an API exception being raised, exit early - even if they're None
break
except APIException as e:
Expand All @@ -102,11 +97,11 @@


@app.before_request
def before_request():
async def before_request():
if request.blueprint != "info":
validate_request()
verify_permissions()
save_request_data()
await verify_permissions()
await save_request_data()
reject_query_if_not_permitted()
init_response_data()

Expand Down
52 changes: 29 additions & 23 deletions bento_beacon/authz/access.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,72 @@
import functools
import requests
import aiocache
import aiohttp
from flask import current_app

from .headers import auth_header_from_request
from ..utils.http import tcp_connector

__all__ = [
"get_access_token",
"create_access_header_or_fall_back",
]


@functools.cache
def get_token_endpoint_from_openid_config_url(url: str, validate_ssl: bool = True):
r = requests.get(url, verify=validate_ssl)
@aiocache.cached()
async def get_token_endpoint_from_openid_config_url(url: str):
async with aiohttp.ClientSession(connector=tcp_connector(current_app.config)) as s:
r = await s.get(url)

if not r.ok:
raise Exception(f"Received not-OK response from OIDC config URL: {r.status_code}")
return r.json()["token_endpoint"]

response = await r.json()
return response["token_endpoint"]

def get_access_token() -> str | None:

async def get_access_token() -> str | None:
logger = current_app.logger

oidc_config_url = current_app.config["OPENID_CONFIG_URL"]
client_id = current_app.config["CLIENT_ID"]
client_secret = current_app.config["CLIENT_SECRET"]
validate_ssl = current_app.config["BENTO_VALIDATE_SSL"]

if not all((oidc_config_url, client_id, client_secret)):
logger.error("Could not retrieve access token; one of OPENID_CONFIG_URL | CLIENT_ID | CLIENT_SECRET is not set")
return None

try:
token_endpoint = get_token_endpoint_from_openid_config_url(oidc_config_url, validate_ssl=validate_ssl)
token_endpoint = await get_token_endpoint_from_openid_config_url(oidc_config_url)
current_app.logger.info(f"token_endpoint: {token_endpoint}")
except Exception as e:
logger.error(f"Could not retrieve access token; got exception from OpenID config URL: {e}")
return None

token_res = requests.post(
token_endpoint,
verify=validate_ssl,
data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
},
)
async with aiohttp.ClientSession(connector=tcp_connector(current_app.config)) as s:
davidlougheed marked this conversation as resolved.
Show resolved Hide resolved
token_res = await s.post(
token_endpoint,
data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
},
)

res = await token_res.json()

if not token_res.ok:
logger.error(f"Could not retrieve access token; got error response: {token_res.json()}")
logger.error(f"Could not retrieve access token; got error response: {res}")

Check warning on line 56 in bento_beacon/authz/access.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/authz/access.py#L56

Added line #L56 was not covered by tests
return None

return token_res.json()["access_token"]
return res["access_token"]


def create_access_header_or_fall_back():
async def create_access_header_or_fall_back():
logger = current_app.logger

if not current_app.config["AUTHZ_BENTO_REQUESTS_ENABLED"]:
logger.warning("AUTHZ_BENTO_REQUESTS_ENABLED is false; falling back to request headers")
return auth_header_from_request()

access_token = get_access_token()
access_token = await get_access_token()
if access_token is None:
logger.error("create_access_header_or_fall_back: falling back to request headers")
return auth_header_from_request()
Expand Down
4 changes: 2 additions & 2 deletions bento_beacon/authz/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
)


def check_permission(permission: Permission) -> bool:
return authz_middleware.evaluate_one(request, RESOURCE_EVERYTHING, permission, mark_authz_done=True)
async def check_permission(permission: Permission) -> bool:
return await authz_middleware.async_evaluate_one(request, RESOURCE_EVERYTHING, permission, mark_authz_done=True)
24 changes: 9 additions & 15 deletions bento_beacon/config_files/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
import os
import urllib3
from ..constants import GRANULARITY_COUNT, GRANULARITY_RECORD


Expand All @@ -11,13 +11,15 @@ def str_to_bool(value: str) -> bool:
return value.strip().lower() in ("true", "1", "t", "yes")


def reverse_domain_id(domain):
return ".".join(reversed(domain.split("."))) + ".beacon"


BENTO_DEBUG = str_to_bool(os.environ.get("BENTO_DEBUG", os.environ.get("FLASK_DEBUG", "false")))
BENTO_VALIDATE_SSL = str_to_bool(os.environ.get("BENTO_VALIDATE_SSL", str(not BENTO_DEBUG)))

if not BENTO_VALIDATE_SSL:
# Don't let urllib3 spam us with SSL validation warnings if we're operating with SSL validation off, most likely in
# a development/test context where we're using self-signed certificates.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# silence logspam
logging.getLogger("asyncio").setLevel(logging.WARNING)
logging.getLogger("aiocache").setLevel(logging.WARNING)


class Config:
Expand All @@ -40,15 +42,10 @@ class Config:
DEFAULT_PAGINATION_PAGE_SIZE = 10

BENTO_DEBUG = BENTO_DEBUG
BENTO_VALIDATE_SSL = BENTO_VALIDATE_SSL

BENTO_DOMAIN = os.environ.get("BENTOV2_DOMAIN")
BEACON_BASE_URL = os.environ.get("BEACON_BASE_URL")
BENTO_PUBLIC_URL = os.environ.get("BENTOV2_PUBLIC_URL")

# reverse domain id
BEACON_ID = ".".join(reversed(BENTO_DOMAIN.split("."))) + ".beacon"

BEACON_ID = reverse_domain_id(BENTO_DOMAIN)
BEACON_NAME = os.environ.get("BENTO_PUBLIC_CLIENT_NAME", "Bento") + " Beacon"
BEACON_UI_ENABLED = str_to_bool(os.environ.get("BENTO_BEACON_UI_ENABLED", ""))
BEACON_UI_URL = BENTO_PUBLIC_URL + "/#/en/beacon"
Expand Down Expand Up @@ -153,9 +150,6 @@ class Config:
KATSU_DATASETS_ENDPOINT = "/api/datasets"
KATSU_SEARCH_ENDPOINT = "/private/search"
KATSU_RESOURCES_ENDPOINT = "/api/resources"
KATSU_PHENOTYPIC_FEATURE_TERMS_ENDPOINT = "/api/phenotypic_feature_type_autocomplete"
KATSU_DISEASES_TERMS_ENDPOINT = "/api/disease_term_autocomplete"
KATSU_SAMPLED_TISSUES_TERMS_ENDPOINT = "/api/biosample_sampled_tissue_autocomplete"
KATSU_PUBLIC_CONFIG_ENDPOINT = "/api/public_search_fields"
KATSU_INDIVIDUAL_SCHEMA_ENDPOINT = "/api/schemas/phenopacket"
KATSU_EXPERIMENT_SCHEMA_ENDPOINT = "/api/schemas/experiment"
Expand Down
8 changes: 4 additions & 4 deletions bento_beacon/endpoints/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@

@datasets.route("/datasets", methods=["GET", "POST"])
@authz_middleware.deco_public_endpoint # TODO: authz - more flexibility in what is visible (?)
def get_datasets():
k_datasets = katsu_datasets()
async def get_datasets():
k_datasets = await katsu_datasets()
datasets_beacon_format = list(map(katsu_to_beacon_dataset_mapping, k_datasets))
return beacon_collections_response({"collections": datasets_beacon_format})


@datasets.route("/datasets/<id>", methods=["GET", "POST"])
@authz_middleware.deco_public_endpoint # TODO: authz - more flexibility in what is visible (?)
def get_datasets_by_id(id):
k_dataset = katsu_datasets(id)
async def get_datasets_by_id(id):
k_dataset = await katsu_datasets(id)

Check warning on line 21 in bento_beacon/endpoints/datasets.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/datasets.py#L21

Added line #L21 was not covered by tests
dataset_beacon_format = katsu_to_beacon_dataset_mapping(k_dataset) if k_dataset else []
return beacon_collections_response({"collections": dataset_beacon_format})

Expand Down
36 changes: 18 additions & 18 deletions bento_beacon/endpoints/individuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


@individuals.route("/individuals", methods=["GET", "POST"])
def get_individuals():
async def get_individuals():
variants_query = g.beacon_query_parameters["variants_query"]
phenopacket_filters = g.beacon_query_parameters["phenopacket_filters"]
experiment_filters = g.beacon_query_parameters["experiment_filters"]
Expand All @@ -42,20 +42,20 @@
# TODO: return default granularity rather than count (default could be bool rather than count)
if no_query:
add_info_to_response("no query found, returning total count")
total_count = katsu_total_individuals_count()
total_count = await katsu_total_individuals_count()
if summary_stats_requested():
add_overview_stats_to_response()
return build_query_response(numTotalResults=total_count)
await add_overview_stats_to_response()

Check warning on line 47 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L47

Added line #L47 was not covered by tests
return await build_query_response(numTotalResults=total_count)

# ----------------------------------------------------------
# collect biosample ids from variant and experiment search
# ----------------------------------------------------------
sample_ids = []

if search_sample_ids:
sample_ids = biosample_id_search(variants_query=variants_query, experiment_filters=experiment_filters)
sample_ids = await biosample_id_search(variants_query=variants_query, experiment_filters=experiment_filters)
if not sample_ids:
return zero_count_response()
return await zero_count_response()

Check warning on line 58 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L58

Added line #L58 was not covered by tests

# -------------------------------
# get individuals
Expand All @@ -65,38 +65,38 @@

# get individuals from katsu config search
if config_filters:
config_ids = search_from_config(config_filters)
config_ids = await search_from_config(config_filters)
if not config_ids:
return zero_count_response()
return await zero_count_response()

Check warning on line 70 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L70

Added line #L70 was not covered by tests
individual_results["config_ids"] = config_ids

if not config_search_only:
# retrieve all matching individuals from sample id search, filtered by any phenopacket filters
# either of phenopacket_filters or sample_ids can be empty
phenopacket_ids = katsu_filters_and_sample_ids_query(phenopacket_filters, "phenopacket", sample_ids)
phenopacket_ids = await katsu_filters_and_sample_ids_query(phenopacket_filters, "phenopacket", sample_ids)
if not phenopacket_ids:
return zero_count_response()
return await zero_count_response()

Check warning on line 78 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L78

Added line #L78 was not covered by tests
individual_results["phenopacket_ids"] = phenopacket_ids

# baroque syntax but covers all cases
individual_ids = list(reduce(set.intersection, (set(ids) for ids in individual_results.values())))

if summary_stats_requested():
add_stats_to_response(individual_ids)
await add_stats_to_response(individual_ids)

return build_query_response(ids=individual_ids, full_record_handler=individuals_full_results)
return await build_query_response(ids=individual_ids, full_record_handler=individuals_full_results)


# TODO: pagination (ideally after katsu search gets paginated)
def individuals_full_results(ids):
async def individuals_full_results(ids):

# temp
# if len(ids) > 100:
# return {"message": "too many ids for full response"}

handover_permission = check_permission(P_DOWNLOAD_DATA)
handover = handover_for_ids(ids) if handover_permission else {}
phenopackets_by_result_set = phenopackets_for_ids(ids).get("results", {})
handover_permission = await check_permission(P_DOWNLOAD_DATA)
handover = (await handover_for_ids(ids)) if handover_permission else {}
phenopackets_by_result_set = (await phenopackets_for_ids(ids)).get("results", {})

Check warning on line 99 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L97-L99

Added lines #L97 - L99 were not covered by tests
result_ids = list(phenopackets_by_result_set.keys())
result_sets = {}
numTotalResults = 0
Expand All @@ -123,8 +123,8 @@
# forbidden / unauthorized if no permissions
@individuals.route("/individuals/<id>", methods=["GET", "POST"])
@authz_middleware.deco_require_permissions_on_resource({P_QUERY_DATA})
def individual_by_id(id):
result_sets, numTotalResults = individuals_full_results([id])
async def individual_by_id(id):
result_sets, numTotalResults = await individuals_full_results([id])

Check warning on line 127 in bento_beacon/endpoints/individuals.py

View check run for this annotation

Codecov / codecov/patch

bento_beacon/endpoints/individuals.py#L127

Added line #L127 was not covered by tests

# return 404 if not found
# only authorized users will get 404 here, so this can't be used to probe ids
Expand Down
Loading