Skip to content

Commit

Permalink
Extracts Oak from API server into a standalone RPC server (#357)
Browse files Browse the repository at this point in the history
### Related issues

- Relates to #347

### Summary

- adds RPC server to expose Oak's compare() method
- adds [wait-for-it.sh](https://github.com/vishnubob/wait-for-it), a
simple and often-used script for polling a TCP service until it's ready
(used to check when the RPC server is up)
- changes API from running via uvicorn to running via gunicorn, but with
the uvicorn worker class
- fixes high memory usage and some transient concurrency issues when the
number of API workers is increased

### Notes

- I was briefly trying to make better use of Docker layer caching and,
in the process, switched the references to poetry in `start_api.sh` from
full paths to just `poetry`. It appeared to work fine, so I didn't
switch them back, but if there's a reason to use those full paths by all
means feel free to reintroduce them.

### Checks

- [ ] All tests have passed (or issues created for failing tests)
  • Loading branch information
falquaddoomi authored Oct 2, 2023
1 parent 9298243 commit 179490c
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 6 deletions.
171 changes: 170 additions & 1 deletion backend/poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pystow = ">=0.5.0"
loguru = "*"
fastapi = "^0.103.1"
oaklib = "^0.5.19"
gunicorn = "^21.2.0"
tinyrpc = {extras = ["zmq"], version = "^1.1.7"}


[tool.poetry.group.dev.dependencies]
Expand Down
31 changes: 29 additions & 2 deletions backend/src/monarch_py/api/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import os
import zmq
from functools import lru_cache

from pydantic import BaseSettings

from tinyrpc import RPCClient
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.zmq import ZmqClientTransport

from monarch_py.implementations.oak.oak_implementation import OakImplementation
from monarch_py.implementations.solr.solr_implementation import SolrImplementation
from monarch_py.datamodels.model import TermSetPairwiseSimilarity



class Settings(BaseSettings):
Expand All @@ -13,6 +20,8 @@ class Settings(BaseSettings):
solr_url = os.getenv("SOLR_URL") if os.getenv("SOLR_URL") else f"http://{solr_host}:{solr_port}/solr"
phenio_db_path = os.getenv("PHENIO_DB_PATH") if os.getenv("PHENIO_DB_PATH") else "/data/phenio.db"

oak_server_host = os.getenv("OAK_SERVER_HOST", '127.0.0.1')
oak_server_port = os.getenv("OAK_SERVER_PORT", 18811)

settings = Settings()

Expand All @@ -21,8 +30,26 @@ class Settings(BaseSettings):
def solr():
return SolrImplementation(settings.solr_url)

class OakRPCMarshaller:
    """Client-side stand-in for Oak that forwards ``compare()`` to the Oak RPC server.

    An instance holds a tinyrpc proxy connected (over ZeroMQ, speaking
    JSON-RPC) to the address given by ``settings.oak_server_host`` and
    ``settings.oak_server_port``.
    """

    # Parameter order of the remote ``compare_as_dict`` procedure. Used to
    # translate positional arguments into keyword arguments.
    _COMPARE_PARAMS = ("subjects", "objects", "predicates", "labels")

    def __init__(self) -> None:
        """Create the RPC client and keep its proxy as ``self.oak_server``."""
        ctx = zmq.Context()
        rpc_client = RPCClient(
            JSONRPCProtocol(),
            ZmqClientTransport.create(
                ctx, f'tcp://{settings.oak_server_host}:{settings.oak_server_port}'
            ),
        )
        self.oak_server = rpc_client.get_proxy()

    @classmethod
    def _as_keyword_args(cls, args, kwargs):
        """Merge positional and keyword arguments into one keyword dict.

        Args:
            args: positional arguments, in ``_COMPARE_PARAMS`` order.
            kwargs: keyword arguments supplied by the caller.

        Returns:
            A new dict mapping parameter names to values.

        Raises:
            TypeError: if too many positional arguments are given, or if a
                parameter is supplied both positionally and by keyword.
        """
        if len(args) > len(cls._COMPARE_PARAMS):
            raise TypeError(
                f"compare() takes at most {len(cls._COMPARE_PARAMS)} "
                f"positional arguments ({len(args)} given)"
            )
        merged = dict(zip(cls._COMPARE_PARAMS, args))
        duplicated = sorted(merged.keys() & kwargs.keys())
        if duplicated:
            raise TypeError(
                f"compare() got multiple values for: {', '.join(duplicated)}"
            )
        merged.update(kwargs)
        return merged

    def compare(self, *args, **kwargs):
        """Run a comparison on the Oak server and return the rebuilt result.

        Accepts the same arguments as the remote procedure (``subjects``,
        ``objects``, ``predicates``, ``labels``), positionally and/or by
        keyword.

        Returns:
            A ``TermSetPairwiseSimilarity`` built from the dict the server
            sends back.
        """
        # A JSON-RPC request carries its params either by position or by
        # name, not both, so a call mixing the two cannot be sent as-is.
        # Normalise everything to keywords before going over the wire.
        call_kwargs = self._as_keyword_args(args, kwargs)

        # for some reason TermSetPairwiseSimilarity is not JSON-serializable so
        # we can't call compare() directly. instead, we call compare_as_dict(),
        # which sends back a dict, and then we convert it into the expected
        # TermSetPairwiseSimilarity type
        result = self.oak_server.compare_as_dict(**call_kwargs)
        return TermSetPairwiseSimilarity(**result)


@lru_cache(maxsize=1)
def oak():
    """Return the shared Oak client for this process.

    The result is an ``OakRPCMarshaller``, which forwards ``compare()`` calls
    to the standalone Oak RPC server instead of loading Oak in-process.
    ``lru_cache(maxsize=1)`` means the marshaller is created on the first
    call and reused afterwards.
    """
    return OakRPCMarshaller()
46 changes: 46 additions & 0 deletions backend/src/monarch_py/api/oak_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
from typing import List

import zmq

from tinyrpc.server import RPCServer
from tinyrpc.dispatch import RPCDispatcher
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.zmq import ZmqServerTransport

from monarch_py.implementations.oak.oak_implementation import OakImplementation
# from monarch_py.datamodels.model import TermSetPairwiseSimilarity
from monarch_py.api.config import settings

def run_server():
    """Load Oak, expose its ``compare`` method over RPC, and serve until stopped.

    The server listens on ``tcp://<oak_server_host>:<oak_server_port>`` (both
    taken from ``settings``) and publishes a single procedure,
    ``compare_as_dict``. This call does not return: it ends in
    ``serve_forever()``.
    """
    # first load oak, before starting the server; fall back to no explicit
    # path when the configured phenio database file is absent
    db_path = None
    if os.path.exists(settings.phenio_db_path):
        db_path = settings.phenio_db_path
    oak_impl = OakImplementation().init_semsim(phenio_path=db_path)

    rpc_dispatcher = RPCDispatcher()

    # register a serializable proxy for the compare method
    @rpc_dispatcher.public
    def compare_as_dict(
        subjects: List[str], objects: List[str], predicates: List[str] = None, labels=False
    ) -> dict:
        similarity = oak_impl.compare(
            subjects=subjects,
            objects=objects,
            predicates=predicates,
            labels=labels,
        )
        return similarity.dict()

    # set up the remaining bits that a tinyrpc server is made of
    zmq_context = zmq.Context()
    endpoint = f'tcp://{settings.oak_server_host}:{settings.oak_server_port}'
    server_transport = ZmqServerTransport.create(zmq_context, endpoint)
    rpc_server = RPCServer(server_transport, JSONRPCProtocol(), rpc_dispatcher)

    rpc_server.serve_forever()


# only start serving when this module is executed as a script
if __name__ == '__main__':
    run_server()
16 changes: 13 additions & 3 deletions backend/start_api.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
#!/usr/bin/bash
#
# Container entry script: starts the Oak RPC server in the background, waits
# until its TCP port accepts connections, then runs the API under gunicorn.
#
# Environment variables (all optional):
#   OAK_SERVER_HOST   host to poll for the Oak RPC server (default: localhost)
#   OAK_SERVER_PORT   port to poll for the Oak RPC server (default: 18811)
#   UVICORN_WORKERS   number of API worker processes      (default: 8)
#   WORKER_TIMEOUT    gunicorn worker timeout in seconds   (default: 120)

# before we start, run the oak server in the background...
poetry run python -m src.monarch_py.api.oak_server &

# ...but block while we wait for it to start serving requests
# (-t 0 disables wait-for-it's timeout, so this waits as long as it takes)
./wait-for-it.sh -t 0 "${OAK_SERVER_HOST:-localhost}:${OAK_SERVER_PORT:-18811}"

# check for uvicorn workers env var, default to 8
UVICORN_WORKERS=${UVICORN_WORKERS:-8}

# Start the API: gunicorn manages the processes, each one a uvicorn worker
poetry run gunicorn src.monarch_py.api.main:app \
    --bind 0.0.0.0:8000 \
    --preload --timeout "${WORKER_TIMEOUT:-120}" \
    --worker-class uvicorn.workers.UvicornWorker \
    --workers "${UVICORN_WORKERS}"
183 changes: 183 additions & 0 deletions backend/wait-for-it.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env bash
# from https://github.com/vishnubob/wait-for-it/blob/81b1373f17855a4dc21156cfe1694c31d7d1792e/wait-for-it.sh
# Use this script to test if a given TCP host/port are available

WAITFORIT_cmdname=${0##*/}

# Write the given arguments to stderr, unless quiet mode (WAITFORIT_QUIET=1)
# is active, in which case nothing is printed.
echoerr()
{
    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
        return 0
    fi
    echo "$@" >&2
}

# Print the command-line help text to stderr and terminate the script with
# exit status 1.
usage()
{
    cat << USAGE >&2
Usage:
$WAITFORIT_cmdname host:port [-s] [-t timeout] [-- command args]
-h HOST | --host=HOST Host or IP under test
-p PORT | --port=PORT TCP port under test
Alternatively, you specify the host and port as host:port
-s | --strict Only execute subcommand if the test succeeds
-q | --quiet Don't output any status messages
-t TIMEOUT | --timeout=TIMEOUT
Timeout in seconds, zero for no timeout
-- COMMAND ARGS Execute command with args after the test finishes
USAGE
    exit 1
}

# Poll WAITFORIT_HOST:WAITFORIT_PORT once per second until a TCP connection
# succeeds, then report how long it took. The loop itself never gives up; any
# time limit is imposed from outside (see the `timeout` call in
# wait_for_wrapper). Returns the status of the last connection attempt.
wait_for()
{
    if [[ $WAITFORIT_TIMEOUT -le 0 ]]; then
        echoerr "$WAITFORIT_cmdname: waiting for $WAITFORIT_HOST:$WAITFORIT_PORT without a timeout"
    else
        echoerr "$WAITFORIT_cmdname: waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    fi

    WAITFORIT_start_ts=$(date +%s)
    while true; do
        # busybox environments probe with nc; otherwise use bash's /dev/tcp
        if [[ $WAITFORIT_ISBUSY -eq 1 ]]; then
            nc -z $WAITFORIT_HOST $WAITFORIT_PORT
        else
            (echo -n > /dev/tcp/$WAITFORIT_HOST/$WAITFORIT_PORT) >/dev/null 2>&1
        fi
        WAITFORIT_result=$?

        if [[ $WAITFORIT_result -ne 0 ]]; then
            sleep 1
            continue
        fi

        WAITFORIT_end_ts=$(date +%s)
        echoerr "$WAITFORIT_cmdname: $WAITFORIT_HOST:$WAITFORIT_PORT is available after $((WAITFORIT_end_ts - WAITFORIT_start_ts)) seconds"
        break
    done
    return $WAITFORIT_result
}

# Re-run this script as a background child (--child) under `timeout`, so the
# wait is bounded by WAITFORIT_TIMEOUT seconds, and return the child's status.
wait_for_wrapper()
{
    # The child gets --quiet only when the parent is itself in quiet mode.
    local child_flags=(--child)
    if [[ $WAITFORIT_QUIET -eq 1 ]]; then
        child_flags=(--quiet --child)
    fi

    # In order to support SIGINT during timeout: http://unix.stackexchange.com/a/57692
    timeout $WAITFORIT_BUSYTIMEFLAG $WAITFORIT_TIMEOUT $0 "${child_flags[@]}" --host=$WAITFORIT_HOST --port=$WAITFORIT_PORT --timeout=$WAITFORIT_TIMEOUT &
    WAITFORIT_PID=$!
    trap "kill -INT -$WAITFORIT_PID" INT
    wait $WAITFORIT_PID
    WAITFORIT_RESULT=$?

    if [[ $WAITFORIT_RESULT -ne 0 ]]; then
        echoerr "$WAITFORIT_cmdname: timeout occurred after waiting $WAITFORIT_TIMEOUT seconds for $WAITFORIT_HOST:$WAITFORIT_PORT"
    fi
    return $WAITFORIT_RESULT
}

# process arguments
# Consume the command line one option at a time, filling the WAITFORIT_*
# variables. Parsing stops at `--`; everything after it is kept as the command
# to run once the wait is over.
while [[ $# -gt 0 ]]
do
    case "$1" in
        # Any argument containing a colon is taken as host:port. The colon is
        # replaced by a space and the result word-split into a two-item array.
        *:* )
        WAITFORIT_hostport=(${1//:/ })
        WAITFORIT_HOST=${WAITFORIT_hostport[0]}
        WAITFORIT_PORT=${WAITFORIT_hostport[1]}
        shift 1
        ;;
        # Set by wait_for_wrapper when it re-invokes this script.
        --child)
        WAITFORIT_CHILD=1
        shift 1
        ;;
        -q | --quiet)
        WAITFORIT_QUIET=1
        shift 1
        ;;
        -s | --strict)
        WAITFORIT_STRICT=1
        shift 1
        ;;
        # Short options take their value from the next argument; an empty or
        # missing value ends parsing.
        -h)
        WAITFORIT_HOST="$2"
        if [[ $WAITFORIT_HOST == "" ]]; then break; fi
        shift 2
        ;;
        # Long options carry their value after the '='.
        --host=*)
        WAITFORIT_HOST="${1#*=}"
        shift 1
        ;;
        -p)
        WAITFORIT_PORT="$2"
        if [[ $WAITFORIT_PORT == "" ]]; then break; fi
        shift 2
        ;;
        --port=*)
        WAITFORIT_PORT="${1#*=}"
        shift 1
        ;;
        -t)
        WAITFORIT_TIMEOUT="$2"
        if [[ $WAITFORIT_TIMEOUT == "" ]]; then break; fi
        shift 2
        ;;
        --timeout=*)
        WAITFORIT_TIMEOUT="${1#*=}"
        shift 1
        ;;
        # Everything after `--` is the command to execute after the wait.
        --)
        shift
        WAITFORIT_CLI=("$@")
        break
        ;;
        --help)
        usage
        ;;
        *)
        echoerr "Unknown argument: $1"
        usage
        ;;
    esac
done

# Both a host and a port are mandatory; usage() prints the help and exits 1.
if [[ "$WAITFORIT_HOST" == "" || "$WAITFORIT_PORT" == "" ]]; then
    echoerr "Error: you need to provide a host and port to test."
    usage
fi

# Defaults for anything not given on the command line: a 15 second timeout,
# and strict, child and quiet modes all off.
WAITFORIT_TIMEOUT=${WAITFORIT_TIMEOUT:-15}
WAITFORIT_STRICT=${WAITFORIT_STRICT:-0}
WAITFORIT_CHILD=${WAITFORIT_CHILD:-0}
WAITFORIT_QUIET=${WAITFORIT_QUIET:-0}

# Check to see if timeout is from busybox?
# Resolve the `timeout` binary to its real path so a symlink into busybox is
# detected by the pattern match below.
WAITFORIT_TIMEOUT_PATH=$(type -p timeout)
WAITFORIT_TIMEOUT_PATH=$(realpath $WAITFORIT_TIMEOUT_PATH 2>/dev/null || readlink -f $WAITFORIT_TIMEOUT_PATH)

WAITFORIT_BUSYTIMEFLAG=""
if [[ $WAITFORIT_TIMEOUT_PATH =~ "busybox" ]]; then
    # WAITFORIT_ISBUSY=1 makes wait_for probe with `nc -z` instead of /dev/tcp.
    WAITFORIT_ISBUSY=1
    # Check if busybox timeout uses -t flag
    # (recent Alpine versions don't support -t anymore)
    if timeout &>/dev/stdout | grep -q -e '-t '; then
        WAITFORIT_BUSYTIMEFLAG="-t"
    fi
else
    WAITFORIT_ISBUSY=0
fi

# Child mode (started by wait_for_wrapper): poll and exit with the result.
# Otherwise wait with a time limit when the timeout is positive, or poll
# directly with no limit when it is zero.
if [[ $WAITFORIT_CHILD -gt 0 ]]; then
    wait_for
    WAITFORIT_RESULT=$?
    exit $WAITFORIT_RESULT
else
    if [[ $WAITFORIT_TIMEOUT -gt 0 ]]; then
        wait_for_wrapper
        WAITFORIT_RESULT=$?
    else
        wait_for
        WAITFORIT_RESULT=$?
    fi
fi

# If a command was supplied after `--`, replace this process with it. In
# strict mode a failed wait refuses to run it and exits with the wait's
# status. With no command, exit with the wait's status.
if [[ $WAITFORIT_CLI != "" ]]; then
    if [[ $WAITFORIT_RESULT -ne 0 && $WAITFORIT_STRICT -eq 1 ]]; then
        echoerr "$WAITFORIT_cmdname: strict mode, refusing to execute subprocess"
        exit $WAITFORIT_RESULT
    fi
    exec "${WAITFORIT_CLI[@]}"
else
    exit $WAITFORIT_RESULT
fi

0 comments on commit 179490c

Please sign in to comment.