Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
Use better rate limiting because Bitmex throttles aggressively.This
needs to be tested so that when using many threads we can successfully
get all the data the user wants.
Fix type error in get_candles.
Fix checks for pairs supported by exchanges.
  • Loading branch information
nochiel committed Apr 26, 2022
1 parent 62ae76e commit 99780de
Showing 1 changed file with 64 additions and 85 deletions.
149 changes: 64 additions & 85 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class Config:
# we can determine which ones shouldn't be supported i.e. populate
# this list with exchanges that fail tests.
_unsupported_exchanges = []
_supported_exchanges: dict[str, ccxt.Exchange] = {}
supported_exchanges: dict[str, ccxt.Exchange] = {}

def get_logger():

import logging
logger = logging.getLogger(__name__)
if _settings.debug: logger.setLevel(logging.DEBUG)
if settings.debug: logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
'[%(asctime)s] %(levelname)s (thread %(thread)d):\t%(module)s.%(funcName)s: %(message)s')
Expand All @@ -72,36 +72,36 @@ def get_logger():

return logger

_settings = Settings()
settings = Settings()

# TODO(nochiel) Move this to the settings class.
from enum import Enum
CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in _settings.currencies])
CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in settings.currencies])

logger = get_logger()
assert logger

app = FastAPI(debug = _settings.debug)
app = FastAPI(debug = settings.debug)

logger.debug(f'Using currencies: {_settings.currencies}')
if not _settings.exchanges:
logger.debug(f'Using currencies: {settings.currencies}')
if not settings.exchanges:
logger.info('using all exchanges.')
_settings.exchanges = list(ccxt.exchanges)
settings.exchanges = list(ccxt.exchanges)

assert _settings.exchanges
assert settings.exchanges

logger.info('Initialising supported exchanges.')
for e in _settings.exchanges:
for e in settings.exchanges:
if e in ccxt.exchanges and e not in _unsupported_exchanges:
_supported_exchanges[e] = ccxt.__dict__[e]()
supported_exchanges[e] = ccxt.__dict__[e]()
try:
if app.debug is False: _supported_exchanges[e].load_markets()
if app.debug is False: supported_exchanges[e].load_markets()
except Exception as e:
logger.debug(f'Error loading markets for {e}.')

assert _supported_exchanges
assert supported_exchanges

ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in _supported_exchanges])
ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in supported_exchanges])

# Exchange data is sometimes returned as epoch milliseconds.
def is_ms(timestamp): return timestamp % 1e3 == 0
Expand Down Expand Up @@ -134,12 +134,21 @@ def get_supported_pair_for(currency: CurrencyName, exchange: ccxt.Exchange) -> s
result = ''

exchange.load_markets()
market_ids = {f'BTC{currency.value}', f'XBT{currency.value}', f'BTC{currency.value}'.lower(), f'XBT{currency.value}'.lower()}
market_ids_found = list((market_ids & exchange.markets_by_id.keys()))
market_ids = {
f'BTC{currency.value}',
f'BTC/{currency.value}',
f'XBT{currency.value}',
f'XBT/{currency.value}',
f'BTC{currency.value}'.lower(),
f'XBT{currency.value}'.lower()}
market_ids_found = list(market_ids & exchange.markets_by_id.keys())
if not market_ids_found:
market_ids_found = list(market_ids & set([market['symbol'] for market in exchange.markets_by_id.values()]))
if market_ids_found:
logger.debug(f'market_ids_found: {market_ids_found}')
market_id = market_ids_found[0]
market = exchange.markets_by_id[market_id]
if market:
market = exchange.markets_by_id[exchange.market_id(market_id)]
if market:
result = market['symbol']
logger.debug(f'Found market {market}, with symbol {result}')

Expand All @@ -164,7 +173,6 @@ def request_single(exchange: ccxt.Exchange, currency: CurrencyName) -> Candle |
latest_candle = None
dt = None


if exchange.has['fetchOHLCV']:
logger.debug('fetchOHLCV')

Expand Down Expand Up @@ -261,8 +269,8 @@ def status(): return "server is running"
@app.get('/api/configure')
def get_configuration():
return {
'currencies': _settings.currencies,
'exchanges': _settings.exchanges,
'currencies': settings.currencies,
'exchanges': settings.exchanges,
}

def calculate_average_price(candles: list[Candle]) -> Candle:
Expand Down Expand Up @@ -302,14 +310,14 @@ async def get_exchanges():

def get_supported_currencies(exchange: ccxt.Exchange) -> list[str] :

required = set(_settings.currencies)
required = set(settings.currencies)
given = set(exchange.currencies.keys())

return list(required & given)

result: list[ExchangeDetails] = []

assert _supported_exchanges
assert supported_exchanges

def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:

Expand All @@ -320,7 +328,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:

currencies = []
if exchange.currencies:
currencies = [c for c in _settings.currencies
currencies = [c for c in settings.currencies
if c in exchange.currencies]

details = ExchangeDetails(
Expand All @@ -334,7 +342,7 @@ def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
return result

tasks = [asyncio.to_thread(get_exchange_details, exchange)
for exchange in _supported_exchanges.values()]
for exchange in supported_exchanges.values()]
details = await asyncio.gather(*tasks)

result = list(details)
Expand All @@ -356,7 +364,7 @@ async def now_average(currency: CurrencyName):

logger.debug(f'currency: {currency}')

def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Exchange, Candle | None]:
def get_candle(exchange: ccxt.Exchange, currency: CurrencyName = currency) -> tuple[ccxt.Exchange, Candle | None]:
assert exchange
assert currency

Expand All @@ -365,16 +373,16 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex
if currency.value in exchange.currencies:
try:
candle = None
candle = request_single(exchange, currency.value)
candle = request_single(exchange, currency)
if candle:
result = exchange, candle
except Exception as e:
logger.error(f'error requesting data from exchange: {e}')

return result

tasks = [asyncio.to_thread(get_candle, exchange, currency)
for exchange in _supported_exchanges.values()]
tasks = [asyncio.to_thread(get_candle, exchange)
for exchange in supported_exchanges.values()]
task_results = await asyncio.gather(*tasks)
logger.debug(f'task results: {task_results}')

Expand All @@ -395,7 +403,7 @@ def get_candle(exchange: ccxt.Exchange, currency: CurrencyName) -> tuple[ccxt.Ex
status_code = HTTPStatus.INTERNAL_SERVER_ERROR,
detail = 'Spotbit could get any candle data from the configured exchanges.')

exchanges_used = [exchange.name for exchange in _supported_exchanges.values()
exchanges_used = [exchange.name for exchange in supported_exchanges.values()
if exchange.name not in failed_exchanges]

result = PriceResponse(
Expand All @@ -414,14 +422,14 @@ def now(currency: CurrencyName, exchange: ExchangeName):
currency: the symbol for the base currency to use e.g. USD, GBP, UST.
'''

if exchange.value not in _supported_exchanges:
if exchange.value not in supported_exchanges:
raise HTTPException(
status_code = HTTPStatus.INTERNAL_SERVER_ERROR,
detail = f'Spotbit is not configured to use {exchange.value} exchange.')

result = None

ccxt_exchange = _supported_exchanges[exchange.value]
ccxt_exchange = supported_exchanges[exchange.value]
assert ccxt_exchange
ccxt_exchange.load_markets()

Expand Down Expand Up @@ -465,31 +473,31 @@ def get_history(*,

_since = round(since.timestamp() * 1e3)

params = {}
params = dict(end = None)
if exchange == "bitfinex":
params = {'end' : round(end.timestamp() * 1e3)}
# TODO(nochiel) Is this needed?
# params['end'] = round(end.timestamp() * 1e3)
...

candles = None
try:
wait = exchange.rateLimit * 1e-3
while wait:
try:
candles = exchange.fetchOHLCV(
symbol = pair,
limit = limit,
timeframe = timeframe,
since = _since,
params = params)

wait = 0
rate_limit = exchange.rateLimit * 1e-3
wait = rate_limit
while wait:
try:
candles = exchange.fetchOHLCV(
symbol = pair,
limit = limit,
timeframe = timeframe,
since = _since,
params = params)

except ccxt.errors.RateLimitExceeded as e:
logger.debug(f'{e}. Rate limit for {exchange} is {exchange.rateLimit}')
time.sleep(wait)
wait *= 2
wait = 0

except Exception as e:
logger.error(f'{exchange} candle request error: {e}')
except (ccxt.errors.RateLimitExceeded, ccxt.errors.DDoSProtection) as e:
logger.error(f'{exchange} candle request error: {e}')
time.sleep(wait)
wait *= rateLimit
if wait > 60: wait = 0

if candles:
result = [Candle(
Expand Down Expand Up @@ -518,7 +526,7 @@ async def get_candles_in_range(
start, end(required): datetime formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS" or unix timestamp.
'''

ccxt_exchange = _supported_exchanges[exchange.value]
ccxt_exchange = supported_exchanges[exchange.value]
ccxt_exchange.load_markets()
assert ccxt_exchange.currencies
assert ccxt_exchange.markets
Expand Down Expand Up @@ -625,12 +633,13 @@ async def get_candles_at_dates(
Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS".
'''

if exchange.value not in _supported_exchanges:
result: list[Candle] = []
if exchange.value not in supported_exchanges:
raise HTTPException(
detail = ServerErrors.EXCHANGE_NOT_SUPPORTED,
status_code = HTTPStatus.INTERNAL_SERVER_ERROR)

ccxt_exchange = _supported_exchanges[exchange.value]
ccxt_exchange = supported_exchanges[exchange.value]
ccxt_exchange.load_markets()

pair = get_supported_pair_for(currency, ccxt_exchange)
Expand Down Expand Up @@ -671,36 +680,6 @@ async def get_candles_at_dates(

return result

def tests():
# Placeholder
# Expected: validation errors or server errors or valid responses.

import requests
response = requests.get('http://[::1]:5000/api/now/FOOBAR')
response = requests.get('http://[::1]:5000/api/now/usd')
response = requests.get('http://[::1]:5000/api/now/USD')
response = requests.get('http://[::1]:5000/api/now/JPY')

response = requests.get('http://[::1]:5000/api/now/USD/Bitstamp')
response = requests.get('http://[::1]:5000/api/now/USD/bitstamp')
response = requests.get('http://[::1]:5000/api/now/usdt/bitstamp')

response = requests.get(
'http://[::1]:5000/api/history/USD/bitstamp?start=2019-01-01T0000&end=1522641600'
)

response = requests.get(
"http://[::1]:5000/api/history/USD/liquid?start=2022-01-01T00:00&end=2022-02-01T00:00"
)

response = requests.post('http://[::1]:5000/history/USDT/binance',
json = ['2022-01-01T00:00', '2022-02-01T00:00', '2021-12-01T00:00']
)

response = requests.post(
"http://[::1]:5000/api/history/JPY/liquid",
json=["2022-01-01T00:00", "2022-02-01T00:00", "2021-12-01T00:00"],
)

if __name__ == '__main__':
import uvicorn
Expand Down

0 comments on commit 99780de

Please sign in to comment.