Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add health check api #24

Merged
merged 19 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ dmypy.json
# Visual Studio Code
.vscode/
.devcontainer/

# PyCharm
.idea/
2 changes: 2 additions & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
provider_engine = 'pyth_replicator'

product_update_interval_secs = 10
port = 8000
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
health_check_test_period_secs = 60
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved

[publisher.pythd]
endpoint = 'ws://127.0.0.1:8910'
Expand Down
13 changes: 12 additions & 1 deletion example_publisher/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import asyncio
import os
import sys
import threading
from example_publisher.api.health_check import HTTPRequestHandler
from example_publisher.config import Config
from example_publisher.publisher import Publisher
import typed_settings as ts
import click
import logging
import structlog
from http.server import HTTPServer


_DEFAULT_CONFIG_PATH = os.path.join("config", "config.toml")

Expand All @@ -26,14 +30,21 @@
)
def main(config_path):

config = ts.load(
config: Config = ts.load(
cls=Config,
appname="publisher",
config_files=[config_path],
)

publisher = Publisher(config=config)

HTTPRequestHandler.publisher = publisher
HTTPRequestHandler.config = config
server = HTTPServer(("", config.port), HTTPRequestHandler)

server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()

async def run():
try:
await publisher.start()
Expand Down
39 changes: 39 additions & 0 deletions example_publisher/api/health_check.py
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from http.server import BaseHTTPRequestHandler
import json
import time
from example_publisher.config import Config


from example_publisher.publisher import Publisher


class HTTPRequestHandler(BaseHTTPRequestHandler):
publisher: Publisher = None
config: Config = None

def __init__(self, *args, **kwargs):
self.test_priod_secs: int = (
HTTPRequestHandler.config.health_check_test_period_secs
)
self.last_successful_update: float = (
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
HTTPRequestHandler.publisher.last_successful_update
)
super().__init__(*args, **kwargs)

def is_healthy(self):
return (
self.last_successful_update is not None
and time.time() - self.last_successful_update < self.test_priod_secs
)

def do_GET(self):
healthy = self.is_healthy()
data = {
"status": "ok" if healthy else "error",
"last_successful_update": self.last_successful_update,
}

self.send_response(200 if healthy else 503)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))
2 changes: 2 additions & 0 deletions example_publisher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class PythReplicatorConfig:
class Config:
provider_engine: str
pythd: Pythd
port: int
health_check_test_period_secs: int
product_update_interval_secs: int = ts.option(default=60)
coin_gecko: Optional[CoinGeckoConfig] = ts.option(default=None)
pyth_replicator: Optional[PythReplicatorConfig] = ts.option(default=None)
3 changes: 3 additions & 0 deletions example_publisher/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from dataclasses import dataclass
from typing import List, Optional


Symbol = str
UnixTimestamp = int


@dataclass
class Price:
price: float
conf: float
timestamp: UnixTimestamp


class Provider(ABC):
Expand Down
19 changes: 10 additions & 9 deletions example_publisher/providers/coin_gecko.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from math import floor
import time
from typing import Dict, List, Optional
from pycoingecko import CoinGeckoAPI
from structlog import get_logger
Expand All @@ -16,7 +18,7 @@
class CoinGecko(Provider):
def __init__(self, config: CoinGeckoConfig) -> None:
self._api: CoinGeckoAPI = CoinGeckoAPI()
self._prices: Dict[Id, float] = {}
self._prices: Dict[Id, Price] = {}
self._symbol_to_id: Dict[Symbol, Id] = {
product.symbol: product.coin_gecko_id for product in config.products
}
Expand Down Expand Up @@ -45,18 +47,17 @@ def _update_prices(self) -> None:
ids=list(self._prices.keys()), vs_currencies=USD, precision=18
)
for id_, prices in result.items():
self._prices[id_] = prices[USD]
price = prices[USD]
self._prices[id_] = Price(
price,
price * self._config.confidence_ratio_bps / 10000,
floor(time.time()),
)
log.info("updated prices from CoinGecko", prices=self._prices)

def _get_price(self, id: Id) -> float:
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
return self._prices.get(id, None)

def latest_price(self, symbol: Symbol) -> Optional[Price]:
id = self._symbol_to_id.get(symbol)
if not id:
return None

price = self._get_price(id)
if not price:
return None
return Price(price, price * self._config.confidence_ratio_bps / 10000)
return self._get_price(id)
15 changes: 7 additions & 8 deletions example_publisher/providers/pyth_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@

from structlog import get_logger

from example_publisher.provider import Price, Provider, Symbol
from example_publisher.provider import Price, Provider, Symbol, UnixTimestamp

from ..config import PythReplicatorConfig

log = get_logger()

UnixTimestamp = int

# Any feed with >= this number of min publishers is considered "coming soon".
COMING_SOON_MIN_PUB_THRESHOLD = 10
Expand Down Expand Up @@ -51,14 +50,14 @@ async def _update_loop(self) -> None:
symbol = update.product.symbol

if self._prices.get(symbol) is None:
self._prices[symbol] = [None, None, None]
self._prices[symbol] = (None, None, None)

if update.aggregate_price_status == PythPriceStatus.TRADING:
self._prices[symbol] = [
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved
self._prices[symbol] = (
update.aggregate_price,
update.aggregate_price_confidence_interval,
update.timestamp,
]
)
elif (
self._config.manual_agg_enabled
and update.min_publishers >= COMING_SOON_MIN_PUB_THRESHOLD
Expand Down Expand Up @@ -93,11 +92,11 @@ async def _update_loop(self) -> None:
if prices:
agg_price, agg_confidence_interval = manual_aggregate(prices)

self._prices[symbol] = [
self._prices[symbol] = (
agg_price,
agg_confidence_interval,
update.timestamp,
]
)

log.info(
"Received a price update", symbol=symbol, price=self._prices[symbol]
Expand Down Expand Up @@ -132,7 +131,7 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]:
if time.time() - timestamp > self._config.staleness_time_in_secs:
return None

return Price(price, conf)
return Price(price, conf, timestamp)


def manual_aggregate(prices: List[float]) -> Tuple[float, float]:
Expand Down
2 changes: 2 additions & 0 deletions example_publisher/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, config: Config) -> None:
)
self.subscriptions: Dict[SubscriptionId, Product] = {}
self.products: List[Product] = []
self.last_successful_update: Optional[float] = None

async def start(self):
await self.pythd.connect()
Expand Down Expand Up @@ -141,6 +142,7 @@ async def on_notify_price_sched(self, subscription: int) -> None:
await self.pythd.update_price(
product.price_account, scaled_price, scaled_conf, TRADING
)
self.last_successful_update = price.timestamp
keyvankhademi marked this conversation as resolved.
Show resolved Hide resolved

def apply_exponent(self, x: float, exp: int) -> int:
return int(x * (10 ** (-exp)))
Loading