Skip to content

Commit

Permalink
Add health check api (#24)
Browse files Browse the repository at this point in the history
* add timestamp to prices and keep last updated at in publisher

* Add .idea/ to .gitignore

* add a simple health check api

* add port and health check logic

* run black

* empty

* run pre-commit

* undo remove blank line

* fix: update the last successful update only when it's greater

* refactor: fix typings in pyth replicator

* fix: type error in coin_gecko.py

* refactor: health check code to improve readability and performance

* fix: logic error updating last successful update

* chore: update version to 1.1.0 in pyproject.toml

* refactor: rename port to health_check_port

* refactor: rename health_check_test_period_secs to health_check_threshold_secs

* refactor: use fastapi for health check api

* chore: format imports

* Refactor health check API and add is_healthy method to Publisher class

---------

Co-authored-by: Keyvan <[email protected]>
  • Loading branch information
keyvankhademi and Keyvan authored Mar 14, 2024
1 parent 2fbd4b6 commit 6f1802a
Show file tree
Hide file tree
Showing 11 changed files with 671 additions and 96 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ dmypy.json
# Visual Studio Code
.vscode/
.devcontainer/

# PyCharm
.idea/
4 changes: 4 additions & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
provider_engine = 'pyth_replicator'

product_update_interval_secs = 10
health_check_port = 8000

# The health check will return a failure status if no price data has been published within the specified time frame.
health_check_threshold_secs = 60

[publisher.pythd]
endpoint = 'ws://127.0.0.1:8910'
Expand Down
12 changes: 11 additions & 1 deletion example_publisher/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import asyncio
import os
import sys
import threading
import uvicorn
from example_publisher.config import Config
from example_publisher.publisher import Publisher
import typed_settings as ts
import click
import logging
import structlog
from example_publisher.api.health_check import app, API

_DEFAULT_CONFIG_PATH = os.path.join("config", "config.toml")

Expand All @@ -26,13 +29,20 @@
)
def main(config_path):

config = ts.load(
config: Config = ts.load(
cls=Config,
appname="publisher",
config_files=[config_path],
)

publisher = Publisher(config=config)
API.publisher = publisher

def run_server():
uvicorn.run(app, host="0.0.0.0", port=config.health_check_port)

server_thread = threading.Thread(target=run_server)
server_thread.start()

async def run():
try:
Expand Down
31 changes: 31 additions & 0 deletions example_publisher/api/health_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from example_publisher.publisher import Publisher


class API(FastAPI):
publisher: Publisher


app = API()


@app.get("/health")
def health_check():
healthy = API.publisher.is_healthy()
last_successful_update = API.publisher.last_successful_update
if not healthy:
return JSONResponse(
content={
"status": "error",
"last_successful_update": last_successful_update,
},
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
)
return JSONResponse(
content={
"status": "ok",
"last_successful_update": last_successful_update,
},
status_code=status.HTTP_200_OK,
)
2 changes: 2 additions & 0 deletions example_publisher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class PythReplicatorConfig:
class Config:
provider_engine: str
pythd: Pythd
health_check_port: int
health_check_threshold_secs: int
product_update_interval_secs: int = ts.option(default=60)
coin_gecko: Optional[CoinGeckoConfig] = ts.option(default=None)
pyth_replicator: Optional[PythReplicatorConfig] = ts.option(default=None)
3 changes: 3 additions & 0 deletions example_publisher/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
from dataclasses import dataclass
from typing import List, Optional


Symbol = str
UnixTimestamp = int


@dataclass
class Price:
price: float
conf: float
timestamp: UnixTimestamp


class Provider(ABC):
Expand Down
21 changes: 12 additions & 9 deletions example_publisher/providers/coin_gecko.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from math import floor
import time
from typing import Dict, List, Optional
from pycoingecko import CoinGeckoAPI
from structlog import get_logger
Expand All @@ -16,7 +18,7 @@
class CoinGecko(Provider):
def __init__(self, config: CoinGeckoConfig) -> None:
self._api: CoinGeckoAPI = CoinGeckoAPI()
self._prices: Dict[Id, float] = {}
self._prices: Dict[Id, Price] = {}
self._symbol_to_id: Dict[Symbol, Id] = {
product.symbol: product.coin_gecko_id for product in config.products
}
Expand Down Expand Up @@ -45,18 +47,19 @@ def _update_prices(self) -> None:
ids=list(self._prices.keys()), vs_currencies=USD, precision=18
)
for id_, prices in result.items():
self._prices[id_] = prices[USD]
price = prices[USD]
self._prices[id_] = Price(
price,
price * self._config.confidence_ratio_bps / 10000,
floor(time.time()),
)
log.info("updated prices from CoinGecko", prices=self._prices)

def _get_price(self, id: Id) -> float:
def _get_price(self, id: Id) -> Optional[Price]:
return self._prices.get(id, None)

def latest_price(self, symbol: Symbol) -> Optional[Price]:
id = self._symbol_to_id.get(symbol)
if not id:
if id is None:
return None

price = self._get_price(id)
if not price:
return None
return Price(price, price * self._config.confidence_ratio_bps / 10000)
return self._get_price(id)
37 changes: 20 additions & 17 deletions example_publisher/providers/pyth_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

log = get_logger()

UnixTimestamp = int

# Any feed with >= this number of min publishers is considered "coming soon".
COMING_SOON_MIN_PUB_THRESHOLD = 10
Expand All @@ -28,9 +27,7 @@ def __init__(self, config: PythReplicatorConfig) -> None:
first_mapping_account_key=config.first_mapping,
program_key=config.program_key,
)
self._prices: Dict[
str, Tuple[float | None, float | None, UnixTimestamp | None]
] = {}
self._prices: Dict[str, Optional[Price]] = {}
self._update_accounts_task: asyncio.Task | None = None

async def _update_loop(self) -> None:
Expand All @@ -47,20 +44,25 @@ async def _update_loop(self) -> None:
while True:
update = await self._ws.next_update()
log.debug("Received a WS update", account_key=update.key, slot=update.slot)
if isinstance(update, PythPriceAccount):
if isinstance(update, PythPriceAccount) and update.product is not None:
symbol = update.product.symbol

if self._prices.get(symbol) is None:
self._prices[symbol] = [None, None, None]
self._prices[symbol] = None

if update.aggregate_price_status == PythPriceStatus.TRADING:
self._prices[symbol] = [
if (
update.aggregate_price_status == PythPriceStatus.TRADING
and update.aggregate_price is not None
and update.aggregate_price_confidence_interval is not None
):
self._prices[symbol] = Price(
update.aggregate_price,
update.aggregate_price_confidence_interval,
update.timestamp,
]
)
elif (
self._config.manual_agg_enabled
and update.min_publishers is not None
and update.min_publishers >= COMING_SOON_MIN_PUB_THRESHOLD
):
# Do the manual aggregation based on the recent active publishers
Expand All @@ -71,13 +73,14 @@ async def _update_loop(self) -> None:
# Note that we only manually aggregate for feeds that are coming soon. Some feeds should go
# offline outside of market hours (e.g., Equities, Metals). Manually aggregating for these feeds
# can cause them to come online at unexpected times if a single data provider publishes at that time.
prices = []
prices: List[float] = []

current_slot = update.slot
for price_component in update.price_components:
price = price_component.latest_price_info
if (
price.price_status == PythPriceStatus.TRADING
and current_slot is not None
and current_slot - price.pub_slot
<= self._config.manual_agg_max_slot_diff
):
Expand All @@ -93,11 +96,11 @@ async def _update_loop(self) -> None:
if prices:
agg_price, agg_confidence_interval = manual_aggregate(prices)

self._prices[symbol] = [
self._prices[symbol] = Price(
agg_price,
agg_confidence_interval,
update.timestamp,
]
)

log.info(
"Received a price update", symbol=symbol, price=self._prices[symbol]
Expand All @@ -115,7 +118,7 @@ async def _update_accounts_loop(self) -> None:

await asyncio.sleep(self._config.account_update_interval_secs)

def upd_products(self, _: List[Symbol]) -> None:
def upd_products(self, *args) -> None:
# This provider stores all the possible feeds and
# does not care about the desired products as knowing
# them does not improve the performance of the replicator
Expand All @@ -124,15 +127,15 @@ def upd_products(self, _: List[Symbol]) -> None:
pass

def latest_price(self, symbol: Symbol) -> Optional[Price]:
price, conf, timestamp = self._prices.get(symbol, [None, None, None])
price = self._prices.get(symbol, None)

if not price or not conf or not timestamp:
if not price:
return None

if time.time() - timestamp > self._config.staleness_time_in_secs:
if time.time() - price.timestamp > self._config.staleness_time_in_secs:
return None

return Price(price, conf)
return price


def manual_aggregate(prices: List[float]) -> Tuple[float, float]:
Expand Down
15 changes: 14 additions & 1 deletion example_publisher/publisher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import asyncio
import time
from typing import Dict, List, Optional
from attr import define
from structlog import get_logger
from example_publisher.provider import Provider

from example_publisher.providers.coin_gecko import CoinGecko
from example_publisher.config import Config
from example_publisher.providers.pyth_replicator import PythReplicator
Expand Down Expand Up @@ -45,6 +45,14 @@ def __init__(self, config: Config) -> None:
)
self.subscriptions: Dict[SubscriptionId, Product] = {}
self.products: List[Product] = []
self.last_successful_update: Optional[float] = None

def is_healthy(self) -> bool:
return (
self.last_successful_update is not None
and time.time() - self.last_successful_update
< self.config.health_check_threshold_secs
)

async def start(self):
await self.pythd.connect()
Expand Down Expand Up @@ -141,6 +149,11 @@ async def on_notify_price_sched(self, subscription: int) -> None:
await self.pythd.update_price(
product.price_account, scaled_price, scaled_conf, TRADING
)
self.last_successful_update = (
price.timestamp
if self.last_successful_update is None
else max(self.last_successful_update, price.timestamp)
)

def apply_exponent(self, x: float, exp: int) -> int:
return int(x * (10 ** (-exp)))
Loading

0 comments on commit 6f1802a

Please sign in to comment.