From 1cb23269430a04a6c6ab95e727586ca2733b2b74 Mon Sep 17 00:00:00 2001 From: brouillette Date: Wed, 1 Dec 2021 16:34:30 -0500 Subject: [PATCH 1/5] peer fetch chord-debug-dependent cert validation --- bento_federation_service/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bento_federation_service/utils.py b/bento_federation_service/utils.py index e18d28f..8608257 100644 --- a/bento_federation_service/utils.py +++ b/bento_federation_service/utils.py @@ -38,7 +38,7 @@ async def peer_fetch(client: AsyncHTTPClient, peer: str, path_fragment: str, req urljoin(peer, path_fragment) + arg_str, request_timeout=TIMEOUT, method=method, - body=request_body, + body=request_body,validate_cert=(not CHORD_DEBUG), headers={ **({} if request_body is None else {"Content-Type": "application/json; charset=UTF-8"}), **({"Authorization": auth_header} if auth_header else {}), From 5bc9045443d937a81f254623db2229564b2c9095 Mon Sep 17 00:00:00 2001 From: brouillette Date: Wed, 1 Dec 2021 16:35:09 -0500 Subject: [PATCH 2/5] begin trying to integrate gohan --- .../search/dataset_search/dataset_search.py | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/bento_federation_service/search/dataset_search/dataset_search.py b/bento_federation_service/search/dataset_search/dataset_search.py index f41a785..b2635d6 100644 --- a/bento_federation_service/search/dataset_search/dataset_search.py +++ b/bento_federation_service/search/dataset_search/dataset_search.py @@ -153,16 +153,31 @@ async def _fetch_table_definition_worker(table_queue: Queue, auth_header: Option return try: - # TODO: Don't fetch schema except for first time? 
- table_ownerships_and_records.append((t, await peer_fetch( - client, - CHORD_URL, - f"api/{t['service_artifact']}/tables/{t['table_id']}", - method="GET", - auth_header=auth_header, # Required, otherwise may hit a 403 error - extra_headers=DATASET_SEARCH_HEADERS - ))) - # TODO: Handle HTTP errors + # TEMP: Gohan compatibility + if t['service_artifact'] == "variant": + # construct gohan query parameters + url_args = () # TODO: populate + # call gohan + table_ownerships_and_records.append((t, await peer_fetch( + client, + CHORD_URL, + f"api/gohan/variants/get/by/variantId?assemblyId=GRCh37", + method="GET", + auth_header=auth_header, # Required, otherwise may hit a 403 error + extra_headers=DATASET_SEARCH_HEADERS, + url_args=url_args + ))) + else: + # TODO: Don't fetch schema except for first time? + table_ownerships_and_records.append((t, await peer_fetch( + client, + CHORD_URL, + f"api/{t['service_artifact']}/tables/{t['table_id']}", + method="GET", + auth_header=auth_header, # Required, otherwise may hit a 403 error + extra_headers=DATASET_SEARCH_HEADERS + ))) + # TODO: Handle HTTP errors finally: table_queue.task_done() From aa91f1935855a6956478ab99fd35c30e05452c4b Mon Sep 17 00:00:00 2001 From: brouillette Date: Wed, 8 Dec 2021 13:54:06 -0500 Subject: [PATCH 3/5] - patched gohan schema/table request - gohan data call + query params construction --- .../search/dataset_search/dataset_search.py | 83 ++++++++++++------- .../search/dataset_search/query_utils.py | 64 +++++++++++++- 2 files changed, 117 insertions(+), 30 deletions(-) diff --git a/bento_federation_service/search/dataset_search/dataset_search.py b/bento_federation_service/search/dataset_search/dataset_search.py index b2635d6..28f55c7 100644 --- a/bento_federation_service/search/dataset_search/dataset_search.py +++ b/bento_federation_service/search/dataset_search/dataset_search.py @@ -10,6 +10,7 @@ from typing import Dict, List, Optional, Set, Tuple from bento_federation_service.constants import 
CHORD_URL, SERVICE_NAME, WORKERS +from bento_federation_service.search.dataset_search import query_utils from bento_federation_service.utils import peer_fetch, iterable_to_queue from .constants import DATASET_SEARCH_HEADERS @@ -153,31 +154,24 @@ async def _fetch_table_definition_worker(table_queue: Queue, auth_header: Option return try: - # TEMP: Gohan compatibility - if t['service_artifact'] == "variant": - # construct gohan query parameters - url_args = () # TODO: populate - # call gohan - table_ownerships_and_records.append((t, await peer_fetch( - client, - CHORD_URL, - f"api/gohan/variants/get/by/variantId?assemblyId=GRCh37", - method="GET", - auth_header=auth_header, # Required, otherwise may hit a 403 error - extra_headers=DATASET_SEARCH_HEADERS, - url_args=url_args - ))) + # TEMP: Gohan compatibility testing + if t['service_artifact'] != "variant": + url = f"api/{t['service_artifact']}/tables/{t['table_id']}" else: - # TODO: Don't fetch schema except for first time? - table_ownerships_and_records.append((t, await peer_fetch( - client, - CHORD_URL, - f"api/{t['service_artifact']}/tables/{t['table_id']}", - method="GET", - auth_header=auth_header, # Required, otherwise may hit a 403 error - extra_headers=DATASET_SEARCH_HEADERS - ))) - # TODO: Handle HTTP errors + url = f"api/gohan/tables/fake" + + print("url: " + url) + + #TODO: Don't fetch schema except for first time? 
+ table_ownerships_and_records.append((t, await peer_fetch( + client, + CHORD_URL, + url, + method="GET", + auth_header=auth_header, # Required, otherwise may hit a 403 error + extra_headers=DATASET_SEARCH_HEADERS + ))) + # TODO: Handle HTTP errors finally: table_queue.task_done() @@ -222,19 +216,45 @@ async def _table_search_worker( if not is_querying_data_type: continue - r = await peer_fetch( - client, - CHORD_URL, + # -- TEMP: Gohan compatibility testing + if table_ownership['service_artifact'] != "variant": path_fragment=( f"api/{table_ownership['service_artifact']}{'/private' if private else ''}/tables" f"/{table_record['id']}/search" - ), - url_args=(("query", json.dumps(data_type_queries[table_data_type])),), + ) + url_args = (("query", json.dumps(data_type_queries[table_data_type])),) + else: + path_fragment = (f"api/gohan/variants/get/by/variantId") + # --- TEMP: + # construct based on search query + supplemental_url_args = [["getSampleIdsOnly", "true"]] + + # - transform custom Query to list of lists to simplify + # the gohan query parameter construction + tmpjson=json.dumps({"tmpkey":data_type_queries[table_data_type]}) + reloaded_converted=json.loads(tmpjson)["tmpkey"] + + # - generate query parameters from list of query tree objects + gohan_query_params = query_utils.construct_gohan_query_params(reloaded_converted, supplemental_url_args) + + print(f"gohan_query_params: {gohan_query_params}") + url_args=gohan_query_params + # + # + + + r = await peer_fetch( + client, + CHORD_URL, + path_fragment=path_fragment, + url_args=url_args, method="GET", auth_header=auth_header, # Required in some cases to not get a 403 extra_headers=DATASET_SEARCH_HEADERS, ) + #print(f"Response: {r}") + if private: # We have a results array to account for results = r["results"] @@ -262,6 +282,9 @@ async def run_search_on_dataset( ) -> Tuple[Dict[str, list], Query, List[str]]: linked_field_sets: LinkedFieldSetList = _get_dataset_linked_field_sets(dataset) + # print(f"Linked 
Field Sets: {linked_field_sets}") + # print(f"Dataset: {dataset}") + # Pairs of table ownership records, from the metadata service, and table records, # from each data service to which the table belongs) table_ownerships_and_records: List[Tuple[Dict, Dict]] = [] @@ -275,6 +298,8 @@ async def run_search_on_dataset( await table_ownership_queue.join() try: + # print(f"table_ownerships_and_records: {table_ownerships_and_records}") + table_data_types: Set[str] = {t[1]["data_type"] for t in table_ownerships_and_records} # Set of data types excluded from building the join query diff --git a/bento_federation_service/search/dataset_search/query_utils.py b/bento_federation_service/search/dataset_search/query_utils.py index 23c1e61..d3f7865 100644 --- a/bento_federation_service/search/dataset_search/query_utils.py +++ b/bento_federation_service/search/dataset_search/query_utils.py @@ -1,5 +1,5 @@ from bento_lib.search.queries import convert_query_to_ast_and_preprocess, Query -from typing import Dict, Iterable, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple from bento_federation_service.utils import get_request_json @@ -36,3 +36,65 @@ def test_queries(queries: Iterable[Query]) -> None: for q in queries: # Try compiling each query to make sure it works. 
convert_query_to_ast_and_preprocess(q) + +def print_tree(tabs, t): + for i in t: + if(isinstance(i, list)): + print_tree(tabs+1, i) + else: + print(f"{tabs*' '}{i}") + + +def simple_resolve_tree(t, finalists: list): + counter=0 + for i in t: + if isinstance(i, list): + simple_resolve_tree(i, finalists) + if counter == len(t)-1 and (isinstance(i, str) or isinstance(i, int)): + finalists.append(str(i)) + counter+=1 + return finalists + +def pair_up_simple_list(t: List[List[str]]): + counter=0 + pairs=[] + for i in t: + if counter % 2 == 0: + pairs.append([t[counter], t[counter+1]]) + counter += 1 + return pairs + +def rename_gohan_compatible(list_pairs): + for p in list_pairs: + if p[0] == "assembly_id": + p[0] = "assemblyId" + elif p[0] == "start": + p[0] = "lowerBound" + elif p[0] == "end": + p[0] = "upperBound" + elif p[0] == "genotype_type": + p[0] = "genotype" + +def prune_non_gohan_paramters(list_pairs): + for p in list_pairs: + if p[0] == "sample_id": + list_pairs.remove(p) + + +def construct_gohan_query_params(ast: list, supplemental_args: List[List[str]]): + # somehow convert AST to a simple list of lists/strings/ints + #converted_ast = [] # temp + + # resolve simple key/value pairs + simple_list = [] + simple_resolve_tree(ast, simple_list) + + # pair up simple list and concat with extra arg pairs + pairs = pair_up_simple_list(simple_list) + supplemental_args + # prune unnecessary parameters + prune_non_gohan_paramters(pairs) + # ensure gohan query param naming convention matches up + rename_gohan_compatible(pairs) + + return tuple(tuple(x) for x in pairs) + From 3e9df784c5d38b90a173ec719ea660ca4e7b559c Mon Sep 17 00:00:00 2001 From: brouillette Date: Thu, 9 Dec 2021 14:50:53 -0500 Subject: [PATCH 4/5] gohan compat rc1 --- bento_federation_service/constants.py | 2 + .../search/dataset_search/dataset_search.py | 51 ++++++++++--------- 2 files changed, 30 insertions(+), 23 deletions(-) diff --git a/bento_federation_service/constants.py 
b/bento_federation_service/constants.py index f7bd347..fe93a78 100644 --- a/bento_federation_service/constants.py +++ b/bento_federation_service/constants.py @@ -80,6 +80,8 @@ def _env_url_trailing_slash(var: str) -> str: CHORD_URLS_SET = CHORD_URL != "" and CHORD_REGISTRY_URL != "" +USE_GOHAN = _env_to_bool("USE_GOHAN") + TIMEOUT = 180 # seconds LAST_ERRORED_CACHE_TIME = 30 MAX_BUFFER_SIZE = 1024 ** 3 # 1 gigabyte; maximum size a response can be diff --git a/bento_federation_service/search/dataset_search/dataset_search.py b/bento_federation_service/search/dataset_search/dataset_search.py index 28f55c7..f55ec76 100644 --- a/bento_federation_service/search/dataset_search/dataset_search.py +++ b/bento_federation_service/search/dataset_search/dataset_search.py @@ -9,7 +9,7 @@ from typing import Dict, List, Optional, Set, Tuple -from bento_federation_service.constants import CHORD_URL, SERVICE_NAME, WORKERS +from bento_federation_service.constants import CHORD_URL, SERVICE_NAME, WORKERS, USE_GOHAN from bento_federation_service.search.dataset_search import query_utils from bento_federation_service.utils import peer_fetch, iterable_to_queue from .constants import DATASET_SEARCH_HEADERS @@ -154,10 +154,13 @@ async def _fetch_table_definition_worker(table_queue: Queue, auth_header: Option return try: - # TEMP: Gohan compatibility testing - if t['service_artifact'] != "variant": - url = f"api/{t['service_artifact']}/tables/{t['table_id']}" - else: + # Set up prerequisites + # - default: + url = f"api/{t['service_artifact']}/tables/{t['table_id']}" + + # - Gohan compatibility + # TODO: formalize/clean this up + if USE_GOHAN and t['service_artifact'] == "variant": url = f"api/gohan/tables/fake" print("url: " + url) @@ -216,33 +219,34 @@ async def _table_search_worker( if not is_querying_data_type: continue - # -- TEMP: Gohan compatibility testing - if table_ownership['service_artifact'] != "variant": - path_fragment=( - 
f"api/{table_ownership['service_artifact']}{'/private' if private else ''}/tables" - f"/{table_record['id']}/search" - ) - url_args = (("query", json.dumps(data_type_queries[table_data_type])),) - else: + + # Set up search prerequisites + # - defaults: + path_fragment=( + f"api/{table_ownership['service_artifact']}{'/private' if private else ''}/tables" + f"/{table_record['id']}/search" + ) + url_args = (("query", json.dumps(data_type_queries[table_data_type])),) + + # - Gohan compatibility + # TODO: formalize/clean this up + if USE_GOHAN and table_ownership['service_artifact'] == "variant": + # reset path_fragment: path_fragment = (f"api/gohan/variants/get/by/variantId") - # --- TEMP: - # construct based on search query - supplemental_url_args = [["getSampleIdsOnly", "true"]] + # reset url_args: + # - construct based on search query + supplemental_url_args = [["getSampleIdsOnly", "true"]] # - transform custom Query to list of lists to simplify # the gohan query parameter construction tmpjson=json.dumps({"tmpkey":data_type_queries[table_data_type]}) reloaded_converted=json.loads(tmpjson)["tmpkey"] - # - generate query parameters from list of query tree objects gohan_query_params = query_utils.construct_gohan_query_params(reloaded_converted, supplemental_url_args) - - print(f"gohan_query_params: {gohan_query_params}") - url_args=gohan_query_params - # - # + url_args = gohan_query_params + # Run the search r = await peer_fetch( client, CHORD_URL, @@ -253,7 +257,8 @@ async def _table_search_worker( extra_headers=DATASET_SEARCH_HEADERS, ) - #print(f"Response: {r}") + # if "gohan" in path_fragment: + # print(f"Response: {r}") if private: # We have a results array to account for From f9ee6c96b37c3ad842d4f17a09946167e6ab89e3 Mon Sep 17 00:00:00 2001 From: brouillette Date: Thu, 9 Dec 2021 15:12:56 -0500 Subject: [PATCH 5/5] rc1.1 --- .../search/dataset_search/dataset_search.py | 2 -- bento_federation_service/utils.py | 2 +- 2 files changed, 1 insertion(+), 3 
deletions(-) diff --git a/bento_federation_service/search/dataset_search/dataset_search.py b/bento_federation_service/search/dataset_search/dataset_search.py index f55ec76..d7f0512 100644 --- a/bento_federation_service/search/dataset_search/dataset_search.py +++ b/bento_federation_service/search/dataset_search/dataset_search.py @@ -257,8 +257,6 @@ async def _table_search_worker( extra_headers=DATASET_SEARCH_HEADERS, ) - # if "gohan" in path_fragment: - # print(f"Response: {r}") if private: # We have a results array to account for diff --git a/bento_federation_service/utils.py b/bento_federation_service/utils.py index 8608257..b9d7374 100644 --- a/bento_federation_service/utils.py +++ b/bento_federation_service/utils.py @@ -38,7 +38,7 @@ async def peer_fetch(client: AsyncHTTPClient, peer: str, path_fragment: str, req urljoin(peer, path_fragment) + arg_str, request_timeout=TIMEOUT, method=method, - body=request_body,validate_cert=(not CHORD_DEBUG), + body=request_body, validate_cert=(not CHORD_DEBUG), headers={ **({} if request_body is None else {"Content-Type": "application/json; charset=UTF-8"}), **({"Authorization": auth_header} if auth_header else {}),