Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds Web3SubscriptionsManager.get_subscription_data_nowait() #95

Merged
merged 10 commits into from
Jul 17, 2024
38 changes: 35 additions & 3 deletions silverback/subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
from enum import Enum
from typing import AsyncGenerator
from typing import AsyncGenerator, Optional

from ape.logging import logger
from websockets import ConnectionClosedError
Expand Down Expand Up @@ -46,14 +46,23 @@ async def __anext__(self) -> str:
if not self.connection:
raise StopAsyncIteration

message = await self.connection.recv()
return await self._receive()

async def _receive(self, timeout: Optional[int] = None) -> str:
"""Receive (and wait if no timeout) for the next message from the
socket.
"""
if not self.connection:
raise ConnectionError("Connection not opened")

message = await asyncio.wait_for(self.connection.recv(), timeout)
Copy link
Member

@fubuloubu fubuloubu Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the location where node websockets end up raising after getting randomly dropped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it, according to the trace on #84

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, we might want to come back later and handle this timeout by reconnecting

# TODO: Handle retries when connection breaks

response = json.loads(message)
if response.get("method") == "eth_subscription":
sub_params: dict = response.get("params", {})
if not (sub_id := sub_params.get("subscription")) or not isinstance(sub_id, str):
logger.debug(f"Corrupted subscription data: {response}")
logger.warning(f"Corrupted subscription data: {response}")
return response

if sub_id not in self._subscriptions:
Expand Down Expand Up @@ -115,6 +124,9 @@ async def subscribe(self, type: SubscriptionType, **filter_params) -> str:
return sub_id

async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None]:
"""Iterate items from the subscription queue. If nothing is in the
queue, await.
"""
while True:
if not (queue := self._subscriptions.get(sub_id)) or queue.empty():
async with self._ws_lock:
Expand All @@ -124,6 +136,26 @@ async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None]
else:
yield await queue.get()

async def get_subscription_data_nowait(
fubuloubu marked this conversation as resolved.
Show resolved Hide resolved
self, sub_id: str, timeout: Optional[int] = 15
) -> AsyncGenerator[dict, None]:
"""Iterate items from the subscription queue. If nothing is in the
queue, return.
"""
while True:
if not (queue := self._subscriptions.get(sub_id)) or queue.empty():
async with self._ws_lock:
try:
await self._receive(timeout=timeout)
except TimeoutError:
logger.debug("Receive call timed out.")
return
else:
try:
yield queue.get_nowait()
except asyncio.QueueEmpty:
return

async def unsubscribe(self, sub_id: str) -> bool:
if sub_id not in self._subscriptions:
raise ValueError(f"Unknown sub_id '{sub_id}'")
Expand Down
Loading