Skip to content

Commit

Permalink
Cleanup API.
Browse files Browse the repository at this point in the history
Use threads for HTTP requests.
  • Loading branch information
nochiel committed Apr 16, 2022
1 parent 4ae81c8 commit d142ed8
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 74 deletions.
173 changes: 100 additions & 73 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
# TODO(nochiel) Give the client structured errors.
# TODO(nochiel) - Create a new exception type so that client receives structured errors as a response.
# TODO(nochiel) - If there is no data available for a response, provide a good error.
# TODO(nochiel) - Standard error page/response for non-existent routes?
# TODO(nochiel) TEST that all endpoints return consistent types.

import asyncio
import logging
from datetime import datetime, timedelta
import time
import sys
Expand Down Expand Up @@ -45,7 +46,7 @@ class Settings(BaseSettings):

@validator('currencies')
def uppercase_currency_names(cls, v):
assert v and len(v) > 0, 'no currencies'
assert v and len(v), 'no currencies'
return [exchange.upper() for exchange in v]

class Config:
Expand Down Expand Up @@ -91,8 +92,7 @@ class Config:

assert _supported_exchanges

# FIXME(nochiel) Do we need this test?
# Check if a timestamp has ms precision by modding by 1e3
# Exchange data is sometimes returned as epoch milliseconds.
def is_ms(timestamp): return timestamp % 1e3 == 0

class Candle(BaseModel):
Expand All @@ -102,7 +102,13 @@ class Candle(BaseModel):
low : float
close : float
volume : float


@validator('timestamp')
def time_in_seconds(cls, v):
result = v
if is_ms(v): result = int(v * 1e-3)
return result

class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
Expand All @@ -117,6 +123,7 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None:

# TODO(nochiel) Add a type for result.
result = None
latest_candle = None
ticker = f'BTC/{currency}'
dt = None

Expand Down Expand Up @@ -170,17 +177,6 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None:

latest_candle = candles[-1]

timestamp = latest_candle[OHLCV.timestamp]
timestamp = timestamp * 1e-3 if is_ms(int(timestamp)) else timestamp

result = Candle(
timestamp = timestamp,
open = latest_candle[OHLCV.open],
high = latest_candle[OHLCV.high],
low = latest_candle[OHLCV.low],
close = latest_candle[OHLCV.close],
volume = latest_candle[OHLCV.volume]
)
except Exception as e:
app.logger.error(f'error requesting candle from {exchange.name}: {e}')

Expand All @@ -193,12 +189,17 @@ def request_single(exchange: ccxt.Exchange, currency: str) -> Candle | None:
except Exception as e:
app.logger.error(f'error on {exchange} fetch_ticker: {e}')

if candle:
app.logger.debug(f'{exchange} candle: {candle}')
timestamp = candle[OHLCV.timestamp]
timestamp = timestamp * 1e-3 if is_ms(timestamp) else timestamp
latest_candle = candle

result = candle
if latest_candle:
result = Candle(
timestamp = latest_candle[OHLCV.timestamp],
open = latest_candle[OHLCV.open],
high = latest_candle[OHLCV.high],
low = latest_candle[OHLCV.low],
close = latest_candle[OHLCV.close],
volume = latest_candle[OHLCV.volume]
)

return result

Expand Down Expand Up @@ -242,7 +243,7 @@ def get_configuration():

def calculate_average_price(candles: list[Candle]) -> Candle:

assert candles and len(candles) > 0
assert candles and len(candles)

mean = statistics.mean
average_open = mean(candle.open for candle in candles)
Expand Down Expand Up @@ -295,7 +296,7 @@ class ExchangeResult(BaseModel):

assert _supported_exchanges

async def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:

result = None

Expand All @@ -317,7 +318,7 @@ async def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails:
result = details
return result

tasks = [get_exchange_details(exchange)
tasks = [asyncio.to_thread(get_exchange_details, exchange)
for exchange in _supported_exchanges.values()]
details = await asyncio.gather(*tasks)
result = ExchangeResult(exchanges = list(details))
Expand All @@ -338,9 +339,12 @@ class PriceResponse(BaseModel):
result = None

currency = currency.upper()
if currency not in _settings.currencies:
flask.abort(flask.Response(response = f'Spotbit is not configured to use {currency}.',
status = HTTPStatus.INTERNAL_SERVER_ERROR))
averaging_time = _settings.averaging_time

async def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Exchange, Candle | None]:
def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Exchange, Candle | None]:
assert exchange
assert currency

Expand All @@ -357,7 +361,7 @@ async def get_candle(exchange: ccxt.Exchange, currency: str) -> tuple[ccxt.Excha

return result

tasks = [asyncio.create_task(get_candle(exchange, currency))
tasks = [asyncio.to_thread(get_candle, exchange, currency)
for exchange in _supported_exchanges.values()]
task_results = await asyncio.gather(*tasks)
app.logger.debug(f'task results: {task_results}')
Expand Down Expand Up @@ -401,11 +405,14 @@ def now(currency, exchange):
status = HTTPStatus.INTERNAL_SERVER_ERROR))

result = None
currency = currency.upper()
exchange = _supported_exchanges[exchange]
exchange.load_markets()
currency = currency.upper()
if currency not in _settings.currencies:
flask.abort(flask.Response(response = f'Spotbit is not configured to use {currency}.',
status = HTTPStatus.INTERNAL_SERVER_ERROR))
if currency not in exchange.currencies:
flask.abort(flask.Response(response = f'Spotbit does not support the {currency} on {exchange}',
flask.abort(flask.Response(response = f'Spotbit does not support the {currency} on {exchange}.',
status = HTTPStatus.INTERNAL_SERVER_ERROR))

assert exchange
Expand Down Expand Up @@ -509,16 +516,15 @@ async def get_candles_in_range(currency, exchange, date_start, date_end):

app.logger.debug(f'requesting periods with {limit} limit: {periods}')

async def get_history(*,
def get_history(*,
exchange: ccxt.Exchange = exchange,
since: datetime,
limit: int = limit,
timeframe: str = timeframe):
timeframe: str = timeframe) -> list:
assert exchange

app.logger.debug(f'{exchange} {pair} {since}')

# FIXME(nochiel) Handle ccxt.base.errors.RateLimitExceeded
rateLimit = exchange.rateLimit
_since = int(since.timestamp() * 1e3)

Expand All @@ -528,12 +534,22 @@ async def get_history(*,

candles = None
try:
candles = exchange.fetchOHLCV(
symbol = pair,
limit = limit,
timeframe = timeframe,
since = _since,
params = params)
wait = 1
while wait > 0:
try:
candles = exchange.fetchOHLCV(
symbol = pair,
limit = limit,
timeframe = timeframe,
since = _since,
params = params)

wait = 0

except ccxt.base.errors.RateLimitExceeded as e:
app.logger.debug(f'{e}. Rate limit for {exchange} is {rateLimit}')
time.sleep(wait)
wait *= 2

except Exception as e:
app.logger.error(f'{exchange} candle request error: {e}')
Expand All @@ -542,30 +558,43 @@ async def get_history(*,

tasks = []
for p in periods:
task = asyncio.create_task(get_history(
exchange = exchange,
since = p))
task = asyncio.to_thread(get_history,
exchange = exchange,
since = p)
tasks.append(task)

if remaining_frames > 0:
last_candle_time = periods[-1] + (dt * 100)
assert last_candle_time < end
app.logger.debug(f'remaining_frames: {remaining_frames}')
task = asyncio.create_task(get_history(
since = last_candle_time,
limit = remaining_frames))
task = asyncio.to_thread(get_history,
since = last_candle_time,
limit = remaining_frames)
tasks.append(task)

new_last_candle_time = last_candle_time + (dt * remaining_frames )
app.logger.debug(f'new_last_candle_time: {new_last_candle_time}')

candles = await asyncio.gather(*tasks)
_candles = []
for c in candles: # flatten candles
_candles += c
candles = _candles
task_results = await asyncio.gather(*tasks)
candles = []
for result in task_results:
if result: candles.extend(result)

candles = [Candle(
timestamp = candle[OHLCV.timestamp],
open = candle[OHLCV.open],
high = candle[OHLCV.high],
low = candle[OHLCV.low],
close = candle[OHLCV.close],
volume = candle[OHLCV.volume]
)

for candle in candles]

assert len(candles) == (n_periods * limit) + remaining_frames
expected_number_of_candles = (n_periods * limit) + remaining_frames
received_number_of_candles = len(candles)
if received_number_of_candles < expected_number_of_candles:
logger.info(f'{exchange} does not have data for the entire period. Expected {expected_number_of_candles} candles. Got {received_number_of_candles} candles')

if candles is None or len(candles) == 0:
flask.abort(flask.Response(response = f'Spotbit did not receive any candle history for the period {start} - {end} from {exchange}',
Expand All @@ -574,8 +603,8 @@ async def get_history(*,
app.logger.debug(f'got: {len(candles)} candles')
app.logger.debug(f'candles: {candles[:10]} ... {candles[-10:]}')

# FIXME(nochiel) Make a type for this. Also is this the best way to return candle data?
return flask.jsonify(candles)
result = flask.jsonify([candle.dict() for candle in candles])
return result

tests_for_get_candles_at_dates = [
'''curl http://localhost:5000/history/usdt/binance --header 'Content-Type:application/json' --data '[\"2022-01-01T00:00 \", \"2022-02-01T00:00\", \"2021-12-01T00:00\"]''',
Expand Down Expand Up @@ -604,13 +633,15 @@ async def get_candles_at_dates(currency, exchange):
status = HTTPStatus.BAD_REQUEST))

try:
app.logger.debug(f'request data: {request.get_data()}')
dates = request.get_json()
dates = [datetime.fromisoformat(date) for date in dates] if dates else None
app.logger.debug(f'dates: {dates}')
# TODO(nochiel) Check if isoformat or timestamps. Raise HTTPException if dates don't parse.
dates = [datetime.fromisoformat(date) for date in dates] if dates else []
if len(dates) == 0:
raise Exception(f'received: ${dates}')
except Exception as e:
flask.abort(flask.Response(
response = f'error processing dates: {e}',
response = f'error processing dates: {e}.\n' +
'Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS".',
status = HTTPStatus.INTERNAL_SERVER_ERROR))

results = None
Expand All @@ -630,7 +661,7 @@ async def get_candles_at_dates(currency, exchange):

candles = {}
params = {}
async def get_candle(*,
def get_candle(*,
exchange: ccxt.Exchange = exchange,
since: datetime,
limit: int = limit,
Expand All @@ -641,7 +672,7 @@ async def get_candle(*,
assert exchange

result = None
candle, candles = None, None
candles = None
try:
candles = exchange.fetchOHLCV(
symbol = pair,
Expand All @@ -654,30 +685,26 @@ async def get_candle(*,
app.logger.error(e)

if candles and len(candles[0]) > 0:
app.logger.debug(f'candle: {candle}')
app.logger.debug(f'candles: {candles}')
candle = candles[0]

result = Candle(
timestamp = candle[OHLCV.timestamp],
open = candle[OHLCV.open],
high = candle[OHLCV.high],
low = candle[OHLCV.low],
close = candle[OHLCV.close],
volume = candle[OHLCV.volume]
open = candle[OHLCV.open],
high = candle[OHLCV.high],
low = candle[OHLCV.low],
close = candle[OHLCV.close],
volume = candle[OHLCV.volume]
)

return result

assert dates
tasks = [asyncio.create_task(get_candle(since = date))
tasks = [asyncio.to_thread(get_candle, since = date)
for date in dates]
candles = await asyncio.gather(*tasks)
candles = [candle for candle in candles if candle is not None]
candles = [candle.dict() for candle in candles if candle is not None]

class CandleListResponse(BaseModel):
pair: str
candles: list[Candle]
result = flask.jsonify(candles)

result = CandleListResponse(pair = pair,
candles = candles)

return result.dict()
return result
2 changes: 1 addition & 1 deletion spotbit.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[server]

# exchanges: Exchanges to use for data collection. If this is not set, then Spotbit will use all supported exchanges. There are more than 100 exchanges supported.
exchanges = ["liquid", "phemex", "bitstamp", "ftx"]
exchanges = ["liquid", "phemex", "bitstamp", "ftx", "binance"]

# currencies: Currencies to collect data for in the Spotbit database. If a currency is supported by an exchange, then data can be collected for it. Currencies should be lowercase three letter currency codes, eg usd gpy cny jpy eur. If no currencies are listed then Spotbit will collect for usd only.
currencies = ["usd", "gbp", "jpy", "usdt", "eur", "0xcafebabe"]
Expand Down

0 comments on commit d142ed8

Please sign in to comment.