From 99780ded005e43002fe563896fd3cd4dfdced8d3 Mon Sep 17 00:00:00 2001 From: Nick Ochiel Date: Tue, 26 Apr 2022 23:38:16 +0300 Subject: [PATCH] Cleanup. Use better rate limiting because Bitmex throttles aggressively.This needs to be tested so that when using many threads we can successfully get all the data the user wants. Fix type error in get_candles. Fix checks for pairs supported by exchanges. --- app.py | 149 +++++++++++++++++++++++++-------------------------------- 1 file changed, 64 insertions(+), 85 deletions(-) diff --git a/app.py b/app.py index 8a85508..4087745 100644 --- a/app.py +++ b/app.py @@ -46,13 +46,13 @@ class Config: # we can determine which ones shouldn't be supported i.e. populate # this list with exchanges that fail tests. _unsupported_exchanges = [] -_supported_exchanges: dict[str, ccxt.Exchange] = {} +supported_exchanges: dict[str, ccxt.Exchange] = {} def get_logger(): import logging logger = logging.getLogger(__name__) - if _settings.debug: logger.setLevel(logging.DEBUG) + if settings.debug: logger.setLevel(logging.DEBUG) formatter = logging.Formatter( '[%(asctime)s] %(levelname)s (thread %(thread)d):\t%(module)s.%(funcName)s: %(message)s') @@ -72,36 +72,36 @@ def get_logger(): return logger -_settings = Settings() +settings = Settings() # TODO(nochiel) Move this to the settings class. from enum import Enum -CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in _settings.currencies]) +CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in settings.currencies]) logger = get_logger() assert logger -app = FastAPI(debug = _settings.debug) +app = FastAPI(debug = settings.debug) -logger.debug(f'Using currencies: {_settings.currencies}') -if not _settings.exchanges: +logger.debug(f'Using currencies: {settings.currencies}') +if not settings.exchanges: logger.info('using all exchanges.') - _settings.exchanges = list(ccxt.exchanges) + settings.exchanges = list(ccxt.exchanges) -assert _settings.exchanges +assert settings.exchanges logger.info('Initialising supported exchanges.') -for e in _settings.exchanges: +for e in settings.exchanges: if e in ccxt.exchanges and e not in _unsupported_exchanges: - _supported_exchanges[e] = ccxt.__dict__[e]() + supported_exchanges[e] = ccxt.__dict__[e]() try: - if app.debug is False: _supported_exchanges[e].load_markets() + if app.debug is False: supported_exchanges[e].load_markets() except Exception as e: logger.debug(f'Error loading markets for {e}.') -assert _supported_exchanges +assert supported_exchanges -ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in _supported_exchanges]) +ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in supported_exchanges]) # Exchange data is sometimes returned as epoch milliseconds. def is_ms(timestamp): return timestamp % 1e3 == 0 @@ -134,12 +134,21 @@ def get_supported_pair_for(currency: CurrencyName, exchange: ccxt.Exchange) -> s result = '' exchange.load_markets() - market_ids = {f'BTC{currency.value}', f'XBT{currency.value}', f'BTC{currency.value}'.lower(), f'XBT{currency.value}'.lower()} - market_ids_found = list((market_ids & exchange.markets_by_id.keys())) + market_ids = { + f'BTC{currency.value}', + f'BTC/{currency.value}', + f'XBT{currency.value}', + f'XBT/{currency.value}', + f'BTC{currency.value}'.lower(), + f'XBT{currency.value}'.lower()} + market_ids_found = list(market_ids & exchange.markets_by_id.keys()) + if not market_ids_found: + market_ids_found = list(market_ids & set([market['symbol'] for market in exchange.markets_by_id.values()])) if market_ids_found: + logger.debug(f'market_ids_found: {market_ids_found}') market_id = market_ids_found[0] - market = exchange.markets_by_id[market_id] - if market: + market = exchange.markets_by_id[exchange.market_id(market_id)] + if market: result = market['symbol'] logger.debug(f'Found market {market}, with symbol {result}') @@ -164,7 +173,6 @@ def request_single(exchange: ccxt.Exchange, currency: CurrencyName) -> Candle | latest_candle = None dt = None - if exchange.has['fetchOHLCV']: logger.debug('fetchOHLCV') @@ -261,8 +269,8 @@ def status(): return "server is running" @app.get('/api/configure') def get_configuration(): return { - 'currencies': _settings.currencies, - 'exchanges': _settings.exchanges, + 'currencies': settings.currencies, + 'exchanges': settings.exchanges, } def calculate_average_price(candles: list[Candle]) -> Candle: @@ -302,14 +310,14 @@ async def get_exchanges(): def get_supported_currencies(exchange: ccxt.Exchange) -> list[str] : - required = set(_settings.currencies) + required = set(settings.currencies) given = set(exchange.currencies.keys()) return list(required & given) result: list[ExchangeDetails] = [] - assert _supported_exchanges + assert supported_exchanges def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: @@ -320,7 +328,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: currencies = [] if exchange.currencies: - currencies = [c for c in _settings.currencies + currencies = [c for c in settings.currencies if c in exchange.currencies] details = ExchangeDetails( @@ -334,7 +342,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: return result tasks = [asyncio.to_thread(get_exchange_details, exchange) - for exchange in _supported_exchanges.values()] + for exchange in supported_exchanges.values()] details = await asyncio.gather(*tasks) result = list(details) @@ -356,7 +364,7 @@ async def now_average(currency: CurrencyName): logger.debug(f'currency: {currency}') - def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Exchange, Candle | None]: + def get_candle(exchange: ccxt.Exchange, currency: CurrencyName = currency) -> tuple[ccxt.Exchange, Candle | None]: assert exchange assert currency @@ -365,7 +373,7 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex if currency.value in exchange.currencies: try: candle = None - candle = request_single(exchange, currency.value) + candle = request_single(exchange, currency) if candle: result = exchange, candle except Exception as e: @@ -373,8 +381,8 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex return result - tasks = [asyncio.to_thread(get_candle, exchange, currency) - for exchange in _supported_exchanges.values()] + tasks = [asyncio.to_thread(get_candle, exchange) + for exchange in supported_exchanges.values()] task_results = await asyncio.gather(*tasks) logger.debug(f'task results: {task_results}') @@ -395,7 +403,7 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex status_code = HTTPStatus.INTERNAL_SERVER_ERROR, detail = 'Spotbit could get any candle data from the configured exchanges.') - exchanges_used = [exchange.name for exchange in _supported_exchanges.values() + exchanges_used = [exchange.name for exchange in supported_exchanges.values() if exchange.name not in failed_exchanges] result = PriceResponse( @@ -414,14 +422,14 @@ def now(currency: CurrencyName, exchange: ExchangeName): currency: the symbol for the base currency to use e.g. USD, GBP, UST. ''' - if exchange.value not in _supported_exchanges: + if exchange.value not in supported_exchanges: raise HTTPException( status_code = HTTPStatus.INTERNAL_SERVER_ERROR, detail = f'Spotbit is not configured to use {exchange.value} exchange.') result = None - ccxt_exchange = _supported_exchanges[exchange.value] + ccxt_exchange = supported_exchanges[exchange.value] assert ccxt_exchange ccxt_exchange.load_markets() @@ -465,31 +473,31 @@ def get_history(*, _since = round(since.timestamp() * 1e3) - params = {} + params = dict(end = None) if exchange == "bitfinex": - params = {'end' : round(end.timestamp() * 1e3)} + # TODO(nochiel) Is this needed? + # params['end'] = round(end.timestamp() * 1e3) + ... candles = None - try: - wait = exchange.rateLimit * 1e-3 - while wait: - try: - candles = exchange.fetchOHLCV( - symbol = pair, - limit = limit, - timeframe = timeframe, - since = _since, - params = params) - - wait = 0 + rate_limit = exchange.rateLimit * 1e-3 + wait = rate_limit + while wait: + try: + candles = exchange.fetchOHLCV( + symbol = pair, + limit = limit, + timeframe = timeframe, + since = _since, + params = params) - except ccxt.errors.RateLimitExceeded as e: - logger.debug(f'{e}. Rate limit for {exchange} is {exchange.rateLimit}') - time.sleep(wait) - wait *= 2 + wait = 0 - except Exception as e: - logger.error(f'{exchange} candle request error: {e}') + except (ccxt.errors.RateLimitExceeded, ccxt.errors.DDoSProtection) as e: + logger.error(f'{exchange} candle request error: {e}') + time.sleep(wait) + wait *= rateLimit + if wait > 60: wait = 0 if candles: result = [Candle( @@ -518,7 +526,7 @@ async def get_candles_in_range( start, end(required): datetime formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS" or unix timestamp. ''' - ccxt_exchange = _supported_exchanges[exchange.value] + ccxt_exchange = supported_exchanges[exchange.value] ccxt_exchange.load_markets() assert ccxt_exchange.currencies assert ccxt_exchange.markets @@ -625,12 +633,13 @@ async def get_candles_at_dates( Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS". ''' - if exchange.value not in _supported_exchanges: + result: list[Candle] = [] + if exchange.value not in supported_exchanges: raise HTTPException( detail = ServerErrors.EXCHANGE_NOT_SUPPORTED, status_code = HTTPStatus.INTERNAL_SERVER_ERROR) - ccxt_exchange = _supported_exchanges[exchange.value] + ccxt_exchange = supported_exchanges[exchange.value] ccxt_exchange.load_markets() pair = get_supported_pair_for(currency, ccxt_exchange) @@ -671,36 +680,6 @@ async def get_candles_at_dates( return result -def tests(): - # Placeholder - # Expected: validation errors or server errors or valid responses. - - import requests - response = requests.get('http://[::1]:5000/api/now/FOOBAR') - response = requests.get('http://[::1]:5000/api/now/usd') - response = requests.get('http://[::1]:5000/api/now/USD') - response = requests.get('http://[::1]:5000/api/now/JPY') - - response = requests.get('http://[::1]:5000/api/now/USD/Bitstamp') - response = requests.get('http://[::1]:5000/api/now/USD/bitstamp') - response = requests.get('http://[::1]:5000/api/now/usdt/bitstamp') - - response = requests.get( - 'http://[::1]:5000/api/history/USD/bitstamp?start=2019-01-01T0000&end=1522641600' - ) - - response = requests.get( - "http://[::1]:5000/api/history/USD/liquid?start=2022-01-01T00:00&end=2022-02-01T00:00" - ) - - response = requests.post('http://[::1]:5000/history/USDT/binance', - json = ['2022-01-01T00:00', '2022-02-01T00:00', '2021-12-01T00:00'] - ) - - response = requests.post( - "http://[::1]:5000/api/history/JPY/liquid", - json=["2022-01-01T00:00", "2022-02-01T00:00", "2021-12-01T00:00"], - ) if __name__ == '__main__': import uvicorn