Skip to content

Commit

Permalink
restructuring GQLDataFactory such that it takes in PPSS, rather than …
Browse files Browse the repository at this point in the history
…pieces of it
  • Loading branch information
idiom-bytes committed Dec 14, 2023
1 parent 174814d commit 76aae07
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
33 changes: 14 additions & 19 deletions pdr_backend/data_eng/gql_data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
from enforce_typing import enforce_types
import polars as pl

from pdr_backend.ppss.data_pp import DataPP
from pdr_backend.ppss.data_ss import DataSS
from pdr_backend.ppss.web3_pp import Web3PP
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.util.timeutil import pretty_timestr, current_ut

from pdr_backend.data_eng.plutil import (
Expand All @@ -23,7 +21,6 @@
get_pdr_predictions_df,
)


@enforce_types
class GQLDataFactory:
"""
Expand All @@ -37,27 +34,25 @@ class GQLDataFactory:
- "datetime" values are python datetime.datetime, UTC
"""

def __init__(self, pp: DataPP, ss: DataSS, web3: Web3PP):
self.pp = pp
self.ss = ss
self.web3 = web3
def __init__(self, ppss: PPSS):
self.ppss = ppss

# TO DO: Solve duplicates from subgraph.
# Method 1: Cull anything returned outside st_ut, fin_ut
self.debug_duplicate = False

# TO DO: This code has DRY problems. Reduce.
# get network
if "main" in self.web3.network:
if "main" in self.ppss.web3_pp.network:
network = "mainnet"
elif "test" in self.web3.network:
elif "test" in self.ppss.web3_pp.network:
network = "testnet"
else:
raise ValueError(self.web3.network)
raise ValueError(self.ppss.web3_pp.network)

# filter by feed contract address
contract_list = get_all_contract_ids_by_owner(
owner_address=self.web3.owner_addrs,
owner_address=self.ppss.web3_pp.owner_addrs,
network=network,
)
contract_list = [f.lower() for f in contract_list]
Expand Down Expand Up @@ -86,9 +81,9 @@ def get_gql_dfs(self) -> Dict[str, pl.DataFrame]:
# Ss_timestamp is calculated dynamically if ss.fin_timestr = "now".
# But, we don't want fin_timestamp changing as we gather data here.
# To solve, for a given call to this method, we make a constant fin_ut
fin_ut = self.ss.fin_timestamp
fin_ut = self.ppss.data_ss.fin_timestamp

print(f" Data start: {pretty_timestr(self.ss.st_timestamp)}")
print(f" Data start: {pretty_timestr(self.ppss.data_ss.st_timestamp)}")
print(f" Data fin: {pretty_timestr(fin_ut)}")

self._update(fin_ut)
Expand Down Expand Up @@ -134,7 +129,7 @@ def _update(self, fin_ut: int):

# call the function
print(f" Fetching {k}")
df = do_fetch(self.web3.network, st_ut, fin_ut, record["config"])
df = do_fetch(self.ppss.web3_pp.network, st_ut, fin_ut, record["config"])

# postcondition
assert df.schema == record["schema"]
Expand All @@ -156,13 +151,13 @@ def _calc_start_ut(self, filename: str) -> int:
"""
if not os.path.exists(filename):
print(" No file exists yet, so will fetch all data")
return self.ss.st_timestamp
return self.ppss.data_ss.st_timestamp

print(" File already exists")
if not has_data(filename):
print(" File has no data, so delete it")
os.remove(filename)
return self.ss.st_timestamp
return self.ppss.data_ss.st_timestamp

file_utN = newest_ut(filename)
return file_utN + 1000
Expand All @@ -177,7 +172,7 @@ def _load_parquet(self, fin_ut: int) -> Dict[str, pl.DataFrame]:
Where df has columns=GQL_COLS+"datetime", and index=timestamp
"""
print(" Load parquet.")
st_ut = self.ss.st_timestamp
st_ut = self.ppss.data_ss.st_timestamp

dfs: Dict[str, pl.DataFrame] = {} # [parquet_filename] : df

Expand Down Expand Up @@ -209,7 +204,7 @@ def _parquet_filename(self, filename_str: str) -> str:
parquet_filename -- name for parquet file.
"""
basename = f"{filename_str}.parquet"
filename = os.path.join(self.ss.parquet_dir, basename)
filename = os.path.join(self.ppss.data_ss.parquet_dir, basename)
return filename

@enforce_types
Expand Down
4 changes: 2 additions & 2 deletions pdr_backend/data_eng/test/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ def _data_pp_ss_1feed(tmpdir, feed, st_timestr=None, fin_timestr=None):
def _gql_data_factory(tmpdir, feed, st_timestr=None, fin_timestr=None):
network = "sapphire-mainnet"
ppss = mock_ppss("5m", [feed], network, str(tmpdir), st_timestr, fin_timestr)
web3_pp = mock_web3_pp(network)
gql_data_factory = GQLDataFactory(ppss.data_pp, ppss.data_ss, web3_pp)
ppss.web3_pp = mock_web3_pp(network)
gql_data_factory = GQLDataFactory(ppss)
return ppss, gql_data_factory


Expand Down

0 comments on commit 76aae07

Please sign in to comment.