diff --git a/app.py b/app.py index be46ff2..e4c7f71 100644 --- a/app.py +++ b/app.py @@ -4,9 +4,10 @@ # TODO(nochiel) Give the client structured errors. # TODO(nochiel) - Create a new exception type so that client receives structured errors as a response. # TODO(nochiel) - If there is no data available for a response, provide a good error. +# TODO(nochiel) - Standard error page/response for non-existent routes? +# TODO(nochiel) TEST that all endpoints return consistent types. import asyncio -import logging from datetime import datetime, timedelta import time import sys @@ -45,7 +46,7 @@ class Settings(BaseSettings): @validator('currencies') def uppercase_currency_names(cls, v): - assert v and len(v) > 0, 'no currencies' + assert v and len(v), 'no currencies' return [exchange.upper() for exchange in v] class Config: @@ -91,8 +92,7 @@ class Config: assert _supported_exchanges -# FIXME(nochiel) Do we need this test? -# Check if a timestamp has ms precision by modding by 1e3 +# Exchange data is sometimes returned as epoch milliseconds. def is_ms(timestamp): return timestamp % 1e3 == 0 class Candle(BaseModel): @@ -102,7 +102,13 @@ class Candle(BaseModel): low : float close : float volume : float - + + @validator('timestamp') + def time_in_seconds(cls, v): + result = v + if is_ms(v): result = int(v * 1e-3) + return result + class Config: json_encoders = { datetime: lambda v: v.isoformat() @@ -117,6 +123,7 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None: # TODO(nochiel) Add a type for result. result = None + latest_candle = None ticker = f'BTC/{currency}' dt = None @@ -170,17 +177,6 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None: latest_candle = candles[-1] - timestamp = latest_candle[OHLCV.timestamp] - timestamp = timestamp * 1e-3 if is_ms(int(timestamp)) else timestamp - - result = Candle( - timestamp = timestamp, - open = latest_candle[OHLCV.open], - high = latest_candle[OHLCV.high], - low = latest_candle[OHLCV.low], - close = latest_candle[OHLCV.close], - volume = latest_candle[OHLCV.volume] - ) except Exception as e: app.logger.error(f'error requesting candle from {exchange.name}: {e}') @@ -193,12 +189,17 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None: except Exception as e: app.logger.error(f'error on {exchange} fetch_ticker: {e}') - if candle: - app.logger.debug(f'{exchange} candle: {candle}') - timestamp = candle[OHLCV.timestamp] - timestamp = timestamp * 1e-3 if is_ms(timestamp) else timestamp + latest_candle = candle - result = candle + if latest_candle: + result = Candle( + timestamp = latest_candle[OHLCV.timestamp], + open = latest_candle[OHLCV.open], + high = latest_candle[OHLCV.high], + low = latest_candle[OHLCV.low], + close = latest_candle[OHLCV.close], + volume = latest_candle[OHLCV.volume] + ) return result @@ -242,7 +243,7 @@ def get_configuration(): def calculate_average_price(candles: list[Candle]) -> Candle: - assert candles and len(candles) > 0 + assert candles and len(candles) mean = statistics.mean average_open = mean(candle.open for candle in candles) @@ -295,7 +296,7 @@ class ExchangeResult(BaseModel): assert _supported_exchanges - async def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: + def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: result = None @@ -317,7 +318,7 @@ async def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: result = details return result - tasks = [get_exchange_details(exchange) + tasks = [asyncio.to_thread(get_exchange_details, exchange) for exchange in _supported_exchanges.values()] details = await asyncio.gather(*tasks) result = ExchangeResult(exchanges = list(details)) @@ -338,9 +339,12 @@ class PriceResponse(BaseModel): result = None currency = currency.upper() + if currency not in _settings.currencies: + flask.abort(flask.Response(response = f'Spotbit is not configured to use {currency}.', + status = HTTPStatus.INTERNAL_SERVER_ERROR)) averaging_time = _settings.averaging_time - async def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Exchange, Candle | None]: + def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Exchange, Candle | None]: assert exchange assert currency @@ -357,7 +361,7 @@ async def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Excha return result - tasks = [asyncio.create_task(get_candle(exchange, currency)) + tasks = [asyncio.to_thread(get_candle, exchange, currency) for exchange in _supported_exchanges.values()] task_results = await asyncio.gather(*tasks) app.logger.debug(f'task results: {task_results}') @@ -401,11 +405,14 @@ def now(currency, exchange): status = HTTPStatus.INTERNAL_SERVER_ERROR)) result = None - currency = currency.upper() exchange = _supported_exchanges[exchange] exchange.load_markets() + currency = currency.upper() + if currency not in _settings.currencies: + flask.abort(flask.Response(response = f'Spotbit is not configured to use {currency}.', + status = HTTPStatus.INTERNAL_SERVER_ERROR)) if currency not in exchange.currencies: - flask.abort(flask.Response(response = f'Spotbit does not support the {currency} on {exchange}', + flask.abort(flask.Response(response = f'Spotbit does not support the {currency} on {exchange}.', status = HTTPStatus.INTERNAL_SERVER_ERROR)) assert exchange @@ -509,16 +516,15 @@ async def get_candles_in_range(currency, exchange, date_start, date_end): app.logger.debug(f'requesting periods with {limit} limit: {periods}') - async def get_history(*, + def get_history(*, exchange: ccxt.Exchange = exchange, since: datetime, limit: int = limit, - timeframe: str = timeframe): + timeframe: str = timeframe) -> list: assert exchange app.logger.debug(f'{exchange} {pair} {since}') - # FIXME(nochiel) Handle ccxt.base.errors.RateLimitExceeded rateLimit = exchange.rateLimit _since = int(since.timestamp() * 1e3) @@ -528,12 +534,22 @@ async def get_history(*, candles = None try: - candles = exchange.fetchOHLCV( - symbol = pair, - limit = limit, - timeframe = timeframe, - since = _since, - params = params) + wait = 1 + while wait > 0: + try: + candles = exchange.fetchOHLCV( + symbol = pair, + limit = limit, + timeframe = timeframe, + since = _since, + params = params) + + wait = 0 + + except ccxt.base.errors.RateLimitExceeded as e: + app.logger.debug(f'{e}. Rate limit for {exchange} is {rateLimit}') + time.sleep(wait) + wait *= 2 except Exception as e: app.logger.error(f'{exchange} candle request error: {e}') @@ -542,30 +558,43 @@ async def get_history(*, tasks = [] for p in periods: - task = asyncio.create_task(get_history( - exchange = exchange, - since = p)) + task = asyncio.to_thread(get_history, + exchange = exchange, + since = p) tasks.append(task) if remaining_frames > 0: last_candle_time = periods[-1] + (dt * 100) assert last_candle_time < end app.logger.debug(f'remaining_frames: {remaining_frames}') - task = asyncio.create_task(get_history( - since = last_candle_time, - limit = remaining_frames)) + task = asyncio.to_thread(get_history, + since = last_candle_time, + limit = remaining_frames) tasks.append(task) - + new_last_candle_time = last_candle_time + (dt * remaining_frames ) app.logger.debug(f'new_last_candle_time: {new_last_candle_time}') - candles = await asyncio.gather(*tasks) - _candles = [] - for c in candles: # flatten candles - _candles += c - candles = _candles + task_results = await asyncio.gather(*tasks) + candles = [] + for result in task_results: + if result: candles.extend(result) + + candles = [Candle( + timestamp = candle[OHLCV.timestamp], + open = candle[OHLCV.open], + high = candle[OHLCV.high], + low = candle[OHLCV.low], + close = candle[OHLCV.close], + volume = candle[OHLCV.volume] + ) + + for candle in candles] - assert len(candles) == (n_periods * limit) + remaining_frames + expected_number_of_candles = (n_periods * limit) + remaining_frames + received_number_of_candles = len(candles) + if received_number_of_candles < expected_number_of_candles: + logger.info(f'{exchange} does not have data for the entire period. Expected {expected_number_of_candles} candles. Got {received_number_of_candles} candles') if candles is None or len(candles) == 0: flask.abort(flask.Response(response = f'Spotbit did not receive any candle history for the period {start} - {end} from {exchange}', @@ -574,8 +603,8 @@ async def get_history(*, app.logger.debug(f'got: {len(candles)} candles') app.logger.debug(f'candles: {candles[:10]} ... {candles[-10:]}') - # FIXME(nochiel) Make a type for this. Also is this the best way to return candle data? - return flask.jsonify(candles) + result = flask.jsonify([candle.dict() for candle in candles]) + return result tests_for_get_candles_at_dates = [ '''curl http://localhost:5000/history/usdt/binance --header 'Content-Type:application/json' --data '[\"2022-01-01T00:00 \", \"2022-02-01T00:00\", \"2021-12-01T00:00\"]''', @@ -604,13 +633,15 @@ async def get_candles_at_dates(currency, exchange): status = HTTPStatus.BAD_REQUEST)) try: - app.logger.debug(f'request data: {request.get_data()}') dates = request.get_json() - dates = [datetime.fromisoformat(date) for date in dates] if dates else None - app.logger.debug(f'dates: {dates}') + # TODO(nochiel) Check if isoformat or timestamps. Raise HTTPException if dates don't parse. + dates = [datetime.fromisoformat(date) for date in dates] if dates else [] + if len(dates) == 0: + raise Exception(f'received: ${dates}') except Exception as e: flask.abort(flask.Response( - response = f'error processing dates: {e}', + response = f'error processing dates: {e}.\n' + + 'Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS".', status = HTTPStatus.INTERNAL_SERVER_ERROR)) results = None @@ -630,7 +661,7 @@ async def get_candles_at_dates(currency, exchange): candles = {} params = {} - async def get_candle(*, + def get_candle(*, exchange: ccxt.Exchange = exchange, since: datetime, limit: int = limit, @@ -641,7 +672,7 @@ async def get_candle(*, assert exchange result = None - candle, candles = None, None + candles = None try: candles = exchange.fetchOHLCV( symbol = pair, @@ -654,30 +685,26 @@ async def get_candle(*, app.logger.error(e) if candles and len(candles[0]) > 0: - app.logger.debug(f'candle: {candle}') + app.logger.debug(f'candles: {candles}') candle = candles[0] + result = Candle( timestamp = candle[OHLCV.timestamp], - open = candle[OHLCV.open], - high = candle[OHLCV.high], - low = candle[OHLCV.low], - close = candle[OHLCV.close], - volume = candle[OHLCV.volume] + open = candle[OHLCV.open], + high = candle[OHLCV.high], + low = candle[OHLCV.low], + close = candle[OHLCV.close], + volume = candle[OHLCV.volume] ) return result assert dates - tasks = [asyncio.create_task(get_candle(since = date)) + tasks = [asyncio.to_thread(get_candle, since = date) for date in dates] candles = await asyncio.gather(*tasks) - candles = [candle for candle in candles if candle is not None] + candles = [candle.dict() for candle in candles if candle is not None] - class CandleListResponse(BaseModel): - pair: str - candles: list[Candle] + result = flask.jsonify(candles) - result = CandleListResponse(pair = pair, - candles = candles) - - return result.dict() + return result diff --git a/spotbit.config b/spotbit.config index 3adaf8e..594417d 100644 --- a/spotbit.config +++ b/spotbit.config @@ -2,7 +2,7 @@ [server] # exchanges: Exchanges to use for data collection. If this is not set, then Spotbit will use all supported exchanges. There are more than 100 exchanges supported. -exchanges = ["liquid", "phemex", "bitstamp", "ftx"] +exchanges = ["liquid", "phemex", "bitstamp", "ftx", "binance"] # currencies: Currencies to collect data for in the Spotbit database. If a currency is supported by an exchange, then data can be collected for it. Currencies should be lowercase three letter currency codes, eg usd gpy cny jpy eur. If no currencies are listed then Spotbit will collect for usd only. currencies = ["usd", "gbp", "jpy", "usdt", "eur", "0xcafebabe"]