Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize getting Price data #16

Open
wants to merge 60 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
cd385df
added working price preloading using ccxt
scientes Mar 5, 2021
34f0c66
reformatting and documentation
scientes Mar 5, 2021
777c441
added type hints
scientes Mar 5, 2021
a1b9f8e
added walrus operator
scientes Mar 31, 2021
5089a9e
fixes smaller issues
scientes Mar 31, 2021
1ac3e77
change warning to error
scientes Mar 31, 2021
ed50892
PoC for a grpah based solution
scientes Apr 4, 2021
657666b
working graph implementation
scientes Apr 8, 2021
74935e1
better batching implementation
scientes Apr 8, 2021
48c24c8
revert accidental change
scientes Apr 8, 2021
50378a6
formatting
scientes Apr 8, 2021
e1f4581
formatting
scientes Apr 8, 2021
95deecc
fixed some mypy bugs
scientes Apr 8, 2021
7976ee6
black formatting
provinzio Apr 8, 2021
a0c8df5
fix mypy
scientes Apr 8, 2021
08f3402
flake compatible formatting (except E501)
scientes Apr 8, 2021
23f38b6
documentation
scientes Apr 8, 2021
3939e15
small formatting
scientes Apr 8, 2021
65cbfbf
RM `# type: ignore`
provinzio Apr 8, 2021
888dba8
Use logging instead of print
provinzio Apr 8, 2021
5d4f394
Order requirements-dev alphabetically
provinzio Apr 8, 2021
be5ab79
ADD all required modules explicitly
provinzio Apr 8, 2021
8928d6d
Use explicit import
provinzio Apr 8, 2021
b2f2074
FIX remove false *1000
provinzio Apr 8, 2021
700930a
REFACTOR PriceData.get_candles
provinzio Apr 9, 2021
3c036bb
ADD flake8-bugbear to show additional warnings
provinzio Apr 9, 2021
644c2a9
Adjust bug message, ccxt raises error when ohlc limit is exceeded
provinzio Apr 10, 2021
d28babd
REFACTOR PriceData.`preload_price_data_path`
provinzio Apr 10, 2021
2d31fc1
FIX `force_decimal` should raise ValueError
provinzio Apr 10, 2021
6acd035
ADD `get_avg_candle_prices`
provinzio Apr 10, 2021
6dcb6ce
UPDATE `get_candles` docstring
provinzio Apr 10, 2021
591c749
FIX ignore missing import of `ccxt` module
provinzio Apr 10, 2021
5226447
FIX mypy/flake8 errors and some refactoring graph
provinzio Apr 10, 2021
35536ae
ADD make venv and some comments in makefile
provinzio Apr 11, 2021
2e9b866
REFACTOR Getting time batches from operations...
provinzio Apr 11, 2021
842f731
ADD TODO: preferredexchange default only for debug purposes
provinzio Apr 11, 2021
cac907c
refractored path sorting
scientes Apr 27, 2021
dd8fb3e
better ratelimiting and exchanges are set via config
scientes Apr 27, 2021
98c049f
change from list to set
scientes May 2, 2021
0808511
fixed a bug which caused misses when looking up price_data
scientes May 26, 2021
6bda214
fix ratelimit for kraken
scientes Jun 15, 2021
c41b18e
Merge branch 'main' into ohlcv-batch
scientes Aug 15, 2021
908043e
Update requirements.txt
scientes Aug 15, 2021
25dd649
Merge branch 'main' into ohlcv-batch
scientes Sep 5, 2021
f0a52ca
fix formatting
scientes Sep 8, 2021
c56ba99
fix formatting
scientes Sep 8, 2021
08b7f41
Merge branch 'main' into ohlcv-batch
provinzio Nov 28, 2021
ada4859
UPDATE Use types for type hinting
provinzio Nov 28, 2021
ee6fdcf
warning if exchange for CSV export is not in found in CCTX list
Griffsano Dec 26, 2021
1e48809
Merge pull request #2 from Griffsano/ohlcv-batch
scientes Dec 26, 2021
f5ef07c
Merge branch 'main' into ohlcv-batch
scientes Dec 26, 2021
bbd8bb8
formatting
scientes Dec 26, 2021
8f24604
added progress counter and sorted operations before fetching prices
scientes Dec 31, 2021
84f7e86
kraken ignore and warning and formatting
scientes Dec 31, 2021
6fc0948
formatting
scientes Dec 31, 2021
11d5849
Ohlcv update (#4)
Griffsano Jan 10, 2022
080f475
unpin ccxt
scientes Jan 28, 2022
1f8f77e
Merge remote-tracking branch 'origin/main' into ohlcv-batch
provinzio Feb 6, 2022
1cb8a45
Merge remote-tracking branch 'origin/main' into ohlcv-batch
provinzio Feb 6, 2022
4f2dfe7
UPDATE warning when ccxt mapping is missing for exchange
provinzio Feb 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ flake8==3.8.4
flake8-bugbear==21.4.3
isort==5.7.0
mccabe==0.6.1
mypy-extensions==0.4.3
mypy==0.812
mypy-extensions==0.4.3
pathspec==0.8.1
pycodestyle==2.6.0
pyflakes==2.2.0
Expand Down
11 changes: 11 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
aiodns==2.0.0
aiohttp==3.7.4.post0
async-timeout==3.0.1
attrs==20.3.0
ccxt>=1.42.7
certifi==2020.12.5
cffi==1.14.5
chardet==4.0.0
cryptography==3.4.7
idna==2.10
multidict==5.1.0
pycares==3.1.1
pycparser==2.20
python-dateutil==2.8.1
requests==2.25.1
six==1.15.0
yarl==1.1.0
urllib3==1.26.5
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ warn_return_any = True
show_error_codes = True
warn_unused_configs = True

[mypy-ccxt.*]
ignore_missing_imports = True

[flake8]
exclude = *py*env*/
max_line_length = 88
Expand Down
18 changes: 18 additions & 0 deletions src/book.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,24 @@ def read_file(self, file_path: Path) -> None:

log.info("Reading file from exchange %s at %s", exchange, file_path)
read_file(file_path)

ccxt_mapping = {
"binance": "binance",
"binance_v2": "binance",
"coinbase": "coinbasepro",
"coinbase_pro": "coinbasepro",
"kraken_ledgers_old": "kraken",
"kraken_ledgers": "kraken",
"kraken_trades": "kraken",
"bitpanda_pro_trades": "bitpanda",
}
api = ccxt_mapping.get(exchange)

if api not in config.EXCHANGES:
log.warning(
f"Exchange `{api}` not found in EXCHANGES API list in config.ini. "
"Consider adding it to obtain more accurate price data."
)
else:
log.warning(
f"Unable to detect the exchange of file `{file_path}`. "
Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ def IS_LONG_TERM(buy: datetime, sell: datetime) -> bool:
DATA_PATH = Path(BASE_PATH, "data")
EXPORT_PATH = Path(BASE_PATH, "export")
FIAT = FIAT_CLASS.name # Convert to string.
EXCHANGES = ["binance", "coinbasepro"]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment for the EXCHANGES variable. Whats the benefit of the variable? Why do we need it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OHLCV feature is not supported for all exchanges (e.g. Kraken, it works without error, but the returned data is inaccurate and not useable). So I see two options: Define all supported (and tested) exchanges in a list, or define exchanges that should not be used for this feature. I would go with the first option (which is how it is implemented now).

318 changes: 318 additions & 0 deletions src/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
import collections
import time
from typing import Optional

import ccxt

import config
import log_config

log = log_config.getLogger(__name__)


class RateLimit:
exchangedict: dict[str, int] = {}

def limit(self, exchange):
if lastcall := self.exchangedict.get(exchange.id):
now = time.time()
delay = exchange.rateLimit / 1000
if exchange.name == "Kraken":
delay += 2 # the reported ratelimit gets exceeded sometimes
timepassed = now - lastcall
if (waitfor := delay - timepassed) > 0:
time.sleep(waitfor + 0.5)
self.exchangedict[exchange.id] = time.time()
else:
self.exchangedict[exchange.id] = time.time()


class PricePath:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. What do you think about factoring the graph algorithm out? I think that it would be nice to seperate them since the graph algorithm alone does a huge job and could be encapsulded as an own class.
  2. We should use namedtuples and dataclasses or some other kind of types to make the code better readable and easier to work with. We could do something like I did with the Operations and add a class for each major type of variable like Edges, Vertices, etc. Especially to avoid something like this c[1][1]["avg_vol"] or that path[i][1]["stoptime"] which I find incredibly hard to read and get through

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah for readability this would be a major improvement, but i don't know how named tuples and such impact performance, because the graph algo is what currently needs the majority of computing time.

What we should try is to save the timerange and volume data directly in the graph itself and not how i do it currently (using self.cache), using this the algorithm could be far more efficient, due to the ability of being able to filter the pairs while walking the paths, reducing the iterations significantly when timerangedata is present.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fine for us when we switch to dataclasses. My intuitive opinion on this is that we should prefer readability over performance.
https://stackoverflow.com/a/55256047

We might want to consider the usage of __slots__. This could be interesting for that https://stackoverflow.com/questions/50180735/how-can-dataclasses-be-made-to-work-better-with-slots.

def __init__(
self,
exchanges: Optional[list[str]] = None,
gdict: Optional[dict] = None,
cache: Optional[dict] = None,
):
if exchanges is None:
exchanges = list(config.EXCHANGES)
if gdict is None:
gdict = {}
if cache is None:
cache = {}

self.gdict = gdict
self.cache = cache
self.RateLimit = RateLimit()

# Saves the priority for a certain path so that bad paths can be skipped.
self.priority: collections.defaultdict[str, int] = collections.defaultdict(int)
allpairs: set[tuple[str, str, str, str]] = set()

for exchange_id in exchanges:
exchange_class = getattr(ccxt, exchange_id)
exchange = exchange_class()
markets = exchange.fetch_markets()
assert isinstance(markets, list)
if exchange_id == "kraken":
log.warning(
"""Kraken is currently not supported due to only supporting
the last 720 candles of historic data"""
)
continue
if exchange.has["fetchOHLCV"]:
toadd = [
(i["base"], i["quote"], exchange_id, i["symbol"]) for i in markets
]
for pair in toadd:
allpairs.add(pair)
else:
log.warning(
f"{exchange.name} does not support fetch ohlcv. "
f"Ignoring exchange and {len(markets)} pairs."
)

# Remove duplicate pairs.
# TODO It might be faster to create it directly as set.
# Is it even necessary to convert it to a list?
# allpairs = list(set(allpairs))
allpairslist: list[tuple[str, str, str, str]] = list(allpairs)
del allpairs
# print("Total Pairs to check:", len(allpairs))

# Sorting by `symbol` to have the same result on every run due to the set.
allpairslist.sort(key=lambda x: x[3])

for base, quote, exchange, symbol in allpairslist:
self.add_Vertex(base)
self.add_Vertex(quote)
self.add_Edge(
base, quote, {"exchange": exchange, "symbol": symbol, "inverted": False}
)
self.add_Edge(
quote, base, {"exchange": exchange, "symbol": symbol, "inverted": True}
)

def edges(self):
return self.find_edges()

# Find the distinct list of edges

def find_edges(self):
edgename = []
for vrtx in self.gdict:
for nxtvrtx in self.gdict[vrtx]:
if {nxtvrtx, vrtx} not in edgename:
edgename.append({vrtx, nxtvrtx})
return edgename

def get_Vertices(self):
return list(self.gdict.keys())

# Add the vertex as a key
def add_Vertex(self, vrtx):
if vrtx not in self.gdict:
self.gdict[vrtx] = []

def add_Edge(self, vrtx1, vrtx2, data):
if vrtx1 in self.gdict:
self.gdict[vrtx1].append((vrtx2, data))
else:
self.gdict[vrtx1] = [
(vrtx2, data),
]

def _get_path(self, start, stop, maxdepth, depth=0):
"""
a recursive function for finding all possible paths between to vertices
"""
paths = []
if (edges := self.gdict.get(start)) and maxdepth > depth:
for edge in edges: # list of edges starting from the start vertice
if depth == 0 and edge[0] == stop:
paths.append([edge])
elif edge[0] == stop:
paths.append(edge)
else:
path = self._get_path(edge[0], stop, maxdepth, depth=depth + 1)
if len(path) and path is not None:
for p in path:
if p[0] == stop:
newpath = [edge]
newpath.append(p)
paths.append(newpath)
return paths

def change_prio(self, key, value):
ke = "-".join(key)
self.priority[ke] += value

def get_path(
self, start, stop, starttime=0, stoptime=0, preferredexchange=None, maxdepth=3
):
def comb_sort_key(path):
"""
Sorting function which is used to prioritize paths by:
(in order of magnitude)
- smallest length -> +1 per element
- preferred exchange -> +1 per exchange which is not preferred
- priority -> +0.5 per unfinished execution of path
- volume (if known) -> 1/sum(avg_vol per pair)
- volume (if not known) -> 1 -> always smaller if volume is known
"""
# prioritze pairs with the preferred exchange
volume = 1
volumenew = 1
priority = self.priority.get("-".join([a[1]["symbol"] for a in path]), 0)
pathlis = (a if (a := check_cache(pair)) else None for pair in path)
for possiblepath in pathlis:
if possiblepath and possiblepath[0]:
if possiblepath[1][1]["stoptime"] == 0:
break
elif possiblepath[1][1]["avg_vol"] != 0:
# is very much off because volume is not in the same
# currency something for later
# volumenew*= volume of next thing in path
# (needs to be fixed for inverted paths)
volumenew *= possiblepath[1][1]["avg_vol"]

else:
break
else:
volume = 1 / volumenew
temppriority = volume + priority

if preferredexchange:

return (
len(path)
+ sum(
[
0 if pair[1]["exchange"] == preferredexchange else 1
for pair in path
]
)
+ temppriority
)
else:
return len(path) + temppriority

def check_cache(pair):
"""
checking if the start and stoptime of a pair is already known
or if it needs to be downloaded
"""
if pair[1].get("starttime") or pair[1].get("stoptime"):
return True, pair
if cacheres := self.cache.get(pair[1]["exchange"] + pair[1]["symbol"]):
pair[1]["starttime"] = cacheres[0]
pair[1]["stoptime"] = cacheres[1]
pair[1]["avg_vol"] = cacheres[2]
return True, pair
return False, pair

def get_active_timeframe(path, starttimestamp=0, stoptimestamp=-1):
rangeinms = 0
timeframe = int(6.048e8) # week in ms
if starttimestamp == 0:
starttimestamp = 1325372400 * 1000
if stoptimestamp == -1:
stoptimestamp = time.time_ns() // 1_000_000 # get cur time in ms
starttimestamp -= timeframe # to handle edge cases
if stoptimestamp > starttimestamp:
rangeinms = stoptimestamp - starttimestamp
else:
rangeinms = 0 # maybe throw error

# add one candle to the end to ensure the needed
# timeslot is in the requested candles
rangeincandles = int(rangeinms / timeframe) + 1

# todo: cache already used pairs
globalstarttime = 0
globalstoptime = 0
for i in range(len(path)):
cached, path[i] = check_cache(path[i])
if not cached:
exchange_class = getattr(ccxt, path[i][1]["exchange"])
exchange = exchange_class()

self.RateLimit.limit(exchange)
timeframeexchange = exchange.timeframes.get("1w")
if (
timeframeexchange
): # this must be handled better maybe choose timeframe dynamically
# maybe cache this per pair
ohlcv = exchange.fetch_ohlcv(
path[i][1]["symbol"], "1w", starttimestamp, rangeincandles
)
else:
ohlcv = [] # do not check fail later
if len(ohlcv) > 1:
# (candle ends after the date + timeframe)
path[i][1]["stoptime"] = ohlcv[-1][0] + timeframe
path[i][1]["avg_vol"] = sum([vol[-1] for vol in ohlcv]) / len(
ohlcv
) # avg vol in curr
path[i][1]["starttime"] = ohlcv[0][0]
if (
path[i][1]["stoptime"] < globalstoptime
or globalstoptime == 0
):
globalstoptime = path[i][1]["stoptime"]
if path[i][1]["starttime"] > globalstarttime:
globalstarttime = path[i][1]["starttime"]
else:
path[i][1]["stoptime"] = 0
path[i][1]["starttime"] = 0
path[i][1]["avg_vol"] = 0
self.cache[path[i][1]["exchange"] + path[i][1]["symbol"]] = (
path[i][1]["starttime"],
path[i][1]["stoptime"],
path[i][1]["avg_vol"],
)
else:

if (
path[i][1]["stoptime"] < globalstoptime or globalstoptime == 0
) and path[i][1]["stoptime"] != 0:
globalstoptime = path[i][1]["stoptime"]
if path[i][1]["starttime"] > globalstarttime:
globalstarttime = path[i][1]["starttime"]
ohlcv = []
return (globalstarttime, globalstoptime), path

# get all possible paths which are no longer than 4 pairs long
paths = self._get_path(start, stop, maxdepth)
# sort by path length to get minimal conversion chain to reduce error
paths = sorted(paths, key=comb_sort_key)
# get timeframe in which a path is viable
for path in paths:
timest, newpath = get_active_timeframe(path)
# this is implemented as a generator (hence the yield) to reduce
# the amount of computing needed. if the first path fails the next is used
if starttime == 0 and stoptime == 0:
yield timest, newpath
elif starttime == 0:
if stoptime < timest[1]:
yield timest, newpath
elif stoptime == 0:
if starttime > timest[0]:
yield timest, newpath
# The most ideal situation is if the timerange of the path is known
# and larger than the needed timerange
else:
if stoptime < timest[1] and starttime > timest[0]:
yield timest, newpath


if __name__ == "__main__":
g = PricePath(exchanges=["binance", "coinbasepro"])
start = "IOTA"
to = "EUR"
preferredexchange = "binance"
path = g.get_path(start, to, maxdepth=2, preferredexchange=preferredexchange)
# debug only in actual use we would iterate over
# the path object fetching new paths as needed
path = list(path)
print(len(path))
1 change: 1 addition & 0 deletions src/log_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@

# Disable urllib debug messages
getLogger("urllib3").propagate = False
getLogger("ccxt").propagate = False
Loading