Fix #436 - Implement GQL data factory (PR #438)
* First pass on gql data factory

Co-authored-by: trentmc <[email protected]>
idiom-bytes and trentmc authored Dec 15, 2023
1 parent 8fe215d commit d3029a7
Showing 34 changed files with 1,258 additions and 420 deletions.
19 changes: 13 additions & 6 deletions pdr_backend/accuracy/app.py
@@ -77,32 +77,39 @@ def save_statistics_to_file():
         "0x4ac2e51f9b1b0ca9e000dfe6032b24639b172703", network_param
     )
 
-    contract_information = fetch_contract_id_and_spe(contract_addresses, network_param)
+    contracts_list_unfiltered = fetch_contract_id_and_spe(
+        contract_addresses,
+        network_param,
+    )
 
     while True:
         try:
             output = []
 
             for statistic_type in statistic_types:
                 seconds_per_epoch = statistic_type["seconds_per_epoch"]
-                contracts = list(
+                contracts_list = list(
                     filter(
                         lambda item, spe=seconds_per_epoch: int(
                             item["seconds_per_epoch"]
                         )
                         == spe,
-                        contract_information,
+                        contracts_list_unfiltered,
                     )
                 )
 
                 start_ts_param, end_ts_param = calculate_timeframe_timestamps(
                     statistic_type["alias"]
                 )
 
-                contract_ids = [contract["id"] for contract in contracts]
+                # Get statistics for all contracts
+                contract_ids = [contract_item["ID"] for contract_item in contracts_list]
 
                 statistics = calculate_statistics_for_all_assets(
-                    contract_ids, contracts, start_ts_param, end_ts_param, network_param
+                    contract_ids,
+                    contracts_list,
+                    start_ts_param,
+                    end_ts_param,
+                    network_param,
                 )
 
                 output.append(
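The lambda-based filter above selects the contracts whose seconds_per_epoch matches the current statistic_type (e.g. 300 for 5m feeds, 3600 for 1h feeds). An equivalent list-comprehension form, shown only as an illustration and not part of this commit:

    contracts_list = [
        item
        for item in contracts_list_unfiltered
        if int(item["seconds_per_epoch"]) == seconds_per_epoch
    ]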
227 changes: 227 additions & 0 deletions pdr_backend/data_eng/gql_data_factory.py
@@ -0,0 +1,227 @@
import os
from typing import Dict, Callable

from enforce_typing import enforce_types
import polars as pl

from pdr_backend.data_eng.plutil import (
    has_data,
    newest_ut,
)
from pdr_backend.data_eng.table_pdr_predictions import (
    predictions_schema,
    get_pdr_predictions_df,
)
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.util.networkutil import get_sapphire_postfix
from pdr_backend.util.subgraph_predictions import get_all_contract_ids_by_owner
from pdr_backend.util.timeutil import pretty_timestr, current_ut


@enforce_types
class GQLDataFactory:
    """
    Roles:
    - From each GQL API, fill >=1 gql_dfs -> parquet files data lake
    - From gql_dfs, calculate other dfs and stats
    - All timestamps, after fetching, are transformed into milliseconds wherever appropriate

    Finally:
    - "timestamp" values are ut: int is unix time, UTC, in ms (not s)
    - "datetime" values are python datetime.datetime, UTC
    """

    def __init__(self, ppss: PPSS):
        self.ppss = ppss

        # filter by feed contract address
        network = get_sapphire_postfix(ppss.web3_pp.network)
        contract_list = get_all_contract_ids_by_owner(
            owner_address=self.ppss.web3_pp.owner_addrs,
            network=network,
        )
        contract_list = [f.lower() for f in contract_list]

        # configure all tables that will be recorded onto lake
        self.record_config = {
            "pdr_predictions": {
                "fetch_fn": get_pdr_predictions_df,
                "schema": predictions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
        }

    def get_gql_dfs(self) -> Dict[str, pl.DataFrame]:
        """
        @description
          Get historical dataframes across many feeds and timeframes.

        @return
          gql_dfs -- dict of [gql_filename] : *polars* DataFrame. See class docstring.
        """
        print("Get predictions data across many feeds and timeframes.")

        # ss.fin_timestamp is calculated dynamically if ss.fin_timestr = "now".
        # But, we don't want fin_timestamp changing as we gather data here.
        # To solve, for a given call to this method, we make a constant fin_ut
        fin_ut = self.ppss.data_ss.fin_timestamp

        print(f" Data start: {pretty_timestr(self.ppss.data_ss.st_timestamp)}")
        print(f" Data fin: {pretty_timestr(fin_ut)}")

        self._update(fin_ut)
        gql_dfs = self._load_parquet(fin_ut)

        print("Get historical data across many subgraphs. Done.")

        # postconditions
        assert len(gql_dfs.values()) > 0
        for df in gql_dfs.values():
            assert isinstance(df, pl.DataFrame)

        return gql_dfs

    def _update(self, fin_ut: int):
        """
        @description
          Iterate across all gql queries and update their parquet files:
          - Predictoors
          - Slots
          - Claims

          Improve this by:
          1. Break out raw data from any transformed/cleaned data
          2. Integrate other queries and summaries
          3. Integrate config/pp if needed

        @arguments
          fin_ut -- a timestamp, in ms, in UTC
        """

        for k, record in self.record_config.items():
            filename = self._parquet_filename(k)
            print(f" filename={filename}")

            st_ut = self._calc_start_ut(filename)
            print(f" Aim to fetch data from start time: {pretty_timestr(st_ut)}")
            if st_ut > min(current_ut(), fin_ut):
                print(" Given start time, no data to gather. Exit.")
                continue

            # to satisfy mypy, get an explicit function pointer
            do_fetch: Callable[[str, int, int, Dict], pl.DataFrame] = record["fetch_fn"]

            # call the function
            print(f" Fetching {k}")
            df = do_fetch(self.ppss.web3_pp.network, st_ut, fin_ut, record["config"])

            # postcondition
            if len(df) > 0:
                assert df.schema == record["schema"]

            # save to parquet
            self._save_parquet(filename, df)

    def _calc_start_ut(self, filename: str) -> int:
        """
        @description
          Calculate start timestamp, reconciling whether file exists and where
          its data starts. If file exists, you can only append to end.

        @arguments
          filename - parquet file with data. May or may not exist.

        @return
          start_ut - timestamp (ut) to start grabbing data for (in ms)
        """
        if not os.path.exists(filename):
            print(" No file exists yet, so will fetch all data")
            return self.ppss.data_ss.st_timestamp

        print(" File already exists")
        if not has_data(filename):
            print(" File has no data, so delete it")
            os.remove(filename)
            return self.ppss.data_ss.st_timestamp

        file_utN = newest_ut(filename)
        return file_utN + 1000

    def _load_parquet(self, fin_ut: int) -> Dict[str, pl.DataFrame]:
        """
        @arguments
          fin_ut -- finish timestamp

        @return
          gql_dfs -- dict of [gql_filename] : df
            Where df has columns=GQL_COLS+"datetime", and index=timestamp
        """
        print(" Load parquet.")
        st_ut = self.ppss.data_ss.st_timestamp

        dfs: Dict[str, pl.DataFrame] = {}  # [parquet_filename] : df

        for k, record in self.record_config.items():
            filename = self._parquet_filename(k)
            print(f" filename={filename}")

            # load all data from file
            df = pl.read_parquet(filename)
            df = df.filter(
                (pl.col("timestamp") >= st_ut) & (pl.col("timestamp") <= fin_ut)
            )

            # postcondition
            assert df.schema == record["schema"]
            dfs[k] = df

        return dfs

    def _parquet_filename(self, filename_str: str) -> str:
        """
        @description
          Computes the lake-path for the parquet file.

        @arguments
          filename_str -- eg "subgraph_predictions"

        @return
          parquet_filename -- name for parquet file.
        """
        basename = f"{filename_str}.parquet"
        filename = os.path.join(self.ppss.data_ss.parquet_dir, basename)
        return filename

    @enforce_types
    def _save_parquet(self, filename: str, df: pl.DataFrame):
        """write to parquet file
        parquet only supports appending via the pyarrow engine
        """

        # precondition
        assert "timestamp" in df.columns and df["timestamp"].dtype == pl.Int64
        assert len(df) > 0
        if len(df) > 1:
            assert (
                df.head(1)["timestamp"].to_list()[0]
                < df.tail(1)["timestamp"].to_list()[0]
            )

        if os.path.exists(filename):  # "append" existing file
            cur_df = pl.read_parquet(filename)
            df = pl.concat([cur_df, df])

            # check for duplicates and throw error if any found
            duplicate_rows = df.filter(pl.struct("ID").is_duplicated())
            if len(duplicate_rows) > 0:
                raise Exception(
                    f"Not saved. Duplicate rows found. {len(duplicate_rows)} rows: {duplicate_rows}"
                )

            df.write_parquet(filename)
            n_new = df.shape[0] - cur_df.shape[0]
            print(f" Just appended {n_new} df rows to file {filename}")
        else:  # write new file
            df.write_parquet(filename)
            print(f" Just saved df with {df.shape[0]} rows to new file {filename}")
88 changes: 88 additions & 0 deletions pdr_backend/data_eng/table_pdr_predictions.py
@@ -0,0 +1,88 @@
from typing import List, Dict
from enforce_typing import enforce_types

import polars as pl
from polars import Utf8, Int64, Float64, Boolean

from pdr_backend.util.networkutil import get_sapphire_postfix
from pdr_backend.util.subgraph_predictions import (
    fetch_filtered_predictions,
    FilterMode,
)
from pdr_backend.util.timeutil import ms_to_seconds

# RAW_PREDICTIONS_SCHEMA
predictions_schema = {
    "ID": Utf8,
    "pair": Utf8,
    "timeframe": Utf8,
    "prediction": Boolean,
    "stake": Float64,
    "trueval": Boolean,
    "timestamp": Int64,
    "source": Utf8,
    "payout": Float64,
    "slot": Int64,
    "user": Utf8,
}


def _object_list_to_df(objects: List[object], schema: Dict) -> pl.DataFrame:
    """
    @description
      Convert a list of objects to a dataframe using their __dict__ structure.
    """
    # Get all objects into a dataframe via their attribute dicts
    obj_dicts = [obj.__dict__ for obj in objects]
    obj_df = pl.DataFrame(obj_dicts, schema=schema)
    assert obj_df.schema == schema

    return obj_df


def _transform_timestamp_to_ms(df: pl.DataFrame) -> pl.DataFrame:
    df = df.with_columns(
        [
            pl.col("timestamp").mul(1000).alias("timestamp"),
        ]
    )
    return df


@enforce_types
def get_pdr_predictions_df(
    network: str, st_ut: int, fin_ut: int, config: Dict
) -> pl.DataFrame:
    """
    @description
      Fetch raw predictions from the predictoor subgraph.
      This is the update function for the GraphQL query: it returns the raw data,
      with timestamps transformed into ms as required by the data factory.
    """
    network = get_sapphire_postfix(network)

    # fetch predictions
    predictions = fetch_filtered_predictions(
        ms_to_seconds(st_ut),
        ms_to_seconds(fin_ut),
        config["contract_list"],
        network,
        FilterMode.CONTRACT_TS,
        payout_only=False,
        trueval_only=False,
    )

    if len(predictions) == 0:
        print(" No predictions to fetch. Exit.")
        return pl.DataFrame()

    # convert predictions to df and transform timestamp into ms
    predictions_df = _object_list_to_df(predictions, predictions_schema)
    predictions_df = _transform_timestamp_to_ms(predictions_df)

    # cull any records outside of our time range and sort them by timestamp
    predictions_df = predictions_df.filter(
        pl.col("timestamp").is_between(st_ut, fin_ut)
    ).sort("timestamp")

    return predictions_df
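A small self-contained sketch of how predictions_schema and the ms convention fit together. The row values below are made up for illustration (the ID, pair, and addresses are not real subgraph data); it applies the same ms conversion and time-range filter that get_pdr_predictions_df performs:

    import polars as pl
    from pdr_backend.data_eng.table_pdr_predictions import (
        predictions_schema,
        _transform_timestamp_to_ms,
    )

    raw = pl.DataFrame(
        {
            "ID": ["0xfeed-1700000060"],   # hypothetical "{contract}-{slot}" id
            "pair": ["BTC/USDT"],
            "timeframe": ["5m"],
            "prediction": [True],
            "stake": [10.0],
            "trueval": [True],
            "timestamp": [1700000060],     # seconds, as returned by the subgraph
            "source": ["binance"],
            "payout": [12.5],
            "slot": [1700000100],
            "user": ["0xabc"],
        },
        schema=predictions_schema,
    )
    df = _transform_timestamp_to_ms(raw)   # "timestamp" column is now in ms
    df = df.filter(pl.col("timestamp").is_between(1699999000000, 1700001000000))
    print(df)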