Skip to content

Commit

Permalink
Merge Staging (#35)
Browse files Browse the repository at this point in the history
* Add pyright + __status__ endpoint (#29)

* wip

* add pyright

* clean up

* simplify pyright

* optimize build

* fixes per code review

---------

Co-authored-by: Aaron Peddle <[email protected]>

* add basic tests (#30)

Co-authored-by: Aaron Peddle <[email protected]>

* Feature/track gpu status (#31)

* track gpu status

* fix potential deadlock with failed background task

* add sequence number

* update _read_event_chan to match previous API

* fixes per code review

---------

Co-authored-by: Aaron Peddle <[email protected]>

* Erikk/ban 375 inference time to potassium (#33)

* add idle time status

* inference timeout

* need to thread

* improve tests

* inference time not including cold boot time

* small change

* changed var name and set to None (instead of zero) if not set

---------

Co-authored-by: Aaron Peddle <[email protected]>

* version bump

---------

Co-authored-by: Aaron Peddle <[email protected]>
Co-authored-by: Aaron Peddle <[email protected]>
Co-authored-by: Erik Kaunismäki <[email protected]>
  • Loading branch information
4 people authored Oct 12, 2023
1 parent 1bc43f4 commit 2959d77
Show file tree
Hide file tree
Showing 11 changed files with 361 additions and 90 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Dockerfile
18 changes: 18 additions & 0 deletions .github/workflows/pyright.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# CI workflow: type checking (and tests) run as part of the Docker image build.
name: Pyright Type Checking

on: [push, pull_request]

jobs:
  type-check:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1

      # The Dockerfile runs `pyright` and `pytest tests` as RUN steps,
      # so a failing check fails the image build and therefore this job.
      - name: Build Docker image
        run: docker build .

18 changes: 18 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM python:3.8-slim-buster

WORKDIR /potassium

RUN pip install pyright pytest

ADD ./potassium/requirements.txt ./potassium/requirements.txt

RUN pip install -r ./potassium/requirements.txt

ADD . .

RUN pyright
RUN pytest tests

# Tests passed; copy potassium to the exports dir
RUN mkdir /exports && cp -r ./potassium /exports/potassium

4 changes: 2 additions & 2 deletions potassium/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

def send_webhook(url: str, json: dict):
try:
res = requests.post(url, json=json)
requests.post(url, json=json)
except requests.exceptions.ConnectionError:
print(f"Webhook to {url} failed with connection error")
print(f"Webhook to {url} failed with connection error")
171 changes: 109 additions & 62 deletions potassium/potassium.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import requests
import time
from flask import Flask, request, make_response, abort
from werkzeug.serving import make_server
from threading import Thread, Lock
from threading import Thread, Lock, Condition
import functools
from queue import Queue, Full
import traceback
from termcolor import colored

Expand All @@ -25,22 +24,33 @@ def __init__(self, status: int = 200, json: dict = {}):
self.status = status


class InvalidEndpointTypeException(Exception):
    """Signals an endpoint whose type is neither 'handler' nor 'background'."""

    def __init__(self):
        message = "Invalid endpoint type. Must be 'handler' or 'background'"
        super().__init__(message)


class RouteAlreadyInUseException(Exception):
    """Signals an attempt to register a route that already has an endpoint."""

    def __init__(self):
        message = "Route already in use"
        super().__init__(message)


class Potassium():
"Potassium is a simple, stateful, GPU-enabled, and autoscaleable web framework for deploying machine learning models."

def __init__(self, name):
self.name = name

# default init function, if the user doesn't specify one
def empty_init():
return {}

# semi-private vars, not intended for users to modify
self._init_func = empty_init
self._endpoints = {} # dictionary to store unlimited Endpoints, by unique route
self._init_func = lambda: {}
# dictionary to store unlimited Endpoints, by unique route
self._endpoints = {}
self._context = {}
self._lock = Lock()
self._event_chan = Queue(maxsize=1)
self._gpu_lock = Lock()
self._background_task_cv = Condition()
self._sequence_number = 0
self._idle_start_time = 0
self._last_inference_start_time = None
self._flask_app = self._create_flask_app()

#
def init(self, func):
Expand Down Expand Up @@ -77,6 +87,11 @@ def _standardize_route(route):
# handler is a blocking http POST handler
def handler(self, route: str = "/"):
"handler is a blocking http POST handler"

route = self._standardize_route(route)
if route in self._endpoints:
raise RouteAlreadyInUseException()

def actual_decorator(func):
@functools.wraps(func)
def wrapper(request):
Expand All @@ -93,10 +108,6 @@ def wrapper(request):

return out

nonlocal route # we need to modify the route variable in the outer scope
route = self._standardize_route(route)
if route in self._endpoints:
raise Exception("Route already in use")

self._endpoints[route] = Endpoint(type="handler", func=wrapper)
return wrapper
Expand All @@ -105,87 +116,100 @@ def wrapper(request):
# background is a non-blocking http POST handler
def background(self, route: str = "/"):
"background is a non-blocking http POST handler"
route = self._standardize_route(route)
if route in self._endpoints:
raise RouteAlreadyInUseException()

def actual_decorator(func):
@functools.wraps(func)
def wrapper(request):
# send in app's stateful context if GPU, and the request
return func(self._context, request)

nonlocal route # we need to modify the route variable in the outer scope
route = self._standardize_route(route)
if route in self._endpoints:
raise Exception("Route already in use")

self._endpoints[route] = Endpoint(
type="background", func=wrapper)
return wrapper
return actual_decorator

# _handle_generic takes in a request and the endpoint it was routed to and handles it as expected by that endpoint
def _handle_generic(self, route, endpoint, flask_request):
def test_client(self):
"test_client returns a Flask test client for the app"
return self._flask_app.test_client()

# _handle_generic takes in a request and the endpoint it was routed to and handles it as expected by that endpoint
def _handle_generic(self, endpoint, flask_request):
    """Run one request against `endpoint` while holding the GPU lock.

    If the GPU lock is already held, an empty response with status 423 is
    returned and nothing is executed. Otherwise the sequence number is
    incremented and the inference start time is recorded.

    A "handler" endpoint runs inline: its output becomes the response, or a
    500 response carrying the traceback if it raises. A "background"
    endpoint is started on a new thread and ``{'started': True}`` is
    returned immediately; the thread releases the lock when it finishes.

    Every response carries an ``X-Endpoint-Type`` header.

    Raises:
        InvalidEndpointTypeException: if ``endpoint.type`` is neither
            "handler" nor "background".
    """
    # Lock.acquire(blocking=False) reports contention by returning False;
    # it never raises, so the result must be checked explicitly.
    if not self._gpu_lock.acquire(blocking=False):
        res = make_response()
        res.status_code = 423
        res.headers['X-Endpoint-Type'] = endpoint.type
        return res

    self._sequence_number += 1
    self._last_inference_start_time = time.time()

    # Becomes True once a background thread has taken over responsibility
    # for releasing the lock; until then this method must release it.
    lock_handed_off = False
    try:
        if endpoint.type == "handler":
            req = Request(
                json=flask_request.get_json()
            )

            try:
                out = endpoint.func(req)
                res = make_response(out.json)
                res.status_code = out.status
            except Exception:
                # user code failed: log the traceback and return it as a 500
                tb_str = traceback.format_exc()
                print(colored(tb_str, "red"))
                res = make_response(tb_str)
                res.status_code = 500
            res.headers['X-Endpoint-Type'] = endpoint.type
        elif endpoint.type == "background":
            req = Request(
                json=flask_request.get_json()
            )

            # run as threaded task
            def task(endpoint, lock, req):
                try:
                    # a user error propagates out of the thread after cleanup
                    endpoint.func(req)
                finally:
                    # wake anything blocked waiting for the task to finish
                    with self._background_task_cv:
                        self._background_task_cv.notify_all()

                    self._idle_start_time = time.time()
                    self._last_inference_start_time = None
                    lock.release()

            thread = Thread(target=task, args=(endpoint, self._gpu_lock, req))
            thread.start()
            lock_handed_off = True

            # send task start success message
            res = make_response({'started': True})
            res.headers['X-Endpoint-Type'] = endpoint.type
        else:
            raise InvalidEndpointTypeException()
    finally:
        # Release on every exit path that did not hand the lock to a
        # background thread (normal handler return, bad request body,
        # invalid endpoint type, thread start failure).
        if not lock_handed_off:
            self._idle_start_time = time.time()
            self._last_inference_start_time = None
            self._gpu_lock.release()

    return res

# WARNING: cover depends on this being called so it should not be changed
def _read_event_chan(self) -> bool:
return self._event_chan.get()

def _is_working(self):
return self._lock.locked()
"""
_read_event_chan essentially waits for a background task to finish,
and then returns True
"""
with self._background_task_cv:
# wait until the background task is done
self._background_task_cv.wait()
return True

def _create_flask_app(self):
flask_app = Flask(__name__)
Expand All @@ -199,15 +223,38 @@ def handle(path):
abort(404)

endpoint = self._endpoints[route]
return self._handle_generic(route, endpoint, request)
return self._handle_generic(endpoint, request)

@flask_app.route('/__status__', methods=["GET"])
def status():
    """Report GPU availability and timing counters as a JSON response.

    The payload contains:
      - gpu_available: True when the GPU lock is not currently held.
      - sequence_number: the current value of the request sequence counter.
      - idle_time: milliseconds since the idle start time, or 0 while the
        GPU lock is held.
      - inference_time: milliseconds since the current inference started,
        or 0 when no inference start time is recorded.
    """
    idle_time = 0
    inference_time = 0
    gpu_available = not self._gpu_lock.locked()

    # Read the attribute once: request/background threads reset it to None
    # when they finish, so checking and then re-reading it could subtract
    # None from a float.
    inference_started_at = self._last_inference_start_time
    if inference_started_at is not None:
        inference_time = int((time.time() - inference_started_at) * 1000)

    if gpu_available:
        idle_time = int((time.time() - self._idle_start_time) * 1000)

    res = make_response({
        "gpu_available": gpu_available,
        "sequence_number": self._sequence_number,
        "idle_time": idle_time,
        "inference_time": inference_time,
    })

    res.status_code = 200
    res.headers['X-Endpoint-Type'] = "status"
    return res

return flask_app

# serve runs the http server
def serve(self, host="0.0.0.0", port=8000):
print(colored("------\nStarting Potassium Server 🍌", 'yellow'))
self._init_func()
flask_app = self._create_flask_app()
server = make_server(host, port, flask_app)
server = make_server(host, port, self._flask_app, threaded=True)
print(colored(f"Serving at http://{host}:{port}\n------", 'green'))
self._idle_start_time = time.time()
server.serve_forever()
Loading

0 comments on commit 2959d77

Please sign in to comment.