diff --git a/scripts/influx_metrics.py b/scripts/influx_metrics.py
index 9cbce24..f061ac6 100644
--- a/scripts/influx_metrics.py
+++ b/scripts/influx_metrics.py
@@ -80,8 +80,9 @@ def get_params() -> tp.Dict:
     '''
     return {
         "points": 30,
-        "window": 6,
+        "window": 3600,
         "period": 600,
+        "tolerance": 300,
         "alpha": [0.05, 0.01, 0.001, 0.0001],
         "n": [144, 1008, 2016, 4320],
     }
@@ -192,6 +193,7 @@ def get_price_cumulatives(
     print(f'Fetching prices for {qid} ...')
     query = f'''
         from(bucket:"{bucket}") |> range(start: -{points}d)
+            |> filter(fn: (r) => r["_measurement"] == "mem")
            |> filter(fn: (r) => r["id"] == "{qid}")
     '''
     df = query_api.query_data_frame(query=query, org=org)
@@ -224,86 +226,102 @@ def compute_amount_out(twap_112: np.ndarray, amount_in: int) -> np.ndarray:
         amount_in [int]: Unit value for the quote currency in pair
                          e.g. WETH in SushiSwap YFI/WETH uses
                          `amount_in = 1e18` (18 decimals)
-
     Outputs:
         [np.ndarray]:
-    '''
-    rshift = np.vectorize(lambda x: int(x * amount_in) >> PC_RESOLUTION)
-    return rshift(twap_112)
-
-def get_twap(pc: pd.DataFrame, q: tp.Dict, p: tp.Dict) -> pd.DataFrame:
+    Note:
+        Using `np.vectorize` results in an overflow error. Using
+        `np.vectorize` will be reconsidered when/if data size is too large.
+    '''
+    # rshift = np.vectorize(lambda x: int(x * amount_in) >> PC_RESOLUTION)
+    rshift = np.array(np.zeros(len(twap_112)))
+    for i in range(0, len(rshift)):
+        rshift[i] = int(twap_112[i] * amount_in) >> PC_RESOLUTION
+    return rshift
+
+
+def dynamic_window(
+        df: pd.DataFrame,
+        max_rows: int,
+        window: int
+        ) -> pd.DataFrame:
+    '''
+    Computes the window size in terms of rows such that there is as much data
+    as there are seconds specified in the `window` variable.
     '''
-    Calculates the rolling Time Weighted Average Price (TWAP) values for each
-    (`_time`, `_value`) row in the `priceCumulatives` dataframe. Rolling TWAP
-    values are calculated with a window size of `params['window']`.
-
-    Inputs:
-        pc [pf.DataFrame]: `priceCumulatives`
-            _time [int]: Unix timestamp
-            _field [str]: Price cumulative field, `price0Cumulative` or
-                          `price1Cumulatives`
-            _value [int]: Price cumulative field at unix timestamp `_time`
+    for i in range(1, int(max_rows+1)):
+        df['lag_time'] = df[['_time']].shift(i)
+        df[i] =\
+            (pd.to_datetime(df['_time']) - pd.to_datetime(df['lag_time']))\
+            .dt.total_seconds()
+        df[i] = abs(df[i] - window)
+        df.drop(['lag_time'], axis=1, inplace=True)

-        q [tp.Dict]: Quote pair entry fetched from SushiSwap
-            id [str]: Name of swap pair
-            pair [str]: Contract address of swap pair
-            token0 [str]: Contract address of token 0 in swap pair
-            token1 [str]: Contract address of token 1 in swap pair
-            is_price0 [bool]: If true, use the TWAP value calculated from the
-                              `priceCumulative0` storage variable:
-                              `price0 = num_token_1 / num_token_0`
+    min_df = df[[i for i in range(1, int(max_rows+1))]]\
+        .idxmin(axis="columns")

-                              If false, use the TWAP value calculated from the
-                              `priceCumulative1` storage variable
-        p [tp.Dict]: Parameters to use in statistical estimates
-            points [int]: 1 mo of data behind to estimate mles
-            window [int]: 1h TWAPs (assuming ovl_sushi ingested every
-                          10m)
-            period [int]: 10m periods [s]
-            alpha List[float]: alpha uncertainty in VaR calc
-            n: List[int]: number of periods into the future over which
-               VaR is calculated
+    df.dropna(inplace=True)
+    df = df.join(pd.DataFrame(min_df, columns=['dynamic_window']))
+    df['dynamic_window'] = df['dynamic_window'].astype(int)
+    return df

-    Outputs:
-        [pd.DataFrame]:
+
+def delta_window(
+        row: pd.Series,
+        values: pd.Series,
+        lookback: pd.Series
+        ) -> pd.Series:
+    '''
+    Computes difference based on window sizes specified in `lookback`
     '''
-    window = p['window']
-    dp = pc.filter(items=['_value'])\
-        .rolling(window=window)\
-        .apply(lambda w: w[-1] - w[0], raw=True)
+    loc = values.index.get_loc(row.name)
+    lb = lookback.loc[row.name]
+    return values.iloc[loc] - values.iloc[loc-lb]

-    # for time, need to map to timestamp first then apply delta
-    dt = pc.filter(items=['_time'])\
-        .applymap(datetime.timestamp)\
-        .rolling(window=window)\
-        .apply(lambda w: w[-1] - w[0], raw=True)

-    # Filter out NaNs
-    twap_112 = (dp['_value'] / dt['_time']).to_numpy()
+def get_twap(pc: pd.DataFrame, q: tp.Dict, p: tp.Dict) -> pd.DataFrame:
+    window = p['window']
+    period = p['period']
+
+    max_rows = ((window/period)+1) * 2
+
+    pc = dynamic_window(pc, int(max_rows), int(window))
+    pc['dp'] = pc.apply(
+        delta_window,
+        values=pc['_value'],
+        lookback=pc['dynamic_window'],
+        axis=1)
+    pc['dt'] = pc.apply(
+        delta_window,
+        values=pc['_time'],
+        lookback=pc['dynamic_window'],
+        axis=1).dt.total_seconds()
+
+    # with NaNs filtered out
+    twap_112 = (pc['dp'] / pc['dt']).to_numpy()
     twap_112 = twap_112[np.logical_not(np.isnan(twap_112))]

     twaps = compute_amount_out(twap_112, q['amount_in'])

-    window_times = dt['_time'].to_numpy()
+    # window times
+    window_times = pc['dt'].to_numpy()
     window_times = window_times[np.logical_not(np.isnan(window_times))]

-    # Window close timestamps
+    # window close timestamps
     t = pc.filter(items=['_time'])\
         .applymap(datetime.timestamp)\
         .rolling(window=window)\
         .apply(lambda w: w[-1], raw=True)
-
     ts = t['_time'].to_numpy()
     ts = ts[np.logical_not(np.isnan(ts))]

     df = pd.DataFrame(data=[ts, window_times, twaps]).T
     df.columns = ['timestamp', 'window', 'twap']

-    # Filter out any TWAPs that are less than or equal to 0;
-    # TODO: Why? Ingestion from sushi?
+    # filter out any twaps that are less than or equal to 0;
+    # TODO: why? ingestion from sushi?
     df = df[df['twap'] > 0]
-
     return df
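
A quick sanity check of the fixed-point conversion that `compute_amount_out` now performs in a plain Python loop: a `uq112x112` TWAP value is multiplied by `amount_in` and right-shifted by `PC_RESOLUTION` (112 bits) to recover a human-readable amount. The numbers below are illustrative only, not data from the script:

    # Illustration only: hypothetical price, not data from ovl_sushi.
    PC_RESOLUTION = 112

    price = 1.5                     # hypothetical token price
    twap_112 = price * 2 ** 112     # TWAP encoded as a uq112x112 fixed point
    amount_in = int(1e18)           # one unit of an 18-decimal quote token

    amount_out = int(twap_112 * amount_in) >> PC_RESOLUTION
    print(amount_out)               # ~1.5e18, i.e. price * amount_in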
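
The larger behavioural change is that `window` is now expressed in seconds (3600) rather than in rows, with `period` at 600 s and a new `tolerance` of 300 s added to the parameters. `dynamic_window` searches up to `max_rows = ((window/period)+1)*2` lags (14 rows for these parameters) and picks, per row, the lag whose elapsed time is closest to the target window; `delta_window` then differences the price cumulatives and timestamps over that lag, and the raw TWAP is `dp/dt`. Below is a self-contained sketch of that idea on toy data; the column names match the script, but the toy timestamps and the handling of the first few rows are assumptions, not the patch's exact behaviour:

    # Self-contained sketch (not part of the patch) of the dynamic-window TWAP.
    import numpy as np
    import pandas as pd

    window = 3600   # target TWAP window [s]
    period = 600    # expected spacing between ingested points [s]
    max_rows = int(((window / period) + 1) * 2)

    # Toy price-cumulative samples roughly every 10 minutes, with one 20-minute gap.
    offsets = [0, 600, 1200, 1800, 2400, 3000, 3600, 4800, 5400, 6000]
    pc = pd.DataFrame({
        '_time': pd.to_datetime(offsets, unit='s'),
        '_value': 1e30 * np.arange(1, len(offsets) + 1),
    })

    elapsed = (pc['_time'] - pc['_time'].iloc[0]).dt.total_seconds()

    # For each row, choose the lag (in rows) whose elapsed time is closest to `window`.
    lags = []
    for loc in range(len(pc)):
        candidates = range(1, min(max_rows, loc) + 1)
        if len(candidates) == 0:
            lags.append(np.nan)
            continue
        gaps = [abs((elapsed.iloc[loc] - elapsed.iloc[loc - lag]) - window)
                for lag in candidates]
        lags.append(candidates[int(np.argmin(gaps))])
    pc['dynamic_window'] = lags

    # Difference price cumulatives and timestamps over the chosen lag; TWAP = dp / dt.
    def lagged_delta(col: pd.Series) -> list:
        out = []
        for loc, lag in enumerate(pc['dynamic_window']):
            if np.isnan(lag):
                out.append(np.nan)
            else:
                out.append(col.iloc[loc] - col.iloc[loc - int(lag)])
        return out

    pc['dp'] = lagged_delta(pc['_value'])
    pc['dt'] = lagged_delta(elapsed)
    pc['twap_112'] = pc['dp'] / pc['dt']
    print(pc[['_time', 'dynamic_window', 'dt', 'twap_112']])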