diff --git a/requirements.txt b/requirements.txt index 482c5fd..fa86ec7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,6 @@ httpx redis fastapi bmt +jsonpickle reasoner_pydantic uvicorn \ No newline at end of file diff --git a/src/descender.py b/src/descender.py index 6d8e36c..d2d8b9a 100644 --- a/src/descender.py +++ b/src/descender.py @@ -1,24 +1,49 @@ -# Convenience class for loading biolink and coming up with all the descendents of pq and +# Convenience class for loading biolink and coming up with all the descendants of pq and # types. from bmt import Toolkit -from src.keymaster import create_pq +import jsonpickle from collections import defaultdict +from src.keymaster import create_pq + class Descender: - def __init__(self): - self.t = Toolkit() - self.type_to_descendants = self.create_type_to_descendants() - self.pq_to_descendants = self.create_pq_to_descendants() - self.deeptypescache = {} + def __init__(self,rc = None): + """Descender can be loaded from redis (r) or if no redis is provided, it will load from bmt. + When we load from redis, we also pull in the s and o partial patterns which are used to filter at q time.""" + if rc is not None: + db = rc.r[7] + self.pq_to_descendants = jsonpickle.decode(db.get("pq_to_descendants")) + self.type_to_descendants = jsonpickle.decode(db.get("type_to_descendants")) + self.predicate_is_symmetric = jsonpickle.decode(db.get("predicate_symmetries")) + self.s_partial_patterns = jsonpickle.decode(db.get("s_partial_patterns")) + self.o_partial_patterns = jsonpickle.decode(db.get("o_partial_patterns")) + self.pq_to_descendant_int_ids = self.create_pq_to_descendant_int_ids(rc) + else: + self.t = Toolkit() + self.type_to_descendants = self.create_type_to_descendants() + self.pq_to_descendants = self.create_pq_to_descendants() + self.predicate_is_symmetric = self.create_is_symmetric() + self.deeptypescache = {} def is_symmetric(self, predicate): + return self.predicate_is_symmetric[predicate] + def create_is_symmetric(self): + # Create a dictionary from predicate to whether it is symmetric # The symmetric nature of an edge is completely determined by the predicate. # I don't think it's possible for a symmetric predicate to be made asymmetric by the # addition of qualifiers. - p = self.t.get_element(predicate) - if p.symmetric is None: - return False - return p.symmetric + p_is_symmetric = {} + for p in self.t.get_descendants('biolink:related_to',formatted=True): + try: + p_element = self.t.get_element(p) + if p_element.symmetric is None: + p_is_symmetric[p] = False + else: + p_is_symmetric[p] = p_element.symmetric + except: + print("Error 2 with predicte: " + p) + pass + return p_is_symmetric def create_type_to_descendants(self): # Create a dictionary from type to all of its descendants type_to_descendants = {} @@ -63,11 +88,11 @@ def create_pq_to_descendants(self): e = {"predicate": predicate} original_pk = create_pq(e) add_all_decs(e, predicate_directions, predicate_aspects, decs) - # add_all_decs isn't fully redundant, need to make it so by adding grand descendents etc + # add_all_decs isn't fully redundant, need to make it so by adding grand descendants etc decs = redundantize_decs(decs, original_pk) for k,v in decs.items(): pq_to_descendants[k] = v - #Connect the new descendents to the ancestors of the original pq + #Connect the new descendants to the ancestors of the original pq for k,v in pq_to_descendants.items(): if original_pk in v: pq_to_descendants[k].update(decs[original_pk]) @@ -79,6 +104,27 @@ def get_pq_descendants(self, pq): return self.pq_to_descendants[pq] except: return [pq] + def create_pq_to_descendant_int_ids(self,rc): + # Create a dictionary from pq to all of its descendant integer ids + # First, pull the integer id for every pq + pql = list(self.pq_to_descendants.keys()) + pq_iids = rc.pipeline_gets(3, pql, True).values() + pq_int_ids = {pq:iid for pq,iid in zip(pql, pq_iids)} + # now convert pq_to_descendants into int id values + pq_to_descendant_int_ids = {} + for pq in self.pq_to_descendants: + descendants = self.pq_to_descendants[pq] + pq_to_descendant_int_ids[pq] = set() + for desc in descendants: + # Not every possible descendant is in the database, and the point here is to filter it down to the ones that are. + try: + pq_to_descendant_int_ids[pq].add(pq_int_ids[desc]) + except KeyError: + # This is totally expected + pass + return pq_to_descendant_int_ids + def get_pq_descendant_int_ids(self, pq): + return self.pq_to_descendant_int_ids[pq] def get_deepest_types(self, typelist): """Given a list of types, examine self.type_to_descendants and return a list of the types from typelist that do not have a descendant in the list""" @@ -145,14 +191,14 @@ def get_decs(edge, directions, aspects): return new_edges def redundantize_decs(decs, root, processed=None): - # given a dictionary from a member to a set of immediate descendents d, and a root node, return a dictionary - # from the member to a set of all descendents. Note that a member is a descendent of itself. + # given a dictionary from a member to a set of immediate descendants d, and a root node, return a dictionary + # from the member to a set of all descendants. Note that a member is a descendent of itself. # For instance, if decs is {a: {a,b,c}, b: {b,d}, c: {c,e}, d:{d}, e:{e}}, then the return value will be # {a: {a,b,c,d,e}, b: {b,d}, c: {c,e}, d:{d}, e:{e}}. # This is a recursive function. It returns a dictionary. # The base case is when the root is not in the dictionary. In this case, we return the dictionary. - # The recursive case is when the root is in the dictionary. In this case, we add the descendents of the - # root to the root's descendents, and then call the function on the root's descendents. + # The recursive case is when the root is in the dictionary. In this case, we add the descendants of the + # root to the root's descendants, and then call the function on the root's descendants. # Note that this function is not efficient. It is O(n^2) in the number of edges. However, we don't expect # the number of edges to be very large, so this should be fine. if processed is None: diff --git a/src/load_redis.py b/src/load_redis.py index ab7b615..84c42f4 100644 --- a/src/load_redis.py +++ b/src/load_redis.py @@ -3,6 +3,7 @@ import argparse import json +import jsonpickle from collections import defaultdict from redis_connector import RedisConnection @@ -49,7 +50,7 @@ def fixedge(edge): return new_edge -def load_nodes(nodepath, host, port, password): +def load_nodes(nodepath, descender, host, port, password): # Load jsonl files into Redis # The redis database is structured as follows: # db0 contains a map from a text node id to an integer node_id. The int node_id is defined @@ -61,7 +62,6 @@ def load_nodes(nodepath, host, port, password): # create a dictionary from (original) node_id to category. We also need to keep a dictionary # in python with the node_id->integer node id mapping. We will use this to create the edges. - descender = Descender() with RedisConnection(host, port, password) as rc: pipelines = rc.get_pipelines() @@ -97,7 +97,7 @@ def load_nodes(nodepath, host, port, password): return nodeid_to_categories, nodeid_to_intnodeid -def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, password): +def load_edges(edgepath, descender, nodeid_to_categories, nodeid_to_intnodeid, host, port, password): # Load an edge jsonl into redis. Edges are specified for query by a combination of # predicate and qualifiers, denoted pq. The databases are structured as: # db3: pq -> integer_id_for_pq (for saving mem in the other dbs) @@ -106,6 +106,7 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, # (type_int_id, -pq_int_id, object_int_id). The latter is for reverse edges. # db5: query_pattern -> list of integer_edge_ids # db6: int_node_id -> list of subclass integer_node_ids + # db7: several pieces of metadata that are used to reconstruct descender at server startup with RedisConnection(host, port, password) as rc: pipelines = rc.get_pipelines() @@ -113,6 +114,9 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, last_edge_id = 0 pq_to_intpq = {} + s_partial_patterns = set() + o_partial_patterns = set() + # read the file with open(edgepath) as f: for line in f: @@ -141,6 +145,8 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, for o_cat_int in o_cat_ints: spattern = create_query_pattern(s_int, pq_intid, o_cat_int) opattern = create_query_pattern(s_cat_int, -pq_intid, o_int) + s_partial_patterns.add(f"{pq_intid},{o_cat_int}") + o_partial_patterns.add(f"{s_cat_int},-{pq_intid}") pipelines[5].rpush(spattern, last_edge_id) pipelines[5].rpush(spattern, o_int) pipelines[5].rpush(opattern, last_edge_id) @@ -148,11 +154,38 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, if last_edge_id % 10000 == 0: print("Edge", last_edge_id) rc.flush_pipelines() - + write_metadata(rc, descender, s_partial_patterns, o_partial_patterns) + +def write_metadata(rc, descender, s_partial_patterns, o_partial_patterns): + """ + Write metadata to db7 redis to be used at server startup. + The metadata will consist of these elements: + "pq_to_descendants": a json version of descender.pq_to_descendants + "type_to_descendants": a json version of descender.type_to_descendants + "s_partial_patterns": a set of partial patterns for subject queries + "o_partial_patterns": a set of partial patterns for object queries + "predicate_symmetries": a dictionary of {predicate: True/False} denoting whether the predicate is symmetric + + This is for 3 reasons: + 1. It keeps us from having to recalculate the descendants at server startup (a slow process) + 2. It means we don't need to load BMT at server startup, so that we can save time and also avoid changes there for versions + 3. The partial patterns optimize query time for very general queries like (related to named thing). if you just + follow the biolink model and look for every predicate, qualifier, type , then there are about 150k. But in the + data, there are more like 1k, and this is the main slow part of the related_to query. By knowing what the + subpatterns are, we can filter and run much faster. + """ + + db = rc.r[7] + db.set("pq_to_descendants", jsonpickle.encode(descender.pq_to_descendants)) + db.set("type_to_descendants", jsonpickle.encode(descender.type_to_descendants)) + db.set("s_partial_patterns", jsonpickle.encode(s_partial_patterns)) + db.set("o_partial_patterns", jsonpickle.encode(o_partial_patterns)) + db.set("predicate_symmetries", jsonpickle.encode(descender.predicate_is_symmetric)) def load(nodepath, edgepath, host, port, password): - nodeid_to_categories, nodeid_to_intnodeid = load_nodes(nodepath, host, port, password) - load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port, password) + descender = Descender() + nodeid_to_categories, nodeid_to_intnodeid = load_nodes(nodepath, descender, host, port, password) + load_edges(edgepath, descender, nodeid_to_categories, nodeid_to_intnodeid, host, port, password) if __name__ == "__main__": diff --git a/src/query_redis.py b/src/query_redis.py index 6fdc055..e9d4f13 100644 --- a/src/query_redis.py +++ b/src/query_redis.py @@ -49,26 +49,36 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte # looked up in redis at start time. # Get the int_id for the pq: - pqs = descender.get_pq_descendants(pq) - pq_int_ids = rc.pipeline_gets(3, pqs, True).values() + pq_int_ids = descender.get_pq_descendant_int_ids(pq) # Get the int_id for the output type and its descendants - output_types = descender.get_type_descendants(output_type) - type_int_ids = rc.pipeline_gets(2, output_types, True).values() + type_int_ids = get_type_int_ids(descender, output_type, rc) # create_query_pattern + iid_list = [] + query_patterns = [] if input_is_subject: - query_patterns = [create_query_pattern(iid, pq_int_id, type_int_id) for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids] + for type_int_id in type_int_ids: + for pq_int_id in pq_int_ids: + #Filter to the ones that are actually in the db + if f"{pq_int_id},{type_int_id}" in descender.s_partial_patterns: + for iid in input_int_ids: + query_patterns.append(create_query_pattern(iid, pq_int_id, type_int_id)) + iid_list.append(iid) else: - query_patterns = [create_query_pattern(type_int_id, -pq_int_id, iid) for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids] + for type_int_id in type_int_ids: + for pq_int_id in pq_int_ids: + #Filter to the ones that are actually in the db + if f"{type_int_id},-{pq_int_id}" in descender.o_partial_patterns: + for iid in input_int_ids: + query_patterns.append(create_query_pattern(type_int_id, -pq_int_id, iid) ) + iid_list.append(iid) # We need to make the iid_list in the same way as query_patterns so that we can # extract the iids that actually gave results to return them - iid_list = [iid for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids] + # iid_list = [iid for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids] # Now, get the list of edge ids that match the query patterns - for qp in query_patterns: - pipelines[5].lrange(qp, 0, -1) - results = pipelines[5].execute() + results = get_results_for_query_patterns(pipelines, query_patterns) # Keep the input_iids that returned results # This is kind of messy b/c you have to know if the iid is in the subject or object position of the query pattern input_int_ids = list(set([iid_list[i] for i in range(len(iid_list)) if len(results[i]) > 0])) @@ -89,17 +99,28 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte edge_ids = filtered_edge_ids output_node_ids = filtered_output_node_ids - # Collect the node strings: - for iid in set(input_int_ids): - pipelines[1].get(iid) - input_node_strings = pipelines[1].execute() - for oid in set(output_node_ids): - pipelines[1].get(oid) - output_node_strings = pipelines[1].execute() - - # Collect the edge strings: - for eid in edge_ids: - pipelines[4].get(eid) - edge_strings = pipelines[4].execute() + return get_strings(input_int_ids, output_node_ids, edge_ids,rc) + + +def get_results_for_query_patterns(pipelines, query_patterns): + for qp in query_patterns: + pipelines[5].lrange(qp, 0, -1) + results = pipelines[5].execute() + return results + + +def get_type_int_ids(descender, output_type, rc): + output_types = descender.get_type_descendants(output_type) + type_int_ids = rc.pipeline_gets(2, output_types, True).values() + return type_int_ids + + + + +def get_strings(input_int_ids, output_node_ids, edge_ids,rc): + input_node_strings = rc.r[1].mget(set(input_int_ids)) + output_node_strings = rc.r[1].mget(set(output_node_ids)) + + edge_strings = rc.r[4].mget(edge_ids) return input_node_strings, output_node_strings, edge_strings diff --git a/src/redis_connector.py b/src/redis_connector.py index c303aac..07f4a57 100644 --- a/src/redis_connector.py +++ b/src/redis_connector.py @@ -11,6 +11,7 @@ def __init__(self,host,port,password): self.r.append(redis.StrictRedis(host=host, port=port, db=4, password=password)) self.r.append(redis.StrictRedis(host=host, port=port, db=5, password=password)) self.r.append(redis.StrictRedis(host=host, port=port, db=6, password=password)) + self.r.append(redis.StrictRedis(host=host, port=port, db=7, password=password)) self.p = [ rc.pipeline() for rc in self.r ] def __enter__(self): return self @@ -32,7 +33,8 @@ def pipeline_gets(self, pipeline_id, keys, convert_to_int=True): self.p[pipeline_id].get(key) values = self.p[pipeline_id].execute() if convert_to_int: - return {k:int(v) for k,v in zip(keys, values) if v is not None} + s = {k:int(v) for k,v in zip(keys, values) if v is not None} + return s else: return {k:v for k,v in zip(keys, values) if v is not None} diff --git a/src/server.py b/src/server.py index 285ab5f..8a197b2 100644 --- a/src/server.py +++ b/src/server.py @@ -24,7 +24,6 @@ allow_headers=["*"], ) -descender = Descender() REDIS_HOST = os.environ.get("REDIS_HOST", "localhost") REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379")) REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") @@ -34,9 +33,14 @@ REDIS_PASSWORD ) +descender = Descender(rc) @APP.post("/query", tags=["Query"], response_model=PDResponse, response_model_exclude_none=True, status_code=200) async def query_handler(request: PDResponse): + #import cProfile + #pr = cProfile.Profile() + #pr.enable() + """ Query operations. """ dict_request = request.dict(exclude_unset=True, exclude_none=True) # Check the query graph for basic validity @@ -129,6 +133,9 @@ async def query_handler(request: PDResponse): object_query_node:[{"id":edge["subject"]}]}) response.message.results.append(result) + # after your program ends + #pr.disable() + #pr.print_stats(sort="cumtime") return response import uvicorn diff --git a/tests/test_query.py b/tests/test_query.py index a7ecf01..6afae67 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -14,11 +14,11 @@ # TODO : make rc, Desc into fixtures @pytest.fixture(scope="module") def rc(): - return RedisConnection("localhost", 6379, "nop") + return RedisConnection("localhost", 6379, "") @pytest.fixture(scope="module") -def descender(): - return Descender() +def descender(rc): + return Descender(rc) def test_simple_queries(rc, descender): # Given the edge defined in run_basic_tests, query for it by subject and object with exacty the diff --git a/tests/test_trapi.py b/tests/test_trapi.py index eadc2f0..bf1f3b2 100644 --- a/tests/test_trapi.py +++ b/tests/test_trapi.py @@ -47,7 +47,9 @@ def test_profile_asthma(): } } } - response = client.post("/query", json={"message": {"query_graph": query_graph}}).json() + + response = client.post("/query", json= query_graph).json() + print("How many results?",len(response["message"]["results"])) def test_500(): # This is giving a 500, seems like it's getting into the double ended query by mistake.