diff --git a/README.md b/README.md
index c8a6956..d3f9502 100644
--- a/README.md
+++ b/README.md
@@ -13,10 +13,9 @@ Users may choose to run their own local **Spotbit** server, or simply to connect
 
 **Why Use Spotbit?**
 
-1. **Privacy.** Spotbit is the only way to anoymize your Bitcoin pricing via a Tor connection.
-1. **Speed.** Spotbit provides information more quickly and at a higher rate due to the use of a local database.
-1. **Reliability.** Spotbit aggregrates information, making your pricing data more trustworthy.
-1. **Self-sovereignty.** Spotbit runs from your server (or that of your choice), saves to your database, and allows you to choose the pricing services relevant to your trading pair.
+1. **Privacy.** Spotbit can work as a Tor hidden service.
+1. **Reliability.** Spotbit aggregates information using the exchanges/sources you configure, making your pricing data more trustworthy.
+1. **Self-sovereignty.** Spotbit can run from your server.
 
 ## Additional Information
 
@@ -96,7 +95,7 @@ The Flask server runs over port 5000. The following API routes can be used via t
 * `/now//`
   - Returns the latest candle for BTC/currency (if supported by the exchange API), or the latest spot price.
   - currency is a three letter fiat currency (e.g. USD, JPY, etc)
-  - exchange is the name of an exchange supported by CCXT. If the exchange is already in the config file, then the newest row from your local database is returned. If the exchange is not supported, then Spotbit will directly request this exchange and return data, but it will not be stored locally.
+  - exchange is the name of an exchange supported by CCXT. It is only used if Spotbit is configured to use that exchange; if it is not, Spotbit will return an error.
   - Example response:
 ```
 {"close":10314.06,"currency_pair":"BTC-USD","datetime":"2020-09-13 14:31:00","high":10315.65,"id":122983,"low":10314.06,"open":10315.65,"timestamp":1600007460000,"vol":3.53308926}
 ```
@@ -136,8 +135,6 @@ You can check on a spotbit's status at any time by running `sudo systemctl statu
 Spotbbit uses a config file located at `/home/spotbit/.spotbit/spotbit.config` to store settings. The allowed fields are:
 
-* `keepWeeks`
-  - The number of weeks worth of data to keep in the database for exchanges that you are not retrieving history for. This setting does not apply to exchanges that have a long-term history.
 * `exchanges`
   - The exchanges you want to get current data for. They should be supplied as a list of lowercase names separated by spaces. By default, spotbit.config will include the exchanges needed to create averages for you in USD, GBP, EUR, JPY and USDT.
 * `currencies`
@@ -211,7 +208,7 @@ To use Spotbit you'll need to use the following tools:
 All of these Python libraries can be installed via pip and Python3.8 can be installed for you in the install script.
 
 ### Motivation
-Spotbit aims to provide an easy option for aggregating exchange data that does not require the use of a third party data website like Coinmarketcap. These data can be used inside of other apps or for personal use / analysis. Acquiring data across many exchanges can be a pain because normally one would need write slightly different code in order to interact with each API. Additionally, the use of local storage means that data can always be served quickly even while new data are being downloaded. Spotbit runs two separate threads - one with the Flask webserver, and another that makes API requests to exchanges to update the local database.
+Spotbit aims to provide an easy option for aggregating exchange data that does not require the use of a third party data website like Coinmarketcap. These data can be used inside of other apps or for personal use / analysis. Acquiring data across many exchanges can be a pain because normally one would need to write slightly different code in order to interact with each API.
 
 ### Derived from…
 
 This Spotbit project is either derived from or was inspired by the need of Fully Noded 2 to display realtime price info in-app:
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..ab7fd81
--- /dev/null
+++ b/app.py
@@ -0,0 +1,705 @@
+# TODO(nochiel) Add tests
+# TODO(nochiel) - Standard error page/response for non-existent routes?
+
+import asyncio
+from datetime import datetime, timedelta
+from http import HTTPStatus
+import logging
+import os
+import pathlib
+import sys
+import time
+
+import ccxt
+
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel, BaseSettings, validator
+
+class ServerErrors: # TODO(nochiel) Replace these with HTTPException
+    NO_DATA = 'Spotbit did not find any data.'
+    EXCHANGE_NOT_SUPPORTED = 'Spotbit is not configured to support the exchange.'
+    BAD_DATE_FORMAT = 'Please use dates in YYYY-MM-DDTHH:mm:ss ISO8601 format or unix timestamps.'
+
+class Error(BaseModel):
+    code : int
+    reason : str
+    exchange : str
+    currency : str
+
+class Settings(BaseSettings):
+
+    exchanges: list[str] = []
+    currencies: list[str]
+
+    onion: str | None = None
+    debug: bool = False
+
+    @validator('currencies')
+    def uppercase_currency_names(cls, v):
+        assert v and len(v), 'no currencies'
+        return [currency.upper() for currency in v]
+
+    class Config:
+        env_file = 'spotbit.config'
+
+# TODO(nochiel) Write comprehensive tests for each exchange so that
+# we can determine which ones shouldn't be supported i.e. populate
+# this list with exchanges that fail tests.
+_unsupported_exchanges = []
+supported_exchanges: dict[str, ccxt.Exchange] = {}
+
+def get_logger():
+
+    import logging
+    logger = logging.getLogger(__name__)
+    if settings.debug: logger.setLevel(logging.DEBUG)
+
+    formatter = logging.Formatter(
+        '[%(asctime)s] %(levelname)s (thread %(thread)d):\t%(module)s.%(funcName)s: %(message)s')
+
+    handler = logging.StreamHandler()
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+
+    import logging.handlers
+    handler = logging.handlers.RotatingFileHandler(
+        filename = 'spotbit.log',
+        maxBytes = 1 << 20,
+        backupCount = 2,
+    )
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+
+    return logger
+
+settings = Settings()
+
+# TODO(nochiel) Move this to the settings class.
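+# Illustrative sketch: assuming spotbit.config sets currencies = ["usd", "eur"],
+# the uppercasing validator yields ['USD', 'EUR'] and the dynamic enum created
+# below is equivalent to:
+#
+#   class CurrencyName(Enum):
+#       USD = 'USD'
+#       EUR = 'EUR'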
+from enum import Enum +CurrencyName = Enum('CurrencyName', [(currency, currency) for currency in settings.currencies]) + +logger = get_logger() +assert logger + +app = FastAPI(debug = settings.debug) + +logger.debug(f'Using currencies: {settings.currencies}') +if not settings.exchanges: + logger.info('using all exchanges.') + settings.exchanges = list(ccxt.exchanges) + +assert settings.exchanges + +logger.info('Initialising supported exchanges.') +for e in settings.exchanges: + if e in ccxt.exchanges and e not in _unsupported_exchanges: + supported_exchanges[e] = ccxt.__dict__[e]() + try: + if app.debug is False: supported_exchanges[e].load_markets() + except Exception as e: + logger.debug(f'Error loading markets for {e}.') + +assert supported_exchanges + +ExchangeName = Enum('ExchangeName', [(id.upper(), id) for id in supported_exchanges]) + +# Exchange data is sometimes returned as epoch milliseconds. +def is_ms(timestamp): return timestamp % 1e3 == 0 + +class Candle(BaseModel): + timestamp : datetime + open : float + high : float + low : float + close : float + volume : float + + @validator('timestamp') + def time_in_seconds(cls, v): + result = v + + if type(v) is int and is_ms(v): + result = int(v * 1e-3) + + return result + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +def get_supported_pair_for(currency: CurrencyName, exchange: ccxt.Exchange) -> str: + assert exchange + + result = '' + + exchange.load_markets() + market_ids = { + f'BTC{currency.value}', + f'BTC/{currency.value}', + f'XBT{currency.value}', + f'XBT/{currency.value}', + f'BTC{currency.value}'.lower(), + f'XBT{currency.value}'.lower()} + market_ids_found = list(market_ids & exchange.markets_by_id.keys()) + if not market_ids_found: + market_ids_found = list(market_ids & set([market['symbol'] for market in exchange.markets_by_id.values()])) + if market_ids_found: + logger.debug(f'market_ids_found: {market_ids_found}') + market_id = market_ids_found[0] + market = exchange.markets_by_id[exchange.market_id(market_id)] + if market: + result = market['symbol'] + logger.debug(f'Found market {market}, with symbol {result}') + + return result + + +# FIXME(nochiel) Redundancy: Merge this with get_history. +# TODO(nochiel) TEST Do we really need to check if fetchOHLCV exists in the exchange api? +# TEST ccxt abstracts internally using fetch_trades so we don't have to use fetch_ticker ourselves. +def request_single(exchange: ccxt.Exchange, currency: CurrencyName) -> Candle | None: + ''' + Make a single request, without having to loop through all exchanges and currency pairs. + ''' + assert exchange and isinstance(exchange, ccxt.Exchange) + assert currency + + exchange.load_markets() + pair = get_supported_pair_for(currency, exchange) + if not pair: return None + + result = None + latest_candle = None + dt = None + + if exchange.has['fetchOHLCV']: + logger.debug('fetchOHLCV') + + timeframe = '1m' + match exchange.id: + case 'btcalpha' | 'hollaex': + timeframe = '1h' + case 'poloniex': + timeframe = '5m' + + # Some exchanges have explicit limits on how many candles you can get at once + # TODO(nochiel) Simplify this by checking for 2 canonical limits to use. 
+ limit = 1000 + match exchange.id: + case 'bitstamp': + limit = 1000 + case 'bybit': + limit = 200 + case 'eterbase': + limit = 1000000 + case 'exmo': + limit = 3000 + case 'btcalpha': + limit = 720 + + since = round((datetime.now() - timedelta(hours=1)).timestamp() * 1e3) + + # TODO(nochiel) TEST other exchanges requiring special conditions: bitstamp, bitmart? + params = [] + if exchange.id == 'bitfinex': + params = { + 'limit':100, + 'start': since, + 'end': round(datetime.now().timestamp() * 1e3) + } + + try: + candles = exchange.fetchOHLCV( + symbol = pair, + timeframe = timeframe, + limit = limit, + since = since, + params = params) + + latest_candle = candles[-1] + + except Exception as e: + logger.error(f'error requesting candle from {exchange.name}: {e}') + + else: # TODO(nochiel) TEST + logger.debug(f'fetch_ticker: {pair}') + + candle = None + try: + candle = exchange.fetch_ticker(pair) + except Exception as e: + logger.error(f'error on {exchange} fetch_ticker: {e}') + + latest_candle = candle + + if latest_candle: + result = Candle( + timestamp = latest_candle[OHLCV.timestamp], + open = latest_candle[OHLCV.open], + high = latest_candle[OHLCV.high], + low = latest_candle[OHLCV.low], + close = latest_candle[OHLCV.close], + volume = latest_candle[OHLCV.volume] + ) + + return result + + +# Routes +# TODO(nochiel) Add tests for routes. + +# TODO(nochiel) Put the api behind an /api/v1 path. + +# TODO(nochiel) Make this the Spotbit frontend. +@app.get('/') +def index(): + + raise HTTPException( + detail = 'Not implemented.', + status_code = 404 + ) + + +@app.get('/api/status') +def status(): return "server is running" + +# TODO(nochiel) FINDOUT Do we need to enable clients to change configuration? +# If clients should be able to change configuration, use sessions. +@app.get('/api/configure') +def get_configuration(): + return { + 'currencies': settings.currencies, + 'exchanges': settings.exchanges, + } + +def calculate_average_price(candles: list[Candle]) -> Candle: + + assert candles + + from statistics import mean + average_open = mean(candle.open for candle in candles) + average_high = mean(candle.high for candle in candles) + average_low = mean(candle.low for candle in candles) + average_close = mean(candle.close for candle in candles) + average_volume = mean(candle.volume for candle in candles) + + candle = Candle( + timestamp = min(candle.timestamp for candle in candles), + open = average_open, + high = average_high, + low = average_low, + close = average_close, + volume = average_volume, + ) + return candle + +class ExchangeDetails(BaseModel): + id: str + name: str + url : str + countries: list[str] + currencies: list[str] + +@app.get('/api/exchanges', response_model = list[ExchangeDetails]) +async def get_exchanges(): + # Ref. https://github.com/BlockchainCommons/spotbit/issues/54 + ''' + Get a list of exchanges that this instance of Spotbit has been configured to use. 
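+
+    Example response entry (illustrative values only):
+        {"id": "bitstamp", "name": "Bitstamp", "url": "https://www.bitstamp.net",
+         "countries": ["GB"], "currencies": ["USD", "EUR"]}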
+ ''' + + def get_supported_currencies(exchange: ccxt.Exchange) -> list[str] : + + required = set(settings.currencies) + given = set(exchange.currencies.keys()) + + return list(required & given) + + result: list[ExchangeDetails] = [] + + assert supported_exchanges + + def get_exchange_details(exchange: ccxt.Exchange) -> ExchangeDetails: + + result = None + + assert exchange + exchange.load_markets() + + currencies = [] + if exchange.currencies: + currencies = [c for c in settings.currencies + if c in exchange.currencies] + + details = ExchangeDetails( + id = exchange.id, + name = exchange.name, + url = exchange.urls['www'], + countries = exchange.countries, + currencies = get_supported_currencies(exchange)) # TODO(nochiel) TEST + + result = details + return result + + tasks = [asyncio.to_thread(get_exchange_details, exchange) + for exchange in supported_exchanges.values()] + details = await asyncio.gather(*tasks) + + result = list(details) + + return result + +class PriceResponse(BaseModel): + candle : Candle + exchanges_used : list[str] + failed_exchanges : list[str] + +@app.get('/api/now/{currency}', response_model = PriceResponse) +async def now_average(currency: CurrencyName): + ''' + Return an average price from the exchanges configured for the given currency. + ''' + + result = None + + logger.debug(f'currency: {currency}') + + def get_candle(exchange: ccxt.Exchange, currency: CurrencyName = currency) -> tuple[ccxt.Exchange, Candle | None]: + assert exchange + assert currency + + result = (exchange, None) + exchange.load_markets() + if currency.value in exchange.currencies: + try: + candle = None + candle = request_single(exchange, currency) + if candle: + result = exchange, candle + except Exception as e: + logger.error(f'error requesting data from exchange: {e}') + + return result + + tasks = [asyncio.to_thread(get_candle, exchange) + for exchange in supported_exchanges.values()] + task_results = await asyncio.gather(*tasks) + logger.debug(f'task results: {task_results}') + + candles = [] + failed_exchanges = [] + for exchange, candle in task_results: + if candle: + candles.append(candle) + else: + failed_exchanges.append(exchange.name) + + logger.debug(f'candles: {candles}') + average_price_candle = None + if len(candles): + average_price_candle = calculate_average_price(candles) + else: + raise HTTPException( + status_code = HTTPStatus.INTERNAL_SERVER_ERROR, + detail = 'Spotbit could not get any candle data from the configured exchanges.') + + exchanges_used = [exchange.name for exchange in supported_exchanges.values() + if exchange.name not in failed_exchanges] + + result = PriceResponse( + candle = average_price_candle, + exchanges_used = exchanges_used, + failed_exchanges = failed_exchanges, + ) + + return result + +@app.get('/api/now/{currency}/{exchange}', response_model = Candle) +def now(currency: CurrencyName, exchange: ExchangeName): + ''' + parameters: + exchange: an exchange to use. + currency: the symbol for the base currency to use e.g. USD, GBP, UST. 
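+
+    Example (illustrative):
+        GET /api/now/USD/bitstamp
+        -> a Candle, e.g. {"timestamp": "...", "open": ..., "high": ..., "low": ..., "close": ..., "volume": ...}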
+ ''' + + if exchange.value not in supported_exchanges: + raise HTTPException( + status_code = HTTPStatus.INTERNAL_SERVER_ERROR, + detail = f'Spotbit is not configured to use {exchange.value} exchange.') + + result = None + + ccxt_exchange = supported_exchanges[exchange.value] + assert ccxt_exchange + ccxt_exchange.load_markets() + + if currency.value not in ccxt_exchange.currencies: + raise HTTPException( + status_code = HTTPStatus.INTERNAL_SERVER_ERROR, + detail = f'Spotbit does not support {currency.value} on {ccxt_exchange}.' ) + + result = request_single(ccxt_exchange, currency) + if not result: + raise HTTPException( + status_code = HTTPStatus.INTERNAL_SERVER_ERROR, + detail = ServerErrors.NO_DATA + ) + + return result + +from enum import IntEnum +class OHLCV(IntEnum): + ''' + Indices for components ina candle list. + ''' + timestamp = 0 + open = 1 + high = 2 + low = 3 + close = 4 + volume = 5 + +def get_history(*, + exchange: ccxt.Exchange, + since: datetime, + limit: int, + timeframe: str, + pair: str) -> list[Candle] | None: + + assert exchange + logger.debug(f'{exchange} {pair} {since}') + + result = None + + _since = round(since.timestamp() * 1e3) + params = dict() + if exchange == "bitfinex": + # TODO(nochiel) Is this needed? + # params['end'] = round(end.timestamp() * 1e3) + ... + + candles = None + + # @nochiel: Re. Bitmex. Rate-limiting is very aggressive on authenticated API calls. + # For a large number of requests this throttling doesn't help and Bitmex will increase + # its rate limit to 3600 seconds! + # Serializing requests won't help either + wait = exchange.rateLimit * 1e-3 + while wait: + try: + candles = exchange.fetchOHLCV( + symbol = pair, + limit = limit, + timeframe = timeframe, + since = _since, + params = params) + + wait = 0 + + except (ccxt.errors.RateLimitExceeded, ccxt.errors.DDoSProtection) as e: + logger.error(f'rate-limited on {exchange}: {e}') + logger.error(f'waiting {wait} seconds on {exchange} before making another request') + time.sleep(wait) + wait *= 2 + if wait > 120: + raise Exception(f'{exchange} has rate limited spotbit') from e + + except Exception as e: + logger.error(f'{exchange} candle request error: {e}') + + if candles: + result = [Candle( + timestamp = candle[OHLCV.timestamp], + open = candle[OHLCV.open], + high = candle[OHLCV.high], + low = candle[OHLCV.low], + close = candle[OHLCV.close], + volume = candle[OHLCV.volume] + ) + + for candle in candles] + + return result + +@app.get('/api/history/{currency}/{exchange}', response_model = list[Candle]) +async def get_candles_in_range( + currency: CurrencyName, + exchange: ExchangeName, + start: datetime, + end: datetime = datetime.now()): + ''' + parameters: + exchange(required): an exchange to use. + currency(required): the symbol for the base currency to use e.g. USD, GBP, UST. + start, end(required): datetime formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS" or unix timestamp. 
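+
+    Example (illustrative; start and end are query parameters):
+        GET /api/history/USD/bitstamp?start=2021-01-01T00:00:00&end=2021-01-02T00:00:00
+        -> a JSON array of Candle objects.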
+ ''' + + ccxt_exchange = supported_exchanges[exchange.value] + ccxt_exchange.load_markets() + assert ccxt_exchange.currencies + assert ccxt_exchange.markets + + pair = get_supported_pair_for(currency, ccxt_exchange) + if not pair: + raise HTTPException( + detail = f'Spotbit does not support the {pair} pair on {ccxt_exchange}', + status_code = HTTPStatus.INTERNAL_SERVER_ERROR) + + result = None + + start = start.astimezone(start.tzinfo) + end = end.astimezone(end.tzinfo) + + (start, end) = (end, start) if end < start else (start, end) + logger.debug(f'start: {start}, end: {end}') + + limit = 100 + candles = None + periods = [] + + dt = timedelta(hours = 1) + params = None + timeframe = '1h' + + if ccxt_exchange.timeframes: + if '1h' in ccxt_exchange.timeframes: + timeframe = '1h' + dt = timedelta(hours = 1) + + elif '30m' in ccxt_exchange.timeframes: + timeframe = '30m' + dt = timedelta(minutes = 30) + + n_periods, remaining_frames_duration = divmod(end - start, dt * 100) + remaining_frames = remaining_frames_duration // dt + logger.debug(f'requesting #{n_periods + remaining_frames} periods') + + if n_periods == 0: + n_periods = 1 + limit, remaining_frames = remaining_frames, 0 + for i in range(n_periods): + periods.append(start + i * (dt * 100)) + + logger.debug(f'requesting periods with {limit} limit: {periods}') + + tasks = [] + args = dict( exchange = ccxt_exchange, + limit = limit, + timeframe = timeframe, + pair = pair) + + for period in periods: + args['since'] = period + task = asyncio.to_thread(get_history, + **args) + tasks.append(task) + + if remaining_frames > 0: + last_candle_time = periods[-1] + (dt * 100) + assert last_candle_time < end + logger.debug(f'remaining_frames: {remaining_frames}') + + args['since'] = last_candle_time + args['limit'] = remaining_frames + task = asyncio.to_thread(get_history, + **args) + tasks.append(task) + + new_last_candle_time = last_candle_time + (dt * remaining_frames ) + logger.debug(f'new_last_candle_time: {new_last_candle_time}') + + task_results = await asyncio.gather(*tasks) + candles = [] + for result in task_results: + if result: candles.extend(result) + + expected_number_of_candles = (n_periods * limit) + remaining_frames + received_number_of_candles = len(candles) + if received_number_of_candles < expected_number_of_candles: + logger.info(f'{ccxt_exchange} does not have data for the entire period. Expected {expected_number_of_candles} candles. Got {received_number_of_candles} candles') + + if not candles: + raise HTTPException( + detail = f'Spotbit did not receive any candle history for the period {start} - {end} from {ccxt_exchange}', + status_code = HTTPStatus.INTERNAL_SERVER_ERROR) + + logger.debug(f'got: {len(candles)} candles') + logger.debug(f'candles: {candles[:10]} ... {candles[-10:]}') + result = candles + + return result + + +# TODO(nochiel) If no exchange is given, test all supported exchanges until we get candles for all dates. +# Return all database rows within `tolerance` for each of the supplied dates +@app.post('/api/history/{currency}/{exchange}') +async def get_candles_at_dates( + currency: CurrencyName, + exchange: ExchangeName, + dates: list[datetime]) -> list[Candle]: + ''' + Dates should be provided in the body of the request as a json array of dates formatted as ISO8601 "YYYY-MM-DDTHH:mm:SS". 
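+
+    Example request body (illustrative):
+        ["2021-01-01T00:00:00", "2021-02-01T12:30:00"]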
+ ''' + + result: list[Candle] = [] + if exchange.value not in supported_exchanges: + raise HTTPException( + detail = ServerErrors.EXCHANGE_NOT_SUPPORTED, + status_code = HTTPStatus.INTERNAL_SERVER_ERROR) + + ccxt_exchange = supported_exchanges[exchange.value] + ccxt_exchange.load_markets() + + pair = get_supported_pair_for(currency, ccxt_exchange) + + # Different exchanges have different ticker formates + if not pair: + raise HTTPException( + detail = f'Spotbit does not support the BTC/{currency.value} pair on {exchange.value}', + status_code = HTTPStatus.INTERNAL_SERVER_ERROR) + + # FIXME(nochiel) Different exchanges return candle data at different resolutions. + # I need to get candle data in the lowest possible resolution then filter out the dates needed. + limit = 100 + timeframe = '1h' + + if ccxt_exchange.timeframes: + if '1h' in ccxt_exchange.timeframes: + timeframe = '1h' + + elif '30m' in ccxt_exchange.timeframes: + timeframe = '30m' + + candles_found: tuple[list[Candle] | None] + args = [dict(exchange = ccxt_exchange, + limit = limit, + timeframe = timeframe, + pair = pair, + + since = date) + for date in dates] + tasks = [asyncio.to_thread(get_history, **arg) + for arg in args] + candles_found = await asyncio.gather(*tasks) + + result = [candles_at[0] for candles_at in candles_found if candles_at] + if not result: + raise HTTPException( + detail = f'Spotbit did not receive any candle history for the requested dates.', + status_code = HTTPStatus.INTERNAL_SERVER_ERROR) + + + return result + + +if __name__ == '__main__': + import uvicorn + + assert logger + logger.info(f'app.debug: {app.debug}') + uvicorn.run('app:app', + host ='::', + port = 5000, + debug = app.debug, + log_level = 'debug' if app.debug else 'info', + reload = app.debug) diff --git a/configure.py b/configure.py deleted file mode 100644 index 89640ea..0000000 --- a/configure.py +++ /dev/null @@ -1,7 +0,0 @@ -import server - -# This script is run by the install script when spotbit is being set up. -# It allows for spotbit to function as a systemd service on a user's system. -# Once the database has been opened, spotbit can easily be run in the background. -server.configure_db() -print("database configured in /home/spotbit/.spotbit/sb.db") \ No newline at end of file diff --git a/installSpotbit.config b/installSpotbit.config index 3eb6ed7..cf3beec 100644 --- a/installSpotbit.config +++ b/installSpotbit.config @@ -16,8 +16,8 @@ NOPROMPT=false # character long password. # Don't ignore this. USERPASSWORD= -# If you already have python3.8, then set the following to "false" -INSTALL_PYTHON38=true +# If you already have python3.10, then set the following to "false" +INSTALL_PYTHON310=true ########################### diff --git a/installSpotbit.sh b/installSpotbit.sh index 87025bf..308a154 100644 --- a/installSpotbit.sh +++ b/installSpotbit.sh @@ -37,7 +37,7 @@ config_get() { # system NOPROMPT="$(config_get NOPROMPT)" USERPASSWORD="$(config_get USERPASSWORD)" -INSTALL_PYTHON38="$(config_get INSTALL_PYTHON38)" +INSTALL_PYTHON310="$(config_get INSTALL_PYTHON310)" # vps VPS="$(config_get VPS)" @@ -168,7 +168,7 @@ System ------ NOPROMPT..........: $NOPROMPT USERPASSWORD......: $USERPASSWORD -INSTALL_PYTHON38..: $INSTALL_PYTHON38 +INSTALL_PYTHON310..: $INSTALL_PYTHON310 VPS --- @@ -392,25 +392,25 @@ fi #### # 5. 
Install python & python dependencies #### -if "$INSTALL_PYTHON38"; then +if "$INSTALL_PYTHON310"; then echo " ---------------- - $MESSAGE_PREFIX Installing python 3.8 + $MESSAGE_PREFIX Installing python 3.10 ---------------- " apt-get install build-essential checkinstall -y apt-get install libreadline-gplv2-dev libncursesw5-dev libssl-dev libsqlite3-dev tk-dev libgdbm-dev libc6-dev libbz2-dev libffi-dev zlib1g-dev -y mkdir ~spotbit/downloads chown -R spotbit ~spotbit/downloads - sudo -u spotbit wget --progress=bar:force https://www.python.org/ftp/python/3.8.5/Python-3.8.5.tgz -O ~spotbit/downloads/Python-3.8.5.tgz cd ~spotbit/downloads - tar xzf Python-3.8.5.tgz - cd Python-3.8.5 + sudo -u spotbit wget --progress=bar:force https://www.python.org/ftp/python/3.10.4/Python-3.10.4.tgz + tar xzf Python-3.10.4.tgz + cd Python-3.10.4 ./configure --enable-optimizations make altinstall # altinstall to prevent replacing default python binary at /usr/bin/python - if ! [[ -z "$(python3.8 -V)" ]]; then + if ! [[ -z "$(python3.10 -V)" ]]; then echo " - $MESSAGE_PREFIX installed $(python3.8 -V)" + $MESSAGE_PREFIX installed $(python3.10 -V)" fi fi @@ -421,7 +421,7 @@ echo " $MESSAGE_PREFIX Installing python dependencies ---------------- " -python3.8 -m pip install -r requirements.txt +python3.10 -m pip install -r requirements.txt cd "$SCRIPTS_DIR" @@ -435,7 +435,7 @@ else mkdir /home/spotbit/.spotbit chown -R spotbit /home/spotbit/.spotbit touch /home/spotbit/.spotbit/spotbit.config - cat spotbit_example.config >> /home/spotbit/.spotbit/spotbit.config + cat spotbit.config >> /home/spotbit/.spotbit/spotbit.config fi @@ -456,7 +456,7 @@ Requires=tor.service After=tor.service [Service] -ExecStart=/usr/local/bin/python3.8 /home/spotbit/spotbit/server.py +ExecStart=/usr/local/bin/python3.10 /home/spotbit/spotbit/app.py # Process management #################### diff --git a/requirements.txt b/requirements.txt index 0c7e84f..2f41d7c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,75 @@ -## Requirements for running Spotbit ## -flask -ccxt \ No newline at end of file +aiodns==3.0.0 +aiohttp==3.8.1 +aiosignal==1.2.0 +anyio==3.5.0 +asgiref==3.5.0 +astroid==2.9.3 +async-timeout==4.0.2 +atomicwrites==1.4.0 +attrs==21.4.0 +bdkpython==0.0.5 +beancount==2.3.5 +beautifulsoup4==4.10.0 +bottle==0.12.19 +buidl==0.2.36 +cachetools==5.0.0 +ccxt==1.74.49 +certifi==2021.10.8 +cffi==1.15.0 +chardet==4.0.0 +charset-normalizer==2.0.12 +click==8.0.4 +colorama==0.4.4 +cryptography==36.0.1 +fastapi==0.75.0 +frozenlist==1.3.0 +google-api-core==2.7.1 +google-api-python-client==2.43.0 +google-auth==2.6.2 +google-auth-httplib2==0.1.0 +googleapis-common-protos==1.56.0 +h11==0.13.0 +httplib2==0.20.4 +idna==3.3 +iniconfig==1.1.1 +isort==5.10.1 +itsdangerous==2.1.0 +Jinja2==3.0.3 +lazy-object-proxy==1.7.1 +lxml==4.8.0 +MarkupSafe==2.1.0 +mccabe==0.6.1 +multidict==6.0.2 +nodeenv==1.6.0 +packaging==21.3 +pip==22.0.3 +platformdirs==2.5.1 +pluggy==1.0.0 +ply==3.11 +protobuf==3.20.0 +py==1.11.0 +pyasn1==0.4.8 +pyasn1-modules==0.2.8 +pycares==4.1.2 +pycparser==2.21 +pydantic==1.9.0 +pyparsing==3.0.7 +pytest==7.1.1 +python-dateutil==2.8.2 +python-dotenv==0.19.2 +requests==2.27.1 +rsa==4.8 +setuptools==60.9.3 +six==1.16.0 +sniffio==1.2.0 +soupsieve==2.3.2 +starlette==0.17.1 +toml==0.10.2 +tomli==2.0.1 +typing_extensions==4.1.1 +uritemplate==4.1.1 +urllib3==1.26.8 +uvicorn==0.17.6 +Werkzeug==2.0.3 +wrapt==1.13.3 +yarl==1.7.2 diff --git a/server.py b/server.py deleted file mode 100644 index 68d503a..0000000 --- a/server.py +++ /dev/null 
@@ -1,818 +0,0 @@ -from flask import Flask, request as flaskRequest, jsonify, render_template -import requests -import json -import time -from datetime import datetime, timedelta -import sqlite3 -import sys -import ccxt -import os -from threading import Thread -from pathlib import Path -import logging - -#setup the logging module for file output -log = logging.getLogger('spotbit') -log.setLevel(logging.DEBUG) -logFileHandler = logging.FileHandler('/home/spotbit/.spotbit/spotbit.log') -logFileHandler.setLevel(logging.DEBUG) -log.addHandler(logFileHandler) -#Config Settings -allowedFields = ["keepWeeks", "exchanges", "currencies", "interval", "exchange_limit", "averaging_time", "historicalExchanges", "historyEnd"] -configPath = Path("/home/spotbit/.spotbit/spotbit.config") -#Default values; these will be overwritten when the config file is read -exchanges = [] -historicalExchanges = [] # exchanges that we want the history of -currencies = [] -interval = 10 #time to wait between GET requests to servers, to avoid ratelimits -keepWeeks = 3 # add this to the config file -exchange_limit = 200 #when there are more exchanges than this multithreading is ideal -performance_mode = False -averaging_time = 1 # the number of hours that we should average information over -historyEnd = 0 -on_demand = False # whether or not we are caching data -score = 0 #the current percent of empty tables -#the information regarding the current thread -threadResults = None -# curated exchange lists for creating averages -curated_exchanges = {'USD': ['coinbasepro', 'okcoin', 'bitfinex', 'kraken', 'bitstamp'], 'GBP': ['coinbasepro', 'coinsbank', 'bitstamp', 'kraken', 'cexio'], 'EUR': ['kraken', 'coinbasepro', 'bitstamp', 'bitfinex', 'indoex'], 'JPY': ['bitflyer', 'liquid', 'coincheck', 'bitbank', 'zaif'], 'USDT': ['binance', 'okex', 'huobipro', 'bitmax', 'gateio']} -curated_exchanges_list = ['gemini', 'bitstamp', 'okcoin', 'coinsbit', 'coinbasepro', 'coinsbank', 'kraken', 'cexio', 'bitfinex', 'indoex', 'bitflyer', 'liquid', 'coincheck', 'bitbank', 'zaif', 'hitbtc', 'binance', 'okex', 'gateio', 'bitmax'] -curated_currencies = ['USD', 'GBP', 'EUR', 'JPY', 'AUD', 'USDT'] - - -p = Path("/home/spotbit/.spotbit/sb.db") -db = sqlite3.connect(p) -print(f"db opened in {p}") -log.debug(f"db opened in {p}") -ONION = "" -try: - ONION = os.environ["ONION"] #get this value from the path - print(f"spotbit is running at {ONION}") -except Exception as e: - print(f"cant find ONION in PATH {e}") - -# Database configuration -# We need to have the database opened manually once so that systemd can access it -def configure_db(): - p = Path("/home/spotbit/.spotbit/sb.db") - db = sqlite3.connect(p) - print(f"db opened in {p}") - log.debug(f"db opened in {p}") -app = Flask(__name__) - -# split up the number of exchanges per chunk based on how many cpu cores are available -# cpuOffset: the number of cores you want to try and utilize. -def optimize_chunks(cpuOffset): - return int(len(exchanges) / (os.cpu_count()-cpuOffset)) - -# Create a dict that contains ccxt objects for every supported exchange. -# The API will query a subset of these exchanges based on what the user has specified -# Unsupported exchanges: bitvaro phemex vaultoro -# Future Plans: -# Hard coding supported exchanges is a bad practice. CCXT autogenerates code for each exchange and therefore at least in theory may frequently support new exchanges. -# Need to find a way to automatically create a list of exchange objects. 
-# btctradeim doesn't want to work on raspberry pi -def init_supported_exchanges(): - objects = {"acx":ccxt.acx(), "aofex":ccxt.aofex(), "bequant":ccxt.bequant(), "bibox":ccxt.bibox(), "bigone":ccxt.bigone(), "binance":ccxt.binance(), "bitbank":ccxt.bitbank(), "bitbay":ccxt.bitbay(), "bitfinex":ccxt.bitfinex(), "bitflyer":ccxt.bitflyer(), "bitforex":ccxt.bitforex(), "bithumb":ccxt.bithumb(), "bitkk":ccxt.bitkk(), "bitmax":ccxt.bitmax(), "bitstamp":ccxt.bitstamp(), "bittrex":ccxt.bittrex(), "bitz":ccxt.bitz(), "bl3p":ccxt.bl3p(), "bleutrade":ccxt.bleutrade(), "braziliex":ccxt.braziliex(), "btcalpha":ccxt.btcalpha(), "btcbox":ccxt.btcbox(), "btcmarkets":ccxt.btcmarkets(), "btctradeua":ccxt.btctradeua(), "bw":ccxt.bw(), "bybit":ccxt.bybit(), "bytetrade":ccxt.bytetrade(), "cex":ccxt.cex(), "chilebit":ccxt.chilebit(), "coinbase":ccxt.coinbase(), "coinbasepro":ccxt.coinbasepro(), "coincheck":ccxt.coincheck(), "coinegg":ccxt.coinegg(), "coinex":ccxt.coinex(), "coinfalcon":ccxt.coinfalcon(), "coinfloor":ccxt.coinfloor(), "coinmate":ccxt.coinmate(), "coinone":ccxt.coinone(), "crex24":ccxt.crex24(), "currencycom":ccxt.currencycom(), "digifinex":ccxt.digifinex(), "dsx":ccxt.dsx(), "eterbase":ccxt.eterbase(), "exmo":ccxt.exmo(), "exx":ccxt.exx(), "foxbit":ccxt.foxbit(), "ftx":ccxt.ftx(), "gateio":ccxt.gateio(), "gemini":ccxt.gemini(), "hbtc":ccxt.hbtc(), "hitbtc":ccxt.hitbtc(), "hollaex":ccxt.hollaex(), "huobipro":ccxt.huobipro(), "ice3x":ccxt.ice3x(), "independentreserve":ccxt.independentreserve(), "indodax":ccxt.indodax(), "itbit":ccxt.itbit(), "kraken":ccxt.kraken(), "kucoin":ccxt.kucoin(), "lakebtc":ccxt.lakebtc(), "latoken":ccxt.latoken(), "lbank":ccxt.lbank(), "liquid":ccxt.liquid(), "livecoin":ccxt.livecoin(), "luno":ccxt.luno(), "lykke":ccxt.lykke(), "mercado":ccxt.mercado(), "oceanex":ccxt.oceanex(), "okcoin":ccxt.okcoin(), "okex":ccxt.okex(), "paymium":ccxt.paymium(), "poloniex":ccxt.poloniex(), "probit":ccxt.probit(), "southxchange":ccxt.southxchange(), "stex":ccxt.stex(), "surbitcoin":ccxt.surbitcoin(), "therock":ccxt.therock(), "tidebit":ccxt.tidebit(), "tidex":ccxt.tidex(), "upbit":ccxt.upbit(), "vbtc":ccxt.vbtc(), "wavesexchange":ccxt.wavesexchange(), "whitebit":ccxt.whitebit(), "yobit":ccxt.yobit(), "zaif":ccxt.zaif(), "zb":ccxt.zb()} - return objects - -# Check if a given exchange is in the list of supported exchanges. -# Currently, the list of supported exchanges is all those supported by ccxt aside from a small handful that did not seem to work properly. 
May be bug in ccxt or just a typo in their code / docs -def is_supported(exchange): - try: - obj = ex_objs[exchange] - if obj != None: - return True - else: - return False - except Exception as e: - print(f"caught an error: {e}") - log.error(f"caught an error {e}") - return False - -# Check if a timestamp has ms precision by modding by 1000 -def is_ms(timestamp): - if timestamp % 1000 == 0: - return True - return False - -# We create a list of all exchanges to do error checking on user input -ex_objs = init_supported_exchanges() -num_exchanges = len(ex_objs) -print(f"created list of {num_exchanges}") -log.info(f"created list of {num_exchanges}") - -@app.route('/') -def index(): - date_start = (datetime.now() - timedelta(days=5)).timestamp()*1e3 - date_end = (datetime.now()).timestamp()*1e3 - f0 = f"{ONION}/now/USD/coinbasepro" - f1 = f"{ONION}/now/USD" - f2 = f"{ONION}/hist/USD/coinbasepro/{date_start}/{date_end}" - f3 = f"{ONION}/configure" - return render_template('index.html', fetch_0=f0,fetch_1=f1,fetch_2=f2,fetch_3=f3,date_start=date_start,date_end=date_end) - -# TODO: create an html page to render here -@app.route('/status') -def status(): - global score - global threadResults - if performance_mode: - l = len(threadResults) - content = f"Threads: {l}" - for chunk, thread in threadResults: - html += f"{chunk} at memory address: {thread}" - return f"

{content}

" - else: - return "server is running" - -# configure the settings of Spotbit while the server is still running -# send a GET request to this route to view current settings -# send a POST request to this route with settings fields stored in JSON to update settings -# TODO: make the updates persistant by also writing them to file. -@app.route('/configure', methods=['GET', 'POST']) -def configure(): - # seems like this needs to be done in order to reference global vars inside of the flask server thread - global keepWeeks - global currencies - global exchanges - global interval - global on_demand - if flaskRequest.method == 'POST': - #return the config settings TODO: error check so that the user doesn't have to submit everything at once. Also implement a form here. - keepWeeks = flaskRequest.json("keepWeeks") - exchanges = flaskRequest.json("exchanges") - currencies = flaskRequest.json("currencies") - interval = flaskRequest.json("interval") - return {'updated settings?':'yes', 'keepWeeks':keepWeeks, 'currencies':currencies, 'exchanges':exchanges, 'interval':interval} - else: - return {'updated settings?':'no', 'keepWeeks':keepWeeks, 'currencies':currencies, 'on demand exchanges':list(ex_objs.keys()), 'cached exchanges': exchanges, 'interval':interval} - -# return averages in a list of tuples -# find the oldest timestamp in the list of tuples -def average_price_value(tuple_list, tuple_length, ticker): - running_sums = [0] * tuple_length - oldest_timestamp = 1e13 - for tup in tuple_list: - if tup != None and tup[1] < oldest_timestamp: - oldest_timestamp = tup[1] - for i in range(0,tuple_length): - if i > 3: - running_sums[i] += tup[i] - list_len = len(tuple_list) - return {'id': 'average_value', 'timestamp': (datetime.now()).timestamp()*1e3, 'datetime': datetime.now(), 'oldest_timestamp': oldest_timestamp, 'currency_pair': ticker, 'open': running_sums[4]/list_len, 'high': running_sums[5]/list_len, 'low': running_sums[6]/list_len, 'close': running_sums[7]/list_len, 'volume': running_sums[8]/list_len} - -# route for when a call is made without specifying an exchange. -# return an average of the 5 curated exchanges for that currency -@app.route('/now/') -def now_noex(currency): - global averaging_time - db_n = sqlite3.connect(p, timeout=30) - currency = currency.upper() - ticker = f"BTC-{currency}" - # only calculate averages if a list has been curated already - if currency in curated_currencies: - components = curated_exchanges[currency] - failed_exchanges = [] - components_list = [] - for exchange in components: - ms_check = f"SELECT timestamp FROM {exchange} LIMIT 1;" - cursor = db_n.execute(ms_check) - res = cursor.fetchone() - # only take values from within 15 min of present - ts_delta = (datetime.now() - timedelta(hours=averaging_time)).timestamp() - if res!= None and is_ms(int(res[0])): - ts_delta *= 1e3 - statement = f"SELECT * FROM {exchange} WHERE pair = '{ticker}' AND timestamp > {ts_delta} ORDER BY timestamp DESC LIMIT 1;" - cursor = db_n.execute(statement) - res = cursor.fetchone() - if res != None: - components_list.append(res) - else: - # if there is no data in the table yet, then try a direct request. 
- res = fallback_to_direct(exchange, currency, db_n) - if len(res) < 2: - log.error(f"could not get data from {exchange}") - failed_exchanges.append(exchange) - else: - components_list.append(res) - result = average_price_value(components_list, 9, ticker) - result['exchanges'] = components - result['failed_exchanges'] = failed_exchanges - return result - -# Get the latest price entry in the database. -# Currency: the three letter base currency desired. Must be a currency you are already collecting data for -# Exchange: the exchange to query data for from the local database. -@app.route('/now//') -def now(currency, exchange): - db_n = sqlite3.connect(p, timeout=30) - ticker = "BTC-{}".format(currency.upper()) - if exchange in exchanges: - #if the exchange is already in the config file - #statement = "SELECT * FROM {} WHERE pair = '{}' AND timestamp = (SELECT MAX(timestamp) FROM {});".format(exchange, ticker, exchange) - statement = f"SELECT * FROM {exchange} WHERE pair = '{ticker}' ORDER BY timestamp DESC LIMIT 1;" - try: - cursor = db_n.execute(statement) - res = cursor.fetchone() - except sqlite3.OperationalError: - print("database is locked. Cannot access it") - log.error("database is locked. Cannot access it") - return {'err': 'database locked'} - if res != None: - db_n.close() - return {'id':res[0], 'timestamp':res[1], 'datetime':res[2], 'currency_pair':res[3], 'open':res[4], 'high':res[5], 'low':res[6], 'close':res[7], 'vol':res[8]} - else: - db_n.close() - return fallback_to_direct(exchange, currency, db_n) - elif exchange == "all": #if all is selected then we select from all exchanges and average the latest close - result_set = [] - for e in exchanges: - ts_cutoff = (datetime.now() - timedelta(hours=averaging_time)).timestamp() - check_ms = f"SELECT timestamp FROM {e} LIMIT 1;" - cursor = db_n.execute(check_ms) - db_n.commit() - ts = cursor.fetchone() - if ts != None and is_ms(int(ts[0])): - print(f"using millisecond precision for {e}") - logging.info(f"using millisecond precision for {e}") - ts_cutoff *= 1e3 - statement = f"SELECT timestamp, close FROM {e} WHERE timestamp > {ts_cutoff} AND pair = '{ticker}' ORDER BY timestamp LIMIT 1;" - cursor = db_n.execute(statement) - db_n.commit() - result = cursor.fetchone() - if result != None: - result_set.append(result[1]) - return {'ticker': list_mean(result_set)} - else: - return fallback_to_direct(exchange, currency, db_n) - -# This method will directly request an exchange that is supported but who's table is also empty -def fallback_to_direct(exchange, currency, db_n): - #make a direct request - ticker = "BTC-{}".format(currency.upper()) - res = request_single(exchange, currency) - #db_n.close() - if res != None: - return res - else: - return {'id': res} - -# Find the mean of a list of two-value tuples -def list_mean(input_list): - avg = 0.0 - for l in input_list: - avg += l - return avg/len(input_list) - -# Get data from local storage inside of a certain range. -# Parameters: -# Currency: the fiat base currency to fetch data for. Should be a three letter currency code in lowercase. -# Exchange: the exchange to get data from. -# date_start and date_end: date_start is the oldest time value in the range desired. It can be provided as a millisecond timestamp or as a datetime formatted as "YYYY-MM-DDTHH:mm:SS". 
-@app.route('/hist////', methods=['GET']) -def hist(currency, exchange, date_start, date_end): - db_n = sqlite3.connect(p, timeout=10) - ticker = "BTC-{}".format(currency.upper()) - #check what format of dates we have - if (str(date_start)).isdigit(): - date_s = int(date_start) - date_e = int(date_end) - else: - #error checking for malformed dates - try: - date_s = (datetime.fromisoformat(date_start.replace("T", " "))).timestamp()*1000 - date_e = (datetime.fromisoformat(date_end.replace("T", " "))).timestamp()*1000 - except Exception: - return "malformed dates. Provide both dates in the same format: use YYYY-MM-DDTHH:mm:SS or millisecond timestamps" - # check the table we want to select from to see the precision of it - check = f"SELECT timestamp FROM {exchange} ORDER BY timestamp DESC LIMIT 1;" - cursor = db_n.execute(check) - statement = "" - ts = cursor.fetchone() - if ts != None and is_ms(int(ts[0])): - statement = f"SELECT * FROM {exchange} WHERE timestamp > {date_s} AND timestamp < {date_e} AND pair = '{ticker}';" - else: - # for some exchanges we cannot use ms precision timestamps (such as coinbase) - date_s /= 1e3 - date_e /= 1e3 - statement = f"SELECT * FROM {exchange} WHERE timestamp > {date_s} AND timestamp < {date_e} AND pair = '{ticker}';" - # keep trying in case of database locked error - while True: - try: - cursor = db_n.execute(statement) - break - except sqlite3.OperationalError as oe: - time.sleep(5) - - res = cursor.fetchall() - db_n.close() - return {'columns': ['id', 'timestamp', 'datetime', 'currency_pair', 'open', 'high', 'low', 'close', 'vol'], 'data':res} - -# Return all database rows within `tolerance` for each of the supplied dates -# Dates should be provided as millisecond timestamps separated by hyphens -@app.route('/hist///') -def hist_single_dates(currency, exchange, dates): - db_n = sqlite3.connect(p, timeout=10) - ticker = "BTC-{}".format(currency.upper()) - dates_list = dates.split("-") - # the number of minutes away from a given date that is considered acceptable - tolerance = 30 - results = {} - check_ms = f"SELECT timestamp FROM {exchange} LIMIT 1;" - cursor = db_n.execute(check_ms) - ts = cursor.fetchone() - ms_precision = True - if ts != None and is_ms(int(ts[0])) != True: - ms_precision = False - for d in dates_list: - try: - ts = int(d) - except Exception: - return f"malformed date {d}" - dt = datetime.fromtimestamp(ts/1e3) - lower_bound = (dt - timedelta(minutes=tolerance)).timestamp()*1e3 - upper_bound = (dt + timedelta(minutes=tolerance)).timestamp()*1e3 - if ms_precision == False: - ts /= 1e3 - statement = f"SELECT * FROM {exchange} WHERE pair = '{ticker}' AND timestamp > {lower_bound} AND timestamp > {upper_bound} ORDER BY timestamp ASC;" - # right now we return everything - while True: - try: - cursor = db_n.execute(statement) - res = cursor.fetchall()[0] - break - except sqlite3.OperationalError: - time.sleep(2) - if res != None: - results[f"{d}"] = {'id':res[0], 'timestamp':res[1], 'datetime':res[2], 'pair':res[3], 'open':res[4], 'high':res[5], 'low':res[6], 'close':res[7], 'vol':res[8]} - else: - results[f"{d}"] = None - return results -# Make a single request, without having to loop through all exchanges and currency pairs. -# This is intended for when the user requests an exchange in /now that is not present in the database. 
-# It will probably not be used for /hist because of the length of time getting arbitrary amounts of historical data can be -def request_single(exchange, currency): - if not is_supported(exchange): - return f"{exchange} is not supported by CCXT" - obj = ex_objs[exchange] - ticker = "BTC/{}".format(currency.upper()) - dt = None - if obj.has['fetchOHLCV']: - tframe = '1m' - # drop all this in a separate method - lim = 1000 - if exchange == "bleutrade" or exchange == "btcalpha" or exchange == "rightbtc" or exchange == "hollaex": - tframe = '1h' - if exchange == "poloniex": - tframe = '5m' - # some exchanges have explicit limits on how many candles you can get at once - if exchange == "bitstamp": - lim = 1000 - if exchange == "bybit": - lim = 200 - if exchange == "eterbase": - lim = 1000000 - if exchange == "exmo": - lim = 3000 - if exchange == "btcalpha": - lim = 720 - result = None - if exchange == "bitfinex": #other exchanges requiring special conditions: bitstamp, bitmart - params = {'limit':100, 'start':(round((datetime.now()-timedelta(hours=1)).timestamp()*1000)), 'end':round(datetime.now().timestamp()*1000)} - try: - result = ex_objs[exchange].fetch_ohlcv(symbol=ticker, timeframe=tframe, since=None, params=params) - except Exception as e: - print(f"got an error requesting info from {exchange}: {e}") - logging.error(f"got an error requesting info frm {exchange}: {e}") - else: - try: - result = obj.fetch_ohlcv(symbol=ticker, timeframe=tframe, since=None, limit=lim) - except Exception as e: - print(f"got an error requesting info from {exchange}: {e}") - logging.error(f"got an error requesting info from {exchange}: {e}") - else: - try: - result = obj.fetch_ticker(ticker) - if result != None and is_ms(result['timestamp']) == False: - dt = datetime.fromtimestamp(result['timestamp']) - else: - dt = datetime.fromtimestamp(result['timestamp'] / 1e3) - if result != None: - return {'close': result['close'], 'symbol': ticker, 'timestamp': result['timestamp'], 'datetime': dt, 'volume': result['bidVolume'], 'id': 'on_demand'} - except Exception as e: - print(f"got ratelimited on {e}") - logging.error(f"got ratelimited on {e}") - if result != None: - res = result[-1] - if is_ms(res[0]): - dt = datetime.fromtimestamp(res[0]/1e3) - else: - dt = datetime.fromtimestamp(res[0]) - return {'id': 'on_demand', 'timestamp': res[0], 'datetime': dt, 'currency_pair': ticker, 'open': res[1], 'high': res[2], 'low': res[3], 'close': res[4], 'vol': res[5]} - else: - return "no data" - - -# Make an HTTP GET request to exchanges via the ccxt API -# TODO: add error checking for if an exchange supports ohlc data. If not, default to regular price data. (done) -# Loop through all chosen exchanges, check if they are supported, loop through all chosen currencies, for each make request to ohlc endpoint if supported, else price ticker. Write data to local storage. -# Bitfinex special rule: bitfinex returns candles from the beginning of time, not the most recent. This is a behavior of the API itself and has nothing to do with this code or ccxt. Therefore we must specify the timeframe desired in the optional params field of the function call with a dictionary of available options. 
-def request(exchanges,interval,db_n): - global currencies - for e in exchanges: - for curr in currencies: - ticker = "BTC/{}".format(curr) - success = True - if ex_objs[e].has['fetchOHLCV']: - candle = None - tframe = '1m' - lim = 1000 - if e == "bleutrade" or e == "btcalpha" or e == "rightbtc" or e == "hollaex": - tframe = '1h' - if e == "poloniex": - tframe = '5m' - # some exchanges have explicit limits on how many candles you can get at once - if e == "bitstamp": - lim = 1000 - if e == "bybit": - lim = 200 - if e == "eterbase": - lim = 1000000 - if e == "exmo": - lim = 3000 - if e == "btcalpha": - lim = 720 - if e == "bitfinex": - params = {'limit':100, 'start':(round((datetime.now()-timedelta(hours=1)).timestamp()*1000)), 'end':round(datetime.now().timestamp()*1000)} - try: - candle = ex_objs[e].fetch_ohlcv(symbol=ticker, timeframe=tframe, since=None, params=params) - if candle == None: - raise Exception(f"candle from {e} is null") - except Exception as err: #figure out this error type - #the point so far is to gracefully handle the error, but waiting for the next cycle should be good enough - if "does not have" not in str(err): - print(f"error fetching candle: {e} {curr} {err}") - log.error(f"error fetching candle: {e} {curr} {err}") - success = False - else: - try: - candle = ex_objs[e].fetch_ohlcv(symbol=ticker, timeframe=tframe, since=None, limit=lim) #'ticker' was listed as 'symbol' before | interval should be determined in the config file - if candle == None: - raise Exception(f"candle from {e} is nulll") - except Exception as err: - if "does not have" not in str(err): - print(f"error fetching candle: {e} {curr} {err}") - log.error(f"error fetching candle: {e} {curr} {err}") - success = False - if success: - times_inserted = 0 - for line in candle: - ts = datetime.fromtimestamp(line[0]/1e3) #check here if we have a ms timestamp or not - for l in line: - if l == None: - l = 0 - #this is another error check condition for when null values slip into the data. 
- statement = "INSERT INTO {} (timestamp, datetime, pair, open, high, low, close, volume) VALUES ({}, '{}', '{}', {}, {}, {}, {}, {});".format(e, line[0], ts, ticker.replace("/", "-"), line[1], line[2], line[3], line[4], line[5]) - try: - db_n.execute(statement) - db_n.commit() - times_inserted += len(candle) - except sqlite3.OperationalError as op: - nulls = [] - c = 0 - # identify where the null value is - for l in line: - if l == None: - nulls.append(c) - c += 1 - print(f"exchange: {e} currency: {curr}\nsql statement: {statement}\nerror: {op}(moving on)") - log.error(f"exchange: {e} currency: {curr} sql statement: {statement} error: {op}") - now = datetime.now() - print(f"[{now}] | inserted into {e} {curr} {times_inserted} times") - log.info(f"[{now}] | inserted into {e} {curr} {times_inserted} times") - else: - try: - price = ex_objs[e].fetch_ticker(ticker) - except Exception as err: - print(f"error fetching ticker: {err}") - log.error(f"error fetching ticker: {err}") - success = False - if success: - ts = None - try: - if is_ms(int(price['timestamp'])): - ts = datetime.fromtimestamp(int(price['timestamp'])/1e3) - else: - ts = datetime.fromtimestamp(int(price['timestamp'])) - except OverflowError as oe: - print(f"{oe} caused by {ts}") - ticker = ticker.replace("/", "-") - statement = f"INSERT INTO {e} (timestamp, datetime, pair, open, high, low, close, volume) VALUES ({price['timestamp']}, '{ts}', '{ticker}', 0.0, 0.0, 0.0, {price['last']}, 0.0);" - db_n.execute(statement) - db_n.commit() - now = datetime.now() - print(f"[{now}] | inserted into {e} {curr} VALUE: {price['last']}") - log.info(f"[{now}] | inserted into {e} {curr} VALUE: {price['last']}") - time.sleep(interval) - -# Thread method. Makes requests every interval seconds. -# Adding this method here to make request more versatile while maintaining the same behavior -def request_periodically(exchanges, interval): - db_n = sqlite3.connect(p, timeout=30) - while True: - request(exchanges,interval,db_n) - -# Split the list of exchanges into chunks up to size chunk_size. -# Create a thread for each chunk and start it, then add the thread to a list. -# Return a list of tuples that contain the list of whats in each chunk and a list of the actual thread objects. 
-def request_fast(exchanges,interval, chunk_size):
-    count = 0
-    chunks = []
-    threads = []
-    current_chunk = []
-    # split up the list of exchanges
-    for e in exchanges:
-        if count < chunk_size:
-            current_chunk.append(e)
-            count += 1
-        else:
-            count = 0
-            chunks.append(current_chunk)
-            current_chunk = []
-    # Start a thread for each chunk
-    for chunk in chunks:
-        print(f"creating thread for chunk {chunk}")
-        log.info(f"creating thread for chunk {chunk}")
-        cThread = Thread(target=request_periodically, args=(chunk,interval))
-        cThread.start()
-        threads.append(cThread)
-    return (chunks, threads)
-
-# Fetch the complete historical data for an exchange for a given time interval in milliseconds
-# start_date is the oldest date
-# end_date is the newest date
-def request_history(exchange, currency, start_date, end_date):
-    global interval
-    db_n = sqlite3.connect(p, timeout=10)
-    ticker = f"BTC/{currency}"
-    while start_date < end_date:
-        #params = {'limit': 10000, 'start': start_date, 'end': int((datetime.fromtimestamp(start_date/1e3) + timedelta(hours=2)).timestamp()*1e3)}
-        params = {'start': start_date, 'end': end_date}
-        tick = ex_objs[exchange].fetch_ohlcv(symbol=ticker, timeframe='1m', params=params)
-        for line in tick:
-            dt = None
-            symbol = ticker.replace("/", "-")
-            try:
-                if is_ms(int(line['timestamp'])):
-                    dt = datetime.fromtimestamp(line['timestamp'] / 1e3)
-                else:
-                    dt = datetime.fromtimestamp(line['timestamp'])
-                statement = f"INSERT INTO {exchange} (timestamp, datetime, pair, open, high, low, close, volume) VALUES ({line['timestamp']}, '{dt}', '{symbol}', 0.0, 0.0, 0.0, {line['last']}, 0.0);"
-            except TypeError:
-                if line[0] % 1000 == 0:
-                    dt = datetime.fromtimestamp(line[0] / 1e3)
-                else:
-                    dt = datetime.fromtimestamp(line[0])
-                statement = f"INSERT INTO {exchange} (timestamp, datetime, pair, open, high, low, close, volume) VALUES ({line[0]}, '{dt}', '{symbol}', {line[1]}, {line[2]}, {line[3]}, {line[4]}, {line[5]});"
-            db_n.execute(statement)
-        db_n.commit()
-        l = len(tick)
-        print(f"table: {exchange} period: {start_date} to {end_date} rows inserted: {l}")
-        log.info(f"table: {exchange} period: {start_date} to {end_date} rows inserted: {l}")
-        start_date += 1e4 #leaving this hardcoded for now
-        start_date = int(start_date)
-        time.sleep(interval)
-
-# Create a thread for each exchange that needs history.
-def request_history_periodically(histExchanges, currencies, start_date):
-    history_threads = []
-    for h in histExchanges:
-        hThread = Thread(target=request_history, args=(h, "USD", historyEnd, datetime.now().timestamp()*1e3))
-        hThread.start()
-        history_threads.append(hThread)
-        print(f"started thread for {h}")
-        log.info(f"started thread for {h}")
-    return history_threads
-
-# Read the values stored in the config file and store them in memory.
-# Run during install and at every run of the server.
-# Returns void
-def read_config():
-    global exchanges
-    global interval
-    global performance_mode
-    global averaging_time
-    global exchange_limit
-    global historicalExchanges
-    global historyEnd
-    global keepWeeks
-    global on_demand
-    with open(configPath, "r") as f:
-        lines = f.readlines()
-    #read each line in the file
-    for line in lines:
-        #split the current line
-        setting_line = line.split("=")
-        #if there are invalid lines in the file ignore them
-        if "#" in setting_line[0]:
-            pass #ignore comments
-        elif setting_line[0] not in allowedFields and "#" not in setting_line[0]:
-            print(f"invalid config setting {setting_line[0]}")
-            log.error(f"invalid config setting {setting_line[0]}")
-        elif setting_line[0] == "keepWeeks":
-            try:
-                keepWeeks = int(setting_line[1])
-            except Exception as e:
-                print(f"could not read keepWeeks field. Using default setting of {keepWeeks} weeks. Error: {e}")
-                log.error(f"could not read keepWeeks field. Using default setting of {keepWeeks} weeks. Error: {e}")
-        elif setting_line[0] == "exchanges":
-            exs = setting_line[1].split(" ")
-            for e in exs:
-                e = e.replace("\n", "")
-                if e == "all":
-                    exchanges = list(ex_objs.keys())
-                    on_demand = True
-                    break
-                if e not in exchanges and is_supported(e) == True:
-                    exchanges.append(e)
-                else:
-                    print(f"{e} is not supported by CCXT!")
-                    log.error(f"{e} is not supported by CCXT!")
-        elif setting_line[0] == "currencies":
-            currs = setting_line[1].split(" ")
-            for c in currs:
-                #need to make sure currency codes are all caps and have newlines dropped off
-                c_formatted = (c.replace("\n", "")).upper()
-                if c_formatted not in currencies:
-                    if "\n" in c:
-                        currencies.append(c_formatted)
-                    else:
-                        currencies.append(c_formatted)
-        elif setting_line[0] == "interval":
-            interval = int(setting_line[1])
-        elif setting_line[0] == "exchange_limit":
-            try:
-                exchange_limit = int((setting_line[1].replace("\n", "")))
-            except TypeError:
-                print("invalid value in exchange_limit field. Must be an integer")
-                log.error("invalid value in exchange_limit field. Must be an integer")
-        elif setting_line[0] == "averaging_time":
-            try:
-                averaging_time = int((setting_line[1]).replace("\n", ""))
-            except TypeError:
-                print("invalid value in averaging_time field. Must be an integer (number of hours)")
-                log.error("invalid value in averaging_time field. Must be an integer (number of hours)")
-        elif setting_line[0] == "historicalExchanges":
-            hists = setting_line[1].split(" ")
-            for h in hists:
-                h = (h.replace("\n", ""))
-                historicalExchanges.append(h)
-                print(f"collecting history for {historicalExchanges}")
-                log.error(f"collecting history for {historicalExchanges}")
-        elif setting_line[0] == "historyEnd":
-            try:
-                historyEnd = int((setting_line[1]).replace("\n", ""))
-            except TypeError:
-                print("invalid value in historyEnd. Must be millisecond timestamp (integer)")
-                log.error("invalid value in historyEnd. Must be millisecond timestamp (integer)")
-        else:
-            return
-    #print statement for debugging
-    len_exchanges = len(exchanges)
-    if len_exchanges > exchange_limit:
-        print(f"{len_exchanges} exchanges detected. Using performance mode (multithreading)")
-        log.info(f"{len_exchanges} exchanges detected. Using performance mode (multithreading)")
-        performance_mode = True
-
-    print(f" Settings read:\n keepWeeks: {keepWeeks}\n exchanges: {exchanges}\n currencies: {currencies}\n interval: {interval}\n exchange_limit: {exchange_limit}\n averaging_time: {averaging_time}\n historicalExchanges: {historicalExchanges}\n historyEnd: {historyEnd}")
-    log.info(f" Settings read:\n keepWeeks: {keepWeeks}\n exchanges: {exchanges}\n currencies: {currencies}\n interval: {interval}\n exchange_limit: {exchange_limit}\n averaging_time: {averaging_time}\n historicalExchanges: {historicalExchanges}\n historyEnd: {historyEnd}")
-
-# Check for empty tables in the database
-def poke_db(exchanges):
-    global score
-    db_n = sqlite3.connect(p)
-    empties = 0
-    for e in exchanges:
-        statement = f"SELECT * FROM {e} ORDER BY timestamp DESC LIMIT 1;"
-        c = db_n.execute(statement)
-        db_n.commit()
-        res = c.fetchone()
-        if res == None:
-            print(f"{e} table is empty!")
-            log.info(f"{e} table is empty!")
-    score = (empties / len(exchanges))*100
-    print(f"{score}% of tables are empty")
-    return score
-
-# Find gaps in an exchanges database back to historyEnd and create a list of those gaps as tuples
-def find_gaps(exchange, currency):
-    global historyEnd
-    db_n = sqlite3.connect(p)
-    currency = currency.upper()
-    ticker = f"BTC-{currency}"
-    statement = f"SELECT timestamp FROM {exchange} LIMIT 1;"
-    c = db_n.execute(statement)
-    res = c.fetchone()
-    if res != None and is_ms(int(res[0])):
-        statement = f"SELECT timestamp,datetime FROM {exchange} WHERE pair = '{ticker}' AND timestamp > {historyEnd} ORDER BY timestamp;"
-    else:
-        statement = f"SELECT timestamp, datetime FROM {exchange} WHERE pair = '{ticker}' AND timestamp > {historyEnd / 1e3} ORDER BY timestamp;"
-    c = db_n.execute(statement)
-    res = c.fetchall()
-    report = {}
-    # later in time is higer ids
-    i = 0
-    key = 0
-    stop = len(res)
-    #make the time gap a configurable param
-    while i < stop-1:
-        if res[i+1][0] > res[i][0]+1000000:
-            #report.append((res[i], res[i+1]))
-            report[key] = f"{res[i][0]}-{res[i+1][0]}"
-            key +=1
-        i += 1
-    return report
-
-# Fill gaps in a table via request_history
-def backfill(report, exchange, currency):
-    for key in report:
-        print(f"filling gap {key}")
-        rang = report[key].split("-")
-        start = int(rang[0])
-        end = int(rang[1])
-        request_history(exchange, currency, start, end)
-
-# This method is called at the first run.
-# It sets up the required tables inside of a local sqlite3 database. There is one table for each exchange.
-# Tables are only created if they do not already exist. Install will attempt to create tables for every listed exchange at once when called.
-def install():
-    read_config()
-    #create the sqlite db
-    len_exchanges = len(exchanges)
-    print(f"creating tables for {len_exchanges} exchanges if they do not exist already.")
-    log.info(f"creating tables for {len_exchanges} exchanges if they do not exist already.")
-    for exchange in exchanges:
-        sql = f"CREATE TABLE IF NOT EXISTS {exchange} (id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp INTEGER, datetime TEXT, pair TEXT, open REAL, high REAL, low REAL, close REAL, volume REAL)"
-        print(f"created table for {exchange}")
-        log.info(f"created table for {exchange}")
-        db.execute(sql)
-        db.commit()
-    db.close()
-
-# Remove every entry older than now-keepWeeks from all tables in the database
-# if there is nothing to prune then nothing will be pruned.
-def prune(keepWeeks):
-    # prune checks will run continuously and check every 60k seconds right now.
-    db_n = sqlite3.connect(p, timeout=10)
-    while True:
-        for exchange in exchanges:
-            #count = ((db.execute("SELECT Count(*) FROM {}".format(exchange))).fetchone())[0]
-            if exchange not in historicalExchanges:
-                check = f"SELECT MAX(timestamp) FROM {exchange};"
-                cursor = db_n.execute(check)
-                check_ts = cursor.fetchone()
-                statement = ""
-                if check_ts[0] is not None:
-                    try:
-                        if is_ms(int(check_ts[0])):
-                            cutoff = (datetime.now()-timedelta(weeks=keepWeeks)).timestamp()*1000
-                            statement = f"DELETE FROM {exchange} WHERE timestamp < {cutoff};"
-                        else:
-                            cutoff = (datetime.now()-timedelta(weeks=keepWeeks)).timestamp()
-                            statement = f"DELETE FROM {exchange} WHERE timestamp < {cutoff};"
-                        while True:
-                            try:
-                                db_n.execute(statement)
-                                break
-                            except sqlite3.OperationalError as op:
-                                log.error(f"{op}. Trying again in one hour...")
-                                print(f"{op}. Trying again in one hour...")
-                                time.sleep(3600)
-                        db_n.commit()
-                    except TypeError as te:
-                        log.error(f"too early to prune {te}")
-        time.sleep(60000)
-
-
-if __name__ == "__main__":
-    install() #install will call read_config
-    chunk_size = optimize_chunks(cpuOffset=0)
-    threadResults = None
-    # spin up many threads if there is a lot of exchanges present in the config file
-    if performance_mode:
-        # request_fast will create and start the threads automatically
-        print("performance mode is ON")
-        log.info("performance mode is ON")
-        threadResults = request_fast(exchanges, interval, chunk_size)
-    else:
-        print("performance mode is OFF")
-        log.info("performance mode is OFF")
-        prices_thread = Thread(target=request_periodically, args=(exchanges,interval))
-        prices_thread.start()
-    request_history_periodically(historicalExchanges, currencies, historyEnd)
-    pruning_thread = Thread(target=prune, args=[keepWeeks])
-    pruning_thread.start()
-    app.run()
-    db.close()
-
diff --git a/spotbit.config b/spotbit.config
new file mode 100644
index 0000000..6e27afd
--- /dev/null
+++ b/spotbit.config
@@ -0,0 +1,12 @@
+#### Spotbit Configuration File ####
+[server]
+
+# debug = True
+
+# exchanges: Exchanges to use for data collection. If this is not set, then Spotbit will use all supported exchanges. There are more than 100 exchanges supported.
+exchanges = ["liquid", "gemini", "phemex", "bitstamp", "ftx", "binance"]
+
+# currencies: Currencies to collect data for in the Spotbit database. If a currency is supported by an exchange, then data can be collected for it. Currencies should be lowercase three letter currency codes, e.g. usd gbp cny jpy eur. If no currencies are listed then Spotbit will collect for usd only.
+currencies = ["usd", "gbp", "jpy", "usdt", "eur"]
+
+
diff --git a/spotbit_example.config b/spotbit_example.config
deleted file mode 100644
index 1c971c2..0000000
--- a/spotbit_example.config
+++ /dev/null
@@ -1,19 +0,0 @@
-#### Spotbit Configuration File ####
-# There are currently 8 allowed fields in this file:
-# - keepWeeks: The number of allowed rows in the database (default 100,000 rows)
-# - exchanges: Exchanges to collect data for. If there is no option here, then Spotbit will use a demo subset of exchanges in memory. There are more than 100 exchanges supported, but Spotbit will not gather information for all these by default. Exchanges should be separated by spaces.
-# - currencies: Currencies to collect data for. If a currency is supported by an exchange, then data can be collected for it. Currencies should be lowercase three letter currency codes, eg usd gpy cny jpy eur. If no currencies are listed then Spotbit will collect for usd only.
-# - interval: the amount of time in seconds to wait between http GET requests. Should be at least 2 or 3 seconds to avoid ratelimits, especially when not many exchanges are being queried.
-# - exchange_limit: the cutoff point when performance mode is turned on. Under this number of exchanges in the exchanges field, they will all run queries inside of one thread. Above the limit, spotbit will split them into evenly sized groups based on the number of cpu cores available to decrease lag time.
-# - averaging_time: the number of hours to consider "current" when calculating an average of all exchange prices in the /now API route
-# - historicalExchanges: exchanges that you want historical, and not just current, data for.
-# - historyEnd: the earliest point to start collecting historical data from. Must be a millisecond timestamp
-# Lines with '#' are ignored by Spotbit.
-# Lines that do not represent a valid configuration option will be ignored.
-
-keepWeeks=50
-exchanges=gemini bitstamp okcoin coinsbit coinbasepro coinsbank kraken cexio bitfinex indoex bitflyer liquid coincheck bitbank zaif hitbtc binance okex gateio bitmax
-currencies=usd gbp jpy usdt eur
-interval=5
-exchange_limit=24
-averaging_time=1
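For reference, the new `spotbit.config` added above happens to be readable as TOML. The sketch below is an illustration only, not the loader the server actually uses: it reads the `exchanges` and `currencies` fields with Python 3.11's standard-library `tomllib` and applies the documented default of collecting usd only when no currencies are listed. The file name and the `server` table come from the config shown above; everything else is assumed.

```
# Illustrative sketch only: read spotbit.config as TOML (Python 3.11+).
# The server's own settings loader may behave differently.
import tomllib

with open("spotbit.config", "rb") as f:
    config = tomllib.load(f)

server = config.get("server", {})

# exchanges: list of lowercase exchange names, e.g. ["liquid", "gemini", ...]
exchanges = server.get("exchanges", [])

# currencies: lowercase currency codes; fall back to usd only when none are listed
currencies = [c.upper() for c in server.get("currencies", ["usd"])]

print(f"exchanges: {exchanges}")
print(f"currencies: {currencies}")
```

Run against the config above, this prints the six configured exchanges and USD, GBP, JPY, USDT and EUR, in contrast to the old space-separated `key=value` format that the deleted `spotbit_example.config` documented.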