Skip to content

Commit

Permalink
Fix #514: Improve subgraph error handling (#552)
Browse files Browse the repository at this point in the history
* Improving subgraph error handling, rather than throwing exception on dupes, just handle them

* Fixing tests such that they are simulating 1000s records, and working correctly with subgraph chunk_size

* fixing tests

* fixing pylint errors
  • Loading branch information
idiom-bytes authored Jan 23, 2024
1 parent 1257603 commit 4af7175
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 60 deletions.
9 changes: 2 additions & 7 deletions pdr_backend/lake/gql_data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,8 @@ def _save_parquet(self, filename: str, df: pl.DataFrame):
cur_df = pl.read_parquet(filename)
df = pl.concat([cur_df, df])

# check for duplicates and throw error if any found
duplicate_rows = df.filter(pl.struct("ID").is_duplicated())
if len(duplicate_rows) > 0:
raise Exception(
f"Not saved. Duplicate rows found. {len(duplicate_rows)} rows: {duplicate_rows}"
)

# drop duplicates
df = df.filter(pl.struct("ID").is_unique())
df.write_parquet(filename)
n_new = df.shape[0] - cur_df.shape[0]
print(f" Just appended {n_new} df rows to file {filename}")
Expand Down
20 changes: 14 additions & 6 deletions pdr_backend/subgraph/subgraph_predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class FilterMode(Enum):
CONTRACT_TS = 3


# pylint: disable=too-many-statements
@enforce_types
def fetch_filtered_predictions(
start_ts: int,
Expand Down Expand Up @@ -112,12 +113,19 @@ def fetch_filtered_predictions(
}}
}}"""

print("Querying subgraph...", query)
result = query_subgraph(
get_subgraph_url(network),
query,
timeout=20.0,
)
try:
print("Querying subgraph...", query)
result = query_subgraph(
get_subgraph_url(network),
query,
timeout=20.0,
)
except Exception as e:
print(
f"Error querying subgraph, return #{len(predictions)} records... exception: ",
e,
)
break

offset += chunk_size

Expand Down
136 changes: 89 additions & 47 deletions pdr_backend/subgraph/test/test_subgraph_predictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,56 +31,56 @@
)

# pylint: disable=line-too-long
MOCK_PREDICTIONS_RESPONSE_FIRST_CALL = {
"data": {
"predictPredictions": [
{
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1698527100-0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd",
"user": {"id": "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd"},
"stake": "0.050051425480971974",
"timestamp": 1698527000,
"payout": {"payout": "0", "trueValue": False, "predictedValue": True},
"slot": {
"slot": 1698527100,
"predictContract": {
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151",
"token": {
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151",
"name": "ADA/USDT",
"nft": {
"nftData": [
{
"key": "0x238ad53218834f943da60c8bafd36c36692dcb35e6d76bdd93202f5c04c0baff",
"value": "0x55534454",
},
{
"key": "0x2cef5778d97683b4f64607f72e862fc0c92376e44cc61195ef72a634c0b1793e",
"value": "0x4144412f55534454",
},
{
"key": "0x49435d2ff85f9f3594e40e887943d562765d026d50b7383e76891f8190bff4c9",
"value": "0x356d",
},
{
"key": "0xf1f3eb40f5bc1ad1344716ced8b8a0431d840b5783aea1fd01786bc26f35ac0f",
"value": "0x414441",
},
{
"key": "0xf7e3126f87228afb82c9b18537eed25aaeb8171a78814781c26ed2cfeff27e69",
"value": "0x62696e616e6365",
},
]
},
_PREDICTION = {
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1698527100-0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd",
"user": {"id": "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd"},
"stake": "0.050051425480971974",
"timestamp": 1698527000,
"payout": {"payout": "0", "trueValue": False, "predictedValue": True},
"slot": {
"slot": 1698527100,
"predictContract": {
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151",
"token": {
"id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151",
"name": "ADA/USDT",
"nft": {
"nftData": [
{
"key": "0x238ad53218834f943da60c8bafd36c36692dcb35e6d76bdd93202f5c04c0baff",
"value": "0x55534454",
},
{
"key": "0x2cef5778d97683b4f64607f72e862fc0c92376e44cc61195ef72a634c0b1793e",
"value": "0x4144412f55534454",
},
{
"key": "0x49435d2ff85f9f3594e40e887943d562765d026d50b7383e76891f8190bff4c9",
"value": "0x356d",
},
{
"key": "0xf1f3eb40f5bc1ad1344716ced8b8a0431d840b5783aea1fd01786bc26f35ac0f",
"value": "0x414441",
},
},
{
"key": "0xf7e3126f87228afb82c9b18537eed25aaeb8171a78814781c26ed2cfeff27e69",
"value": "0x62696e616e6365",
},
]
},
}
]
}
},
},
},
}

MOCK_PREDICTIONS_RESPONSE_FIRST_CALL = {"data": {"predictPredictions": [_PREDICTION]}}

MOCK_PREDICTIONS_RESPONSE_SECOND_CALL: Dict[str, dict] = {}

MOCK_PREDICTIONS_RESPONSE_1000 = {
"data": {"predictPredictions": [_PREDICTION for i in range(0, 1000)]}
}

MOCK_CONTRACTS_RESPONSE = {
"data": {
"tokens": [
Expand All @@ -103,8 +103,15 @@
@enforce_types
@patch("pdr_backend.subgraph.subgraph_predictions.query_subgraph")
def test_fetch_filtered_predictions(mock_query_subgraph):
"""
@description
Test that fetch_filtered_predictions() can fetch predictions from subgraph
and return them as a list of Prediction objects.
"""
# show the system can fetch multiple times, and handle empty responses
mock_query_subgraph.side_effect = [
MOCK_PREDICTIONS_RESPONSE_FIRST_CALL,
MOCK_PREDICTIONS_RESPONSE_1000,
MOCK_PREDICTIONS_RESPONSE_1000,
MOCK_PREDICTIONS_RESPONSE_SECOND_CALL,
]
predictions = fetch_filtered_predictions(
Expand All @@ -115,14 +122,49 @@ def test_fetch_filtered_predictions(mock_query_subgraph):
filter_mode=FilterMode.PREDICTOOR,
)

assert len(predictions) == 1
assert len(predictions) == 2000
assert isinstance(predictions[0], Prediction)
assert predictions[0].user == "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd"
assert predictions[0].pair == "ADA/USDT"
assert predictions[0].address[0] == "0x18f54cc21b7a2fdd011bea06bba7801b280e3151"
assert predictions[0].trueval is False
assert predictions[0].prediction is True
assert mock_query_subgraph.call_count == 1
assert mock_query_subgraph.call_count == 3


@enforce_types
@patch("pdr_backend.subgraph.subgraph_predictions.query_subgraph")
def test_fetch_filtered_predictions_exception(mock_query_subgraph):
"""
@description
Verifies that fetch_filtered_predictions() can handle exceptions from subgraph
and return the predictions that were fetched before the exception.
"""
num_successful_fetches = 3

# we're going to simulate an exception from subgraph on the second call
# pylint: disable=unused-argument
def simulate_exception(*args, **kwargs):
if simulate_exception.call_count < num_successful_fetches:
simulate_exception.call_count += 1
return MOCK_PREDICTIONS_RESPONSE_1000
raise Exception(f"Simulated exception on call #{num_successful_fetches+1}")

simulate_exception.call_count = 0

# Patch query_subgraph to use our simulate_exception function
mock_query_subgraph.side_effect = simulate_exception

predictions = fetch_filtered_predictions(
start_ts=1622547000,
end_ts=1622548800,
filters=["0x18f54cc21b7a2fdd011bea06bba7801b280e3151"],
network="mainnet",
filter_mode=FilterMode.PREDICTOOR,
)

assert len(predictions) == num_successful_fetches * 1000
assert mock_query_subgraph.call_count == num_successful_fetches + 1


@enforce_types
Expand Down

0 comments on commit 4af7175

Please sign in to comment.