From de6fceee06f79d04f6bcaab35f65ad46e9a0a44d Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 13:40:51 -0600 Subject: [PATCH 01/10] feat: adds Web3SubscriptionsManager.pop_subscription_data() --- silverback/subscriptions.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index d99a6488..f6d0f8ff 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -1,7 +1,7 @@ import asyncio import json from enum import Enum -from typing import AsyncGenerator +from typing import AsyncGenerator, Union from ape.logging import logger from websockets import ConnectionClosedError @@ -115,6 +115,9 @@ async def subscribe(self, type: SubscriptionType, **filter_params) -> str: return sub_id async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None]: + """Iterate items from the subscription queue. If nothing is in the + queue, await. + """ while True: if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): async with self._ws_lock: @@ -124,6 +127,23 @@ async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None] else: yield await queue.get() + async def pop_subscription_data(self, sub_id: str) -> Union[dict, None]: + """Remove and return a single item from the subscription queue.""" + + async with self._ws_lock: + # NOTE: Python <3.10 does not support `anext` function + await self.__anext__() + + queue = self._subscriptions.get(sub_id) + + if queue: + try: + return await queue.get_nowait() + except asyncio.QueueEmpty: + pass + + return None + async def unsubscribe(self, sub_id: str) -> bool: if sub_id not in self._subscriptions: raise ValueError(f"Unknown sub_id '{sub_id}'") From 5c0a12626ef4ae2621e16608d97666bb60ddcad7 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 13:49:16 -0600 Subject: [PATCH 02/10] feat: adds get_subscription_data_nowait() to Web3SubscriptionsManager --- silverback/subscriptions.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index f6d0f8ff..4d528a26 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -127,6 +127,22 @@ async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None] else: yield await queue.get() + async def get_subscription_data_nowait(self, sub_id: str) -> AsyncGenerator[dict, None]: + """Iterate items from the subscription queue. If nothing is in the + queue, return. + """ + while True: + if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): + async with self._ws_lock: + # Keep pulling until a message comes to process + # NOTE: Python <3.10 does not support `anext` function + await self.__anext__() + else: + try: + yield await queue.get_nowait() + except asyncio.QueueEmpty: + pass + async def pop_subscription_data(self, sub_id: str) -> Union[dict, None]: """Remove and return a single item from the subscription queue.""" From 8845be4e0355ed76abb5709fe671c1eb9531e999 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 14:38:43 -0600 Subject: [PATCH 03/10] fix: awaiting sync func --- silverback/subscriptions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index 4d528a26..b75f92eb 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -139,7 +139,7 @@ async def get_subscription_data_nowait(self, sub_id: str) -> AsyncGenerator[dict await self.__anext__() else: try: - yield await queue.get_nowait() + yield queue.get_nowait() except asyncio.QueueEmpty: pass @@ -154,7 +154,7 @@ async def pop_subscription_data(self, sub_id: str) -> Union[dict, None]: if queue: try: - return await queue.get_nowait() + return queue.get_nowait() except asyncio.QueueEmpty: pass From c587f8b764e8f5e04d67d8e04af3d81aaf9c330c Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 16:00:38 -0600 Subject: [PATCH 04/10] refactor: leverage asyncio.timeout when calling websocket client.recv --- silverback/subscriptions.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index b75f92eb..7839ad9b 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -46,8 +46,18 @@ async def __anext__(self) -> str: if not self.connection: raise StopAsyncIteration - message = await self.connection.recv() - # TODO: Handle retries when connection breaks + return self._receive() + + async def _receive(self, timeout: Optional[int] = None) -> str: + """Receive (and wait if no timeout) for the next message from the + socket. + """ + if not self.connection: + raise ConnectionClosedError() + + async with asyncio.timeout(timeout): + message = await self.connection.recv() + # TODO: Handle retries when connection breaks response = json.loads(message) if response.get("method") == "eth_subscription": @@ -127,16 +137,18 @@ async def get_subscription_data(self, sub_id: str) -> AsyncGenerator[dict, None] else: yield await queue.get() - async def get_subscription_data_nowait(self, sub_id: str) -> AsyncGenerator[dict, None]: + async def get_subscription_data_nowait( + self, sub_id: str, timeout: Optional[int] = 15 + ) -> AsyncGenerator[dict, None]: """Iterate items from the subscription queue. If nothing is in the queue, return. """ while True: if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): - async with self._ws_lock: - # Keep pulling until a message comes to process - # NOTE: Python <3.10 does not support `anext` function - await self.__anext__() + try: + await self._receive(timeout=timeout) + except TimeoutError: + pass else: try: yield queue.get_nowait() From 762aa27dfde20f97031f147ef6731d3fdd4950bb Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 16:17:08 -0600 Subject: [PATCH 05/10] fix: one day I'll learn to lint before committing. --- silverback/subscriptions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index 7839ad9b..f2a7f0de 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -1,7 +1,7 @@ import asyncio import json from enum import Enum -from typing import AsyncGenerator, Union +from typing import AsyncGenerator, Optional, Union from ape.logging import logger from websockets import ConnectionClosedError @@ -46,14 +46,14 @@ async def __anext__(self) -> str: if not self.connection: raise StopAsyncIteration - return self._receive() + return await self._receive() async def _receive(self, timeout: Optional[int] = None) -> str: """Receive (and wait if no timeout) for the next message from the socket. """ if not self.connection: - raise ConnectionClosedError() + raise ConnectionError("Connection not opened") async with asyncio.timeout(timeout): message = await self.connection.recv() From b810a95abb24a2cb21e492b57eebdf46d159f519 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 16:50:36 -0600 Subject: [PATCH 06/10] fix: Python 3.10 does not support asyncio.timeout --- silverback/subscriptions.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index f2a7f0de..ef3894d8 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -55,9 +55,8 @@ async def _receive(self, timeout: Optional[int] = None) -> str: if not self.connection: raise ConnectionError("Connection not opened") - async with asyncio.timeout(timeout): - message = await self.connection.recv() - # TODO: Handle retries when connection breaks + message = await asyncio.wait_for(self.connection.recv(), timeout) + # TODO: Handle retries when connection breaks response = json.loads(message) if response.get("method") == "eth_subscription": From 3edd89dd0088b0140d597fdfe781ede952117c91 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 17:24:15 -0600 Subject: [PATCH 07/10] fix: get asyncio lock when calling _receive() --- silverback/subscriptions.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index ef3894d8..7ceedaa7 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -144,10 +144,11 @@ async def get_subscription_data_nowait( """ while True: if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): - try: - await self._receive(timeout=timeout) - except TimeoutError: - pass + async with self._ws_lock: + try: + await self._receive(timeout=timeout) + except TimeoutError: + pass else: try: yield queue.get_nowait() From bd4f3579945eae75bd1521ee6686ee03ea38a970 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 17:54:13 -0600 Subject: [PATCH 08/10] chore: cleanup, remove pop_subscription_data() --- silverback/subscriptions.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index 7ceedaa7..ac3d7120 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -155,23 +155,6 @@ async def get_subscription_data_nowait( except asyncio.QueueEmpty: pass - async def pop_subscription_data(self, sub_id: str) -> Union[dict, None]: - """Remove and return a single item from the subscription queue.""" - - async with self._ws_lock: - # NOTE: Python <3.10 does not support `anext` function - await self.__anext__() - - queue = self._subscriptions.get(sub_id) - - if queue: - try: - return queue.get_nowait() - except asyncio.QueueEmpty: - pass - - return None - async def unsubscribe(self, sub_id: str) -> bool: if sub_id not in self._subscriptions: raise ValueError(f"Unknown sub_id '{sub_id}'") From 0fb254ab11583989b137a737508240e2fb2914f5 Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 18:49:15 -0600 Subject: [PATCH 09/10] chore: logging and return from generator on end --- silverback/subscriptions.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index ac3d7120..e6c91550 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -1,7 +1,7 @@ import asyncio import json from enum import Enum -from typing import AsyncGenerator, Optional, Union +from typing import AsyncGenerator, Optional from ape.logging import logger from websockets import ConnectionClosedError @@ -62,7 +62,7 @@ async def _receive(self, timeout: Optional[int] = None) -> str: if response.get("method") == "eth_subscription": sub_params: dict = response.get("params", {}) if not (sub_id := sub_params.get("subscription")) or not isinstance(sub_id, str): - logger.debug(f"Corrupted subscription data: {response}") + logger.warning(f"Corrupted subscription data: {response}") return response if sub_id not in self._subscriptions: @@ -144,16 +144,24 @@ async def get_subscription_data_nowait( """ while True: if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): + logger.debug( + f"Acquiring lock for recv (no wait). (is locked? {self._ws_lock.locked()})" + ) async with self._ws_lock: + logger.debug("Acquired lock for recv (no wait).") try: await self._receive(timeout=timeout) except TimeoutError: - pass + logger.debug("Receive call timed out.") + return + else: + logger.debug("Receive call completed.") else: try: yield queue.get_nowait() except asyncio.QueueEmpty: - pass + logger.debug("Subscription queue empty.") + return async def unsubscribe(self, sub_id: str) -> bool: if sub_id not in self._subscriptions: From 59f925030d5aaf9d737f7b31298e975375344e1c Mon Sep 17 00:00:00 2001 From: Mike Shultz Date: Fri, 12 Jul 2024 19:35:16 -0600 Subject: [PATCH 10/10] chore: cleanup debug statements --- silverback/subscriptions.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/silverback/subscriptions.py b/silverback/subscriptions.py index e6c91550..087eaf87 100644 --- a/silverback/subscriptions.py +++ b/silverback/subscriptions.py @@ -144,23 +144,16 @@ async def get_subscription_data_nowait( """ while True: if not (queue := self._subscriptions.get(sub_id)) or queue.empty(): - logger.debug( - f"Acquiring lock for recv (no wait). (is locked? {self._ws_lock.locked()})" - ) async with self._ws_lock: - logger.debug("Acquired lock for recv (no wait).") try: await self._receive(timeout=timeout) except TimeoutError: logger.debug("Receive call timed out.") return - else: - logger.debug("Receive call completed.") else: try: yield queue.get_nowait() except asyncio.QueueEmpty: - logger.debug("Subscription queue empty.") return async def unsubscribe(self, sub_id: str) -> bool: