optimized query time #5

Merged
merged 2 commits on Dec 15, 2023

Changes from all commits
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,5 +2,6 @@ httpx
redis
fastapi
bmt
jsonpickle
reasoner_pydantic
uvicorn
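jsonpickle is the one new dependency; the PR uses it to serialize the descender's cached dicts-of-sets into redis. A minimal round-trip sketch with made-up data (only encode/decode are assumed):

import jsonpickle
meta = {"biolink:treats": {"biolink:treats", "biolink:ameliorates"}}
blob = jsonpickle.encode(meta)  # a plain JSON string; sets survive the trip
assert jsonpickle.decode(blob) == meta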
80 changes: 63 additions & 17 deletions src/descender.py
@@ -1,24 +1,49 @@
# Convenience class for loading biolink and coming up with all the descendants of pq and
# types.

from bmt import Toolkit
import jsonpickle
from collections import defaultdict

from src.keymaster import create_pq

class Descender:
def __init__(self, rc=None):
"""Descender can be loaded from redis (rc) or, if no redis connection is provided, it will load from bmt.
When we load from redis, we also pull in the s and o partial patterns, which are used to filter at query time."""
if rc is not None:
db = rc.r[7]
self.pq_to_descendants = jsonpickle.decode(db.get("pq_to_descendants"))
self.type_to_descendants = jsonpickle.decode(db.get("type_to_descendants"))
self.predicate_is_symmetric = jsonpickle.decode(db.get("predicate_symmetries"))
self.s_partial_patterns = jsonpickle.decode(db.get("s_partial_patterns"))
self.o_partial_patterns = jsonpickle.decode(db.get("o_partial_patterns"))
self.pq_to_descendant_int_ids = self.create_pq_to_descendant_int_ids(rc)
else:
self.t = Toolkit()
self.type_to_descendants = self.create_type_to_descendants()
self.pq_to_descendants = self.create_pq_to_descendants()
self.predicate_is_symmetric = self.create_is_symmetric()
self.deeptypescache = {}
def is_symmetric(self, predicate):
return self.predicate_is_symmetric[predicate]
def create_is_symmetric(self):
# Create a dictionary from predicate to whether it is symmetric
# The symmetric nature of an edge is completely determined by the predicate.
# I don't think it's possible for a symmetric predicate to be made asymmetric by the
# addition of qualifiers.
p_is_symmetric = {}
for p in self.t.get_descendants('biolink:related_to',formatted=True):
try:
p_element = self.t.get_element(p)
if p_element.symmetric is None:
p_is_symmetric[p] = False
else:
p_is_symmetric[p] = p_element.symmetric
except Exception:
print("Error 2 with predicate: " + p)
pass
return p_is_symmetric
def create_type_to_descendants(self):
# Create a dictionary from type to all of its descendants
type_to_descendants = {}
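# The rest of the body is elided by the diff view; a sketch of what it likely
# does, assuming only bmt's Toolkit.get_descendants (used elsewhere in this
# class for predicates):
#   for t in self.t.get_descendants("biolink:NamedThing", formatted=True):
#       type_to_descendants[t] = set(self.t.get_descendants(t, formatted=True))
#   return type_to_descendants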
@@ -63,11 +88,11 @@ def create_pq_to_descendants(self):
e = {"predicate": predicate}
original_pk = create_pq(e)
add_all_decs(e, predicate_directions, predicate_aspects, decs)
# add_all_decs isn't fully redundant, need to make it so by adding grand descendants etc
decs = redundantize_decs(decs, original_pk)
for k,v in decs.items():
pq_to_descendants[k] = v
#Connect the new descendants to the ancestors of the original pq
for k,v in pq_to_descendants.items():
if original_pk in v:
pq_to_descendants[k].update(decs[original_pk])
@@ -79,6 +104,27 @@ def get_pq_descendants(self, pq):
return self.pq_to_descendants[pq]
except KeyError:
return [pq]
def create_pq_to_descendant_int_ids(self,rc):
# Create a dictionary from pq to all of its descendant integer ids
# First, pull the integer id for every pq
pql = list(self.pq_to_descendants.keys())
# pipeline_gets already returns a {pq: int_id} dict with missing keys dropped,
# so use it directly; zipping pql against its values would misalign whenever a
# pq is absent from db3.
pq_int_ids = rc.pipeline_gets(3, pql, True)
# now convert pq_to_descendants into int id values
pq_to_descendant_int_ids = {}
for pq in self.pq_to_descendants:
descendants = self.pq_to_descendants[pq]
pq_to_descendant_int_ids[pq] = set()
for desc in descendants:
# Not every possible descendant is in the database, and the point here is to filter it down to the ones that are.
try:
pq_to_descendant_int_ids[pq].add(pq_int_ids[desc])
except KeyError:
# This is totally expected
pass
return pq_to_descendant_int_ids
def get_pq_descendant_int_ids(self, pq):
return self.pq_to_descendant_int_ids[pq]
def get_deepest_types(self, typelist):
"""Given a list of types, examine self.type_to_descendants and return a list of the types
from typelist that do not have a descendant in the list"""
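# The body is cut off by the diff view. A self-contained sketch of the behavior
# the docstring describes (a hypothetical helper, not the repo's exact code):
def _deepest_types_sketch(typelist, type_to_descendants):
    deepest = []
    for t in typelist:
        descs = type_to_descendants.get(t, {t})
        # keep t only if no other listed type is among t's descendants
        if not any(other != t and other in descs for other in typelist):
            deepest.append(t)
    return deepest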
@@ -145,14 +191,14 @@ def get_decs(edge, directions, aspects):
return new_edges

def redundantize_decs(decs, root, processed=None):
# given a dictionary from a member to a set of immediate descendants d, and a root node, return a dictionary
# from the member to a set of all descendants. Note that a member is a descendant of itself.
# For instance, if decs is {a: {a,b,c}, b: {b,d}, c: {c,e}, d:{d}, e:{e}}, then the return value will be
# {a: {a,b,c,d,e}, b: {b,d}, c: {c,e}, d:{d}, e:{e}}.
# This is a recursive function. It returns a dictionary.
# The base case is when the root is not in the dictionary. In this case, we return the dictionary.
# The recursive case is when the root is in the dictionary. In this case, we add the descendants of the
# root to the root's descendants, and then call the function on the root's descendants.
# Note that this function is not efficient. It is O(n^2) in the number of edges. However, we don't expect
# the number of edges to be very large, so this should be fine.
if processed is None:
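# The recursive body is truncated above, but the comment pins down the spec; a
# self-contained sketch matching its example (an assumption, not the merged code):
def _redundantize_sketch(decs, root, processed=None):
    if processed is None:
        processed = set()
    if root not in decs or root in processed:
        return decs
    processed.add(root)
    for child in list(decs[root]):
        if child != root:
            _redundantize_sketch(decs, child, processed)
            decs[root] = decs[root] | decs.get(child, {child})
    return decs

# _redundantize_sketch({"a": {"a", "b", "c"}, "b": {"b", "d"}, "c": {"c", "e"},
#                       "d": {"d"}, "e": {"e"}}, "a")["a"] == {"a", "b", "c", "d", "e"}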
45 changes: 39 additions & 6 deletions src/load_redis.py
@@ -3,6 +3,7 @@

import argparse
import json
import jsonpickle
from collections import defaultdict

from redis_connector import RedisConnection
@@ -49,7 +50,7 @@ def fixedge(edge):
return new_edge


def load_nodes(nodepath, descender, host, port, password):
# Load jsonl files into Redis
# The redis database is structured as follows:
# db0 contains a map from a text node id to an integer node_id. The int node_id is defined
@@ -61,7 +62,6 @@ def load_nodes(nodepath, host, port, password):
# create a dictionary from (original) node_id to category. We also need to keep a dictionary
# in python with the node_id->integer node id mapping. We will use this to create the edges.


with RedisConnection(host, port, password) as rc:
pipelines = rc.get_pipelines()
@@ -97,7 +97,7 @@ def load_nodes(nodepath, host, port, password):
return nodeid_to_categories, nodeid_to_intnodeid


def load_edges(edgepath, descender, nodeid_to_categories, nodeid_to_intnodeid, host, port, password):
# Load an edge jsonl into redis. Edges are specified for query by a combination of
# predicate and qualifiers, denoted pq. The databases are structured as:
# db3: pq -> integer_id_for_pq (for saving mem in the other dbs)
@@ -106,13 +106,17 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port,
# (type_int_id, -pq_int_id, object_int_id). The latter is for reverse edges.
# db5: query_pattern -> list of integer_edge_ids
# db6: int_node_id -> list of subclass integer_node_ids
# db7: several pieces of metadata that are used to reconstruct descender at server startup
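# Illustrative key shapes (assuming create_query_pattern comma-joins its three
# arguments, which the partial patterns built below imply):
#   db5 "s_int,pq_int,o_cat_int"  -> [edge_id, o_int, edge_id, o_int, ...]
#   db5 "s_cat_int,-pq_int,o_int" -> [edge_id, s_int, ...]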

with RedisConnection(host, port, password) as rc:
pipelines = rc.get_pipelines()

last_edge_id = 0
pq_to_intpq = {}

s_partial_patterns = set()
o_partial_patterns = set()

# read the file
with open(edgepath) as f:
for line in f:
@@ -141,18 +145,47 @@ def load_edges(edgepath, nodeid_to_categories, nodeid_to_intnodeid, host, port,
for o_cat_int in o_cat_ints:
spattern = create_query_pattern(s_int, pq_intid, o_cat_int)
opattern = create_query_pattern(s_cat_int, -pq_intid, o_int)
s_partial_patterns.add(f"{pq_intid},{o_cat_int}")
o_partial_patterns.add(f"{s_cat_int},-{pq_intid}")
pipelines[5].rpush(spattern, last_edge_id)
pipelines[5].rpush(spattern, o_int)
pipelines[5].rpush(opattern, last_edge_id)
pipelines[5].rpush(opattern, s_int)
if last_edge_id % 10000 == 0:
print("Edge", last_edge_id)
rc.flush_pipelines()

write_metadata(rc, descender, s_partial_patterns, o_partial_patterns)

def write_metadata(rc, descender, s_partial_patterns, o_partial_patterns):
"""
Write metadata to redis db7 to be used at server startup.
The metadata will consist of these elements:
"pq_to_descendants": a json version of descender.pq_to_descendants
"type_to_descendants": a json version of descender.type_to_descendants
"s_partial_patterns": a set of partial patterns for subject queries
"o_partial_patterns": a set of partial patterns for object queries
"predicate_symmetries": a dictionary of {predicate: True/False} denoting whether the predicate is symmetric

This is for 3 reasons:
1. It keeps us from having to recalculate the descendants at server startup (a slow process)
2. It means we don't need to load BMT at server startup, so we save time and are insulated from BMT version changes
3. The partial patterns optimize query time for very general queries like (related_to, named thing). If you just
follow the biolink model and look for every (predicate, qualifier, type) combination, there are about 150k. But in the
data there are more like 1k, and this is the main slow part of the related_to query. By knowing which subpatterns
actually occur in the data, we can filter and run much faster.
"""

db = rc.r[7]
db.set("pq_to_descendants", jsonpickle.encode(descender.pq_to_descendants))
db.set("type_to_descendants", jsonpickle.encode(descender.type_to_descendants))
db.set("s_partial_patterns", jsonpickle.encode(s_partial_patterns))
db.set("o_partial_patterns", jsonpickle.encode(o_partial_patterns))
db.set("predicate_symmetries", jsonpickle.encode(descender.predicate_is_symmetric))

def load(nodepath, edgepath, host, port, password):
descender = Descender()
nodeid_to_categories, nodeid_to_intnodeid = load_nodes(nodepath, descender, host, port, password)
load_edges(edgepath, descender, nodeid_to_categories, nodeid_to_intnodeid, host, port, password)


if __name__ == "__main__":
65 changes: 43 additions & 22 deletions src/query_redis.py
@@ -49,26 +49,36 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte
# looked up in redis at start time.

# Get the int_id for the pq:
pq_int_ids = descender.get_pq_descendant_int_ids(pq)

# Get the int_id for the output type and its descendants
type_int_ids = get_type_int_ids(descender, output_type, rc)

# create_query_pattern
iid_list = []
query_patterns = []
if input_is_subject:
for type_int_id in type_int_ids:
for pq_int_id in pq_int_ids:
#Filter to the ones that are actually in the db
if f"{pq_int_id},{type_int_id}" in descender.s_partial_patterns:
for iid in input_int_ids:
query_patterns.append(create_query_pattern(iid, pq_int_id, type_int_id))
iid_list.append(iid)
else:
for type_int_id in type_int_ids:
for pq_int_id in pq_int_ids:
#Filter to the ones that are actually in the db
if f"{type_int_id},-{pq_int_id}" in descender.o_partial_patterns:
for iid in input_int_ids:
query_patterns.append(create_query_pattern(type_int_id, -pq_int_id, iid) )
iid_list.append(iid)
# We need to make the iid_list in the same way as query_patterns so that we can
# extract the iids that actually gave results to return them
# iid_list = [iid for iid in input_int_ids for type_int_id in type_int_ids for pq_int_id in pq_int_ids]

# Now, get the list of edge ids that match the query patterns
results = get_results_for_query_patterns(pipelines, query_patterns)
# Keep the input_iids that returned results
# This is kind of messy b/c you have to know if the iid is in the subject or object position of the query pattern
input_int_ids = list(set([iid_list[i] for i in range(len(iid_list)) if len(results[i]) > 0]))
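# Why the partial-pattern filter above matters, using the sizes from the loader
# notes: a broad query like (related_to, named thing) expands to roughly 150k
# (pq, type) combinations under biolink, but only about 1k occur in the data,
# so checking s_partial_patterns / o_partial_patterns before building patterns
# avoids the vast majority of the lrange calls.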
@@ -89,17 +99,28 @@ def gquery(input_curies, pq, output_type, input_is_subject, descender, rc, filte
edge_ids = filtered_edge_ids
output_node_ids = filtered_output_node_ids

return get_strings(input_int_ids, output_node_ids, edge_ids,rc)


def get_results_for_query_patterns(pipelines, query_patterns):
for qp in query_patterns:
pipelines[5].lrange(qp, 0, -1)
results = pipelines[5].execute()
return results


def get_type_int_ids(descender, output_type, rc):
output_types = descender.get_type_descendants(output_type)
type_int_ids = rc.pipeline_gets(2, output_types, True).values()
return type_int_ids




def get_strings(input_int_ids, output_node_ids, edge_ids,rc):
input_node_strings = rc.r[1].mget(set(input_int_ids))
output_node_strings = rc.r[1].mget(set(output_node_ids))

edge_strings = rc.r[4].mget(edge_ids)

return input_node_strings, output_node_strings, edge_strings
4 changes: 3 additions & 1 deletion src/redis_connector.py
@@ -11,6 +11,7 @@ def __init__(self,host,port,password):
self.r.append(redis.StrictRedis(host=host, port=port, db=4, password=password))
self.r.append(redis.StrictRedis(host=host, port=port, db=5, password=password))
self.r.append(redis.StrictRedis(host=host, port=port, db=6, password=password))
self.r.append(redis.StrictRedis(host=host, port=port, db=7, password=password))
self.p = [ rc.pipeline() for rc in self.r ]
def __enter__(self):
return self
@@ -32,7 +33,8 @@ def pipeline_gets(self, pipeline_id, keys, convert_to_int=True):
self.p[pipeline_id].get(key)
values = self.p[pipeline_id].execute()
if convert_to_int:
s = {k:int(v) for k,v in zip(keys, values) if v is not None}
return s
else:
return {k:v for k,v in zip(keys, values) if v is not None}
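# Example usage (hypothetical keys and ids): resolve pq strings to int ids in db3
#   rc.pipeline_gets(3, ["pq1", "pq2", "pq3"], True)
#   -> {"pq1": 17, "pq3": 42}   # "pq2" absent from db3, so it is dropped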

9 changes: 8 additions & 1 deletion src/server.py
@@ -24,7 +24,6 @@
allow_headers=["*"],
)

REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
@@ -34,9 +33,14 @@
REDIS_PASSWORD
)

descender = Descender(rc)

@APP.post("/query", tags=["Query"], response_model=PDResponse, response_model_exclude_none=True, status_code=200)
async def query_handler(request: PDResponse):
#import cProfile
#pr = cProfile.Profile()
#pr.enable()

""" Query operations. """
dict_request = request.dict(exclude_unset=True, exclude_none=True)
# Check the query graph for basic validity
@@ -129,6 +133,9 @@ async def query_handler(request: PDResponse):
object_query_node:[{"id":edge["subject"]}]})
response.message.results.append(result)

# after your program ends
#pr.disable()
#pr.print_stats(sort="cumtime")
return response

import uvicorn
6 changes: 3 additions & 3 deletions tests/test_query.py
@@ -14,11 +14,11 @@
# TODO : make rc, Desc into fixtures
@pytest.fixture(scope="module")
def rc():
return RedisConnection("localhost", 6379, "nop")
return RedisConnection("localhost", 6379, "")

@pytest.fixture(scope="module")
def descender(rc):
return Descender(rc)

def test_simple_queries(rc, descender):
# Given the edge defined in run_basic_tests, query for it by subject and object with exactly the
4 changes: 3 additions & 1 deletion tests/test_trapi.py
@@ -47,7 +47,9 @@ def test_profile_asthma():
}
}
}
response = client.post("/query", json={"message": {"query_graph": query_graph}}).json()

response = client.post("/query", json= query_graph).json()
print("How many results?",len(response["message"]["results"]))

def test_500():
# This is giving a 500, seems like it's getting into the double ended query by mistake.