Towards #462: Separate lake and aimodel SS, lake command (#473)
* Split aimodel and lake ss.
* Split data ss tests.
* Add aimodel ss into predictoor ss (see the config sketch below).
* Remove stray data_ss.
* Move test_n to sim ss.
* Make trader ss use its own feed instead of data pp.
* Remove data pp entirely.
* Correct ohlcv data factory.
* Add timeframe to arg feeds.
* Refine and add tests for timeframe in arg feed.
* Remove timeframe dependency in trader and predictoor.
* Remove timeframe from lake ss keys.
* Singleify trader agents.
* Add lake command; assert timeframe in lake (needed for columns).
* Process all signals in lake.
calina-c authored Dec 29, 2023
1 parent 1dec231 commit bd01d71
Showing 48 changed files with 822 additions and 811 deletions.
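For orientation, a minimal sketch of the new settings layout, pieced together from the tests in this commit (the keys are exactly those used in test_create_xy__0 below; anything beyond them may differ in the real PPSS schema): aimodel settings now nest inside predictoor ss, the timeframe travels inside the arg-feed string, and AimodelDataFactory takes a single PredictoorSS instead of the old (DataPP, DataSS) pair.

from pdr_backend.aimodel.aimodel_data_factory import AimodelDataFactory
from pdr_backend.ppss.predictoor_ss import PredictoorSS

# The feed string now carries its own timeframe ("... c 5m"),
# and aimodel_ss is a nested sub-dict of predictoor ss.
predictoor_ss = PredictoorSS(
    {
        "predict_feed": "binanceus ETH/USDT c 5m",
        "bot_only": {"s_until_epoch_end": 60, "stake_amount": 1},
        "aimodel_ss": {
            "input_feeds": ["binanceus ETH/USDT oc"],
            "approach": "LIN",
            "max_n_train": 4,
            "autoregressive_n": 2,
        },
    }
)

# The factory is now built from PredictoorSS alone (previously DataPP + DataSS).
factory = AimodelDataFactory(predictoor_ss)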
12 changes: 5 additions & 7 deletions pdr_backend/aimodel/aimodel_data_factory.py
@@ -6,8 +6,7 @@
import polars as pl
from enforce_typing import enforce_types

from pdr_backend.ppss.data_pp import DataPP
from pdr_backend.ppss.data_ss import DataSS
from pdr_backend.ppss.predictoor_ss import PredictoorSS
from pdr_backend.util.mathutil import fill_nans, has_nan


@@ -38,8 +37,7 @@ class AimodelDataFactory:
- "timestamp" values are ut: int is unix time, UTC, in ms (not s)
"""

def __init__(self, pp: DataPP, ss: DataSS):
self.pp = pp
def __init__(self, ss: PredictoorSS):
self.ss = ss

def create_xy(
@@ -73,7 +71,7 @@ def create_xy(
# condition inputs
if do_fill_nans and has_nan(mergedohlcv_df):
mergedohlcv_df = fill_nans(mergedohlcv_df)
ss = self.ss
ss = self.ss.aimodel_ss

# main work
x_df = pd.DataFrame() # build this up
@@ -107,8 +105,8 @@ def create_xy(

# y is set from yval_{exch_str, signal_str, pair_str}
# eg y = [BinEthC_-1, BinEthC_-2, ..., BinEthC_-450, BinEthC_-451]
pp = self.pp
hist_col = f"{pp.exchange_str}:{pp.pair_str}:{pp.signal_str}"
ref_ss = self.ss
hist_col = f"{ref_ss.exchange_str}:{ref_ss.pair_str}:{ref_ss.signal_str}"
z = mergedohlcv_df[hist_col].to_list()
y = np.array(_slice(z, -testshift - N_train - 1, -testshift))

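As a quick reference, with the predict feed used in the tests below ("binanceus ETH/USDT h 5m"), the y-column read above would resolve to roughly the following. This is a sketch inferred from the column names in the test file; the mapping from the arg-feed string to exchange_str/pair_str/signal_str is assumed, not verified against the implementation.

# "binanceus ETH/USDT h 5m" -> exchange_str="binanceus",
# pair_str="ETH/USDT", signal_str="high"  (assumed mapping)
hist_col = "binanceus:ETH/USDT:high"  # column looked up in mergedohlcv_df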
9 changes: 8 additions & 1 deletion pdr_backend/aimodel/test/conftest.py
@@ -8,5 +8,12 @@
@enforce_types
@pytest.fixture()
def aimodel_factory():
aimodel_ss = AimodelSS({"approach": "LIN"})
aimodel_ss = AimodelSS(
{
"approach": "LIN",
"max_n_train": 7,
"autoregressive_n": 3,
"input_feeds": ["binance BTC/USDT c"],
}
)
return AimodelFactory(aimodel_ss)
79 changes: 38 additions & 41 deletions pdr_backend/aimodel/test/test_aimodel_data_factory.py
@@ -12,34 +12,30 @@
ETHUSDT_RAWOHLCV_DFS,
KRAKEN_BTC_DATA,
KRAKEN_ETH_DATA,
_data_pp,
_data_pp_ss_1feed,
_data_ss,
_predictoor_ss,
_predictoor_ss_1feed,
_df_from_raw_data,
_mergedohlcv_df_ETHUSDT,
)
from pdr_backend.ppss.data_pp import DataPP
from pdr_backend.ppss.data_ss import DataSS
from pdr_backend.ppss.aimodel_ss import AimodelSS
from pdr_backend.ppss.predictoor_ss import PredictoorSS
from pdr_backend.util.mathutil import fill_nans, has_nan


@enforce_types
def test_create_xy__0(tmpdir):
data_pp = DataPP(
{
"timeframe": "5m",
"predict_feeds": ["binanceus ETH/USDT c"],
"sim_only": {"test_n": 2},
}
)
data_ss = DataSS(
def test_create_xy__0():
predictoor_ss = PredictoorSS(
{
"input_feeds": ["binanceus ETH/USDT oc"],
"parquet_dir": str(tmpdir),
"st_timestr": "2023-06-18", # not used by AimodelDataFactory
"fin_timestr": "2023-06-21", # ""
"max_n_train": 4,
"autoregressive_n": 2,
"predict_feed": "binanceus ETH/USDT c 5m",
"bot_only": {
"s_until_epoch_end": 60,
"stake_amount": 1,
},
"aimodel_ss": {
"input_feeds": ["binanceus ETH/USDT oc"],
"approach": "LIN",
"max_n_train": 4,
"autoregressive_n": 2,
},
}
)
mergedohlcv_df = pl.DataFrame(
@@ -71,18 +67,20 @@ def test_create_xy__0(tmpdir):
}
)

factory = AimodelDataFactory(data_pp, data_ss)
factory = AimodelDataFactory(predictoor_ss)
X, y, x_df = factory.create_xy(mergedohlcv_df, testshift=0)

_assert_pd_df_shape(data_ss, X, y, x_df)
_assert_pd_df_shape(predictoor_ss.aimodel_ss, X, y, x_df)
assert np.array_equal(X, target_X)
assert np.array_equal(y, target_y)
assert x_df.equals(target_x_df)


@enforce_types
def test_create_xy__1exchange_1coin_1signal(tmpdir):
_, ss, _, aimodel_data_factory = _data_pp_ss_1feed(tmpdir, "binanceus ETH/USDT h")
ss, _, aimodel_data_factory = _predictoor_ss_1feed(
tmpdir, "binanceus ETH/USDT h 5m"
)
mergedohlcv_df = merge_rawohlcv_dfs(ETHUSDT_RAWOHLCV_DFS)

# =========== have testshift = 0
@@ -121,7 +119,7 @@ def test_create_xy__1exchange_1coin_1signal(tmpdir):

X, y, x_df = aimodel_data_factory.create_xy(mergedohlcv_df, testshift=0)

_assert_pd_df_shape(ss, X, y, x_df)
_assert_pd_df_shape(ss.aimodel_ss, X, y, x_df)
assert np.array_equal(X, target_X)
assert np.array_equal(y, target_y)
assert x_df.equals(target_x_df)
@@ -161,7 +159,7 @@ def test_create_xy__1exchange_1coin_1signal(tmpdir):

X, y, x_df = aimodel_data_factory.create_xy(mergedohlcv_df, testshift=1)

_assert_pd_df_shape(ss, X, y, x_df)
_assert_pd_df_shape(ss.aimodel_ss, X, y, x_df)
assert np.array_equal(X, target_X)
assert np.array_equal(y, target_y)
assert x_df.equals(target_x_df)
@@ -186,21 +184,19 @@ def test_create_xy__1exchange_1coin_1signal(tmpdir):
}
)

assert "max_n_train" in ss.d
ss.d["max_n_train"] = 5
assert "max_n_train" in ss.aimodel_ss.d
ss.aimodel_ss.d["max_n_train"] = 5

X, y, x_df = aimodel_data_factory.create_xy(mergedohlcv_df, testshift=0)

_assert_pd_df_shape(ss, X, y, x_df)
_assert_pd_df_shape(ss.aimodel_ss, X, y, x_df)
assert np.array_equal(X, target_X)
assert np.array_equal(y, target_y)
assert x_df.equals(target_x_df)


@enforce_types
def test_create_xy__2exchanges_2coins_2signals(tmpdir):
parquet_dir = str(tmpdir)

def test_create_xy__2exchanges_2coins_2signals():
rawohlcv_dfs = {
"binanceus": {
"BTC/USDT": _df_from_raw_data(BINANCE_BTC_DATA),
@@ -212,20 +208,19 @@ def test_create_xy__2exchanges_2coins_2signals(tmpdir):
},
}

pp = _data_pp(["binanceus ETH/USDT h"])
ss = _data_ss(
parquet_dir,
ss = _predictoor_ss(
"binanceus ETH/USDT h 5m",
["binanceus BTC/USDT,ETH/USDT hl", "kraken BTC/USDT,ETH/USDT hl"],
)
assert ss.autoregressive_n == 3
assert ss.n == (4 + 4) * 3
assert ss.aimodel_ss.autoregressive_n == 3
assert ss.aimodel_ss.n == (4 + 4) * 3

mergedohlcv_df = merge_rawohlcv_dfs(rawohlcv_dfs)

aimodel_data_factory = AimodelDataFactory(pp, ss)
aimodel_data_factory = AimodelDataFactory(ss)
X, y, x_df = aimodel_data_factory.create_xy(mergedohlcv_df, testshift=0)

_assert_pd_df_shape(ss, X, y, x_df)
_assert_pd_df_shape(ss.aimodel_ss, X, y, x_df)
found_cols = x_df.columns.tolist()
target_cols = [
"binanceus:BTC/USDT:high:t-4",
@@ -319,7 +314,7 @@ def test_create_xy__input_type(tmpdir):
@enforce_types
def test_create_xy__handle_nan(tmpdir):
# create mergedohlcv_df
_, _, _, aimodel_data_factory = _data_pp_ss_1feed(tmpdir, "binanceus ETH/USDT h")
_, _, aimodel_data_factory = _predictoor_ss_1feed(tmpdir, "binanceus ETH/USDT h 5m")
mergedohlcv_df = merge_rawohlcv_dfs(ETHUSDT_RAWOHLCV_DFS)

# initial mergedohlcv_df should be ok
@@ -365,7 +360,9 @@ def test_create_xy__handle_nan(tmpdir):


@enforce_types
def _assert_pd_df_shape(ss: DataSS, X: np.ndarray, y: np.ndarray, x_df: pd.DataFrame):
def _assert_pd_df_shape(
ss: AimodelSS, X: np.ndarray, y: np.ndarray, x_df: pd.DataFrame
):
assert X.shape[0] == y.shape[0]
assert X.shape[0] == (ss.max_n_train + 1) # 1 for test, rest for train
assert X.shape[1] == ss.n
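For the two-exchange test above, the shape assertions reduce to plain arithmetic on the values shown in this diff:

# binanceus BTC/USDT,ETH/USDT hl -> 2 pairs x 2 signals = 4 input columns
# kraken    BTC/USDT,ETH/USDT hl -> 4 more, for 8 in total
num_input_cols = 4 + 4
autoregressive_n = 3
n = num_input_cols * autoregressive_n  # 24, matching ss.aimodel_ss.n == (4 + 4) * 3
# X then has ss.max_n_train + 1 rows: max_n_train for training plus 1 test row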
9 changes: 8 additions & 1 deletion pdr_backend/aimodel/test/test_aimodel_factory.py
@@ -10,7 +10,7 @@
@enforce_types
def test_aimodel_factory_basic():
for approach in APPROACHES:
aimodel_ss = AimodelSS({"approach": approach})
aimodel_ss = AimodelSS(
{
"approach": approach,
"max_n_train": 7,
"autoregressive_n": 3,
"input_feeds": ["binance BTC/USDT c"],
}
)
factory = AimodelFactory(aimodel_ss)
assert isinstance(factory.aimodel_ss, AimodelSS)

6 changes: 3 additions & 3 deletions pdr_backend/analytics/get_traction_info.py
@@ -19,9 +19,9 @@
def get_traction_info_main(
ppss: PPSS, start_timestr: str, end_timestr: str, pq_dir: str
):
data_ss = ppss.data_ss
data_ss.d["st_timestr"] = start_timestr
data_ss.d["fin_timestr"] = end_timestr
lake_ss = ppss.lake_ss
lake_ss.d["st_timestr"] = start_timestr
lake_ss.d["fin_timestr"] = end_timestr

gql_data_factory = GQLDataFactory(ppss)
gql_dfs = gql_data_factory.get_gql_dfs()
2 changes: 1 addition & 1 deletion pdr_backend/analytics/test/test_check_network.py
@@ -97,7 +97,7 @@ def test_check_network_main( # pylint: disable=unused-argument
monkeypatch,
):
del_network_override(monkeypatch)
ppss = mock_ppss("5m", ["binance BTC/USDT c"], "sapphire-mainnet", str(tmpdir))
ppss = mock_ppss(["binance BTC/USDT c 5m"], "sapphire-mainnet", str(tmpdir))

mock_get_opf_addresses.return_value = {
"dfbuyer": "0xdfBuyerAddress",
2 changes: 1 addition & 1 deletion pdr_backend/analytics/test/test_get_predictions_info.py
@@ -15,7 +15,7 @@ def test_get_predictions_info_main_mainnet(
monkeypatch,
):
del_network_override(monkeypatch)
ppss = mock_ppss("5m", ["binance BTC/USDT c"], "sapphire-mainnet", str(tmpdir))
ppss = mock_ppss(["binance BTC/USDT c 5m"], "sapphire-mainnet", str(tmpdir))

mock_getids = Mock(return_value=["0x123", "0x234"])
mock_fetch = Mock(return_value=_sample_first_predictions)
2 changes: 1 addition & 1 deletion pdr_backend/analytics/test/test_get_predictoors_info.py
@@ -11,7 +11,7 @@
@enforce_types
def test_get_predictoors_info_main_mainnet(tmpdir, monkeypatch):
del_network_override(monkeypatch)
ppss = mock_ppss("5m", ["binance BTC/USDT c"], "sapphire-mainnet", str(tmpdir))
ppss = mock_ppss(["binance BTC/USDT c 5m"], "sapphire-mainnet", str(tmpdir))

mock_fetch = Mock(return_value=[])
mock_save = Mock()
2 changes: 1 addition & 1 deletion pdr_backend/analytics/test/test_get_traction_info.py
@@ -17,7 +17,7 @@ def test_get_traction_info_main_mainnet(
monkeypatch,
):
del_network_override(monkeypatch)
ppss = mock_ppss("5m", ["binance BTC/USDT c"], "sapphire-mainnet", str(tmpdir))
ppss = mock_ppss(["binance BTC/USDT c 5m"], "sapphire-mainnet", str(tmpdir))

mock_traction_stat = Mock()
mock_plot_cumsum = Mock()
