Skip to content

Commit

Permalink
Merge pull request #23 from bento-platform/features/gohan-compatibility
Browse files Browse the repository at this point in the history
Features/gohan compatibility
  • Loading branch information
brouillette authored Dec 9, 2021
2 parents a55a21b + f9ee6c9 commit 57170ab
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 10 deletions.
2 changes: 2 additions & 0 deletions bento_federation_service/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def _env_url_trailing_slash(var: str) -> str:

CHORD_URLS_SET = CHORD_URL != "" and CHORD_REGISTRY_URL != ""

USE_GOHAN = _env_to_bool("USE_GOHAN")

TIMEOUT = 180 # seconds
LAST_ERRORED_CACHE_TIME = 30
MAX_BUFFER_SIZE = 1024 ** 3 # 1 gigabyte; maximum size a response can be
Expand Down
59 changes: 51 additions & 8 deletions bento_federation_service/search/dataset_search/dataset_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from typing import Dict, List, Optional, Set, Tuple

from bento_federation_service.constants import CHORD_URL, SERVICE_NAME, WORKERS
from bento_federation_service.constants import CHORD_URL, SERVICE_NAME, WORKERS, USE_GOHAN
from bento_federation_service.search.dataset_search import query_utils
from bento_federation_service.utils import peer_fetch, iterable_to_queue
from .constants import DATASET_SEARCH_HEADERS

Expand Down Expand Up @@ -153,11 +154,22 @@ async def _fetch_table_definition_worker(table_queue: Queue, auth_header: Option
return

try:
# TODO: Don't fetch schema except for first time?
# Setup up pre-requisites
# - default:
url = f"api/{t['service_artifact']}/tables/{t['table_id']}"

# - Gohan compatibility
# TODO: formalize/clean this up
if USE_GOHAN and t['service_artifact'] == "variant":
url = f"api/gohan/tables/fake"

print("url: " + url)

#TODO: Don't fetch schema except for first time?
table_ownerships_and_records.append((t, await peer_fetch(
client,
CHORD_URL,
f"api/{t['service_artifact']}/tables/{t['table_id']}",
url,
method="GET",
auth_header=auth_header, # Required, otherwise may hit a 403 error
extra_headers=DATASET_SEARCH_HEADERS
Expand Down Expand Up @@ -207,19 +219,45 @@ async def _table_search_worker(
if not is_querying_data_type:
continue


# Setup up search pre-requisites
# - defaults:
path_fragment=(
f"api/{table_ownership['service_artifact']}{'/private' if private else ''}/tables"
f"/{table_record['id']}/search"
)
url_args = (("query", json.dumps(data_type_queries[table_data_type])),)

# - Gohan compatibility
# TODO: formalize/clean this up
if USE_GOHAN and table_ownership['service_artifact'] == "variant":
# reset path_fragment:
path_fragment = (f"api/gohan/variants/get/by/variantId")

# reset url_args:
# - construct based on search query
supplemental_url_args = [["getSampleIdsOnly", "true"]]
# - transform custom Query to list of lists to simplify
# the gohan query parameter construction
tmpjson=json.dumps({"tmpkey":data_type_queries[table_data_type]})
reloaded_converted=json.loads(tmpjson)["tmpkey"]
# - generate query parameters from list of query tree objects
gohan_query_params = query_utils.construct_gohan_query_params(reloaded_converted, supplemental_url_args)
url_args = gohan_query_params


# Run the search
r = await peer_fetch(
client,
CHORD_URL,
path_fragment=(
f"api/{table_ownership['service_artifact']}{'/private' if private else ''}/tables"
f"/{table_record['id']}/search"
),
url_args=(("query", json.dumps(data_type_queries[table_data_type])),),
path_fragment=path_fragment,
url_args=url_args,
method="GET",
auth_header=auth_header, # Required in some cases to not get a 403
extra_headers=DATASET_SEARCH_HEADERS,
)


if private:
# We have a results array to account for
results = r["results"]
Expand Down Expand Up @@ -247,6 +285,9 @@ async def run_search_on_dataset(
) -> Tuple[Dict[str, list], Query, List[str]]:
linked_field_sets: LinkedFieldSetList = _get_dataset_linked_field_sets(dataset)

# print(f"Linked Field Sets: {linked_field_sets}")
# print(f"Dataset: {dataset}")

# Pairs of table ownership records, from the metadata service, and table records,
# from each data service to which the table belongs)
table_ownerships_and_records: List[Tuple[Dict, Dict]] = []
Expand All @@ -260,6 +301,8 @@ async def run_search_on_dataset(
await table_ownership_queue.join()

try:
# print(f"table_ownerships_and_records: {table_ownerships_and_records}")

table_data_types: Set[str] = {t[1]["data_type"] for t in table_ownerships_and_records}

# Set of data types excluded from building the join query
Expand Down
64 changes: 63 additions & 1 deletion bento_federation_service/search/dataset_search/query_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from bento_lib.search.queries import convert_query_to_ast_and_preprocess, Query
from typing import Dict, Iterable, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

from bento_federation_service.utils import get_request_json

Expand Down Expand Up @@ -36,3 +36,65 @@ def test_queries(queries: Iterable[Query]) -> None:
for q in queries:
# Try compiling each query to make sure it works.
convert_query_to_ast_and_preprocess(q)

def print_tree(tabs, t):
for i in t:
if(isinstance(i, list)):
print_tree(tabs+1, i)
else:
print(f"{tabs*' '}{i}")


def simple_resolve_tree(t, finalists: list):
counter=0
for i in t:
if isinstance(i, list):
simple_resolve_tree(i, finalists)
if counter == len(t)-1 and (isinstance(i, str) or isinstance(i, int)):
finalists.append(str(i))
counter+=1
return finalists

def pair_up_simple_list(t: List[List[str]]):
counter=0
pairs=[]
for i in t:
if counter % 2 == 0:
pairs.append([t[counter], t[counter+1]])
counter += 1
return pairs

def rename_gohan_compatible(list_pairs):
for p in list_pairs:
if p[0] == "assembly_id":
p[0] = "assemblyId"
elif p[0] == "start":
p[0] = "lowerBound"
elif p[0] == "end":
p[0] = "upperBound"
elif p[0] == "genotype_type":
p[0] = "genotype"

def prune_non_gohan_paramters(list_pairs):
for p in list_pairs:
if p[0] == "sample_id":
list_pairs.remove(p)


def construct_gohan_query_params(ast: list, supplemental_args: List[List[str]]):
# somehow convert AST to a simple list of lists/strings/ints
#converted_ast = [] # temp

# resolve simple key/value pairs
simple_list = []
simple_resolve_tree(ast, simple_list)

# pair up simple list and concat with extra arg pairs
pairs = pair_up_simple_list(simple_list) + supplemental_args
# prune unnecessary paramers
prune_non_gohan_paramters(pairs)
# ensure gohan query param nameing convention matches up
rename_gohan_compatible(pairs)

return tuple(tuple(x) for x in pairs)

2 changes: 1 addition & 1 deletion bento_federation_service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def peer_fetch(client: AsyncHTTPClient, peer: str, path_fragment: str, req
urljoin(peer, path_fragment) + arg_str,
request_timeout=TIMEOUT,
method=method,
body=request_body,
body=request_body, validate_cert=(not CHORD_DEBUG),
headers={
**({} if request_body is None else {"Content-Type": "application/json; charset=UTF-8"}),
**({"Authorization": auth_header} if auth_header else {}),
Expand Down

0 comments on commit 57170ab

Please sign in to comment.