From b4b2714027040c312ef88fcd34535cef2eb9907a Mon Sep 17 00:00:00 2001 From: Tyler Hoyt Date: Fri, 11 Aug 2023 23:20:45 -0400 Subject: [PATCH 1/3] Fix bug with handling on_close with a RobustConnection When handing on_close, futures were not properly popped from the futures list, causing an asyncio.exceptions.InvalidStateError when the connection is re-esablished and response recieved. --- aio_pika/patterns/rpc.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/aio_pika/patterns/rpc.py b/aio_pika/patterns/rpc.py index b47dc768..d4c62ee0 100644 --- a/aio_pika/patterns/rpc.py +++ b/aio_pika/patterns/rpc.py @@ -99,16 +99,18 @@ def __init__( self.consumer_tags: Dict[Callable[..., Any], ConsumerTag] = {} self.host_exceptions = host_exceptions - def __remove_future(self, future: asyncio.Future) -> None: - log.debug("Remove done future %r", future) - self.futures.pop(str(id(future)), None) + def __remove_future(self, correlation_id: str) -> Callable[[asyncio.Future], None]: + def remove_future(future: asyncio.Future) -> None: + log.debug("Remove done future %r", future) + self.futures.pop(correlation_id, None) + return remove_future def create_future(self) -> Tuple[asyncio.Future, str]: future = self.loop.create_future() log.debug("Create future for RPC call") correlation_id = str(uuid.uuid4()) self.futures[correlation_id] = future - future.add_done_callback(self.__remove_future) + future.add_done_callback(self.__remove_future(correlation_id)) return future, correlation_id async def close(self) -> None: From bfce5e58376a7ce3d89ccfcc0a7a7f5b5bd10fac Mon Sep 17 00:00:00 2001 From: Tyler Hoyt Date: Sat, 12 Aug 2023 15:24:38 -0400 Subject: [PATCH 2/3] Fixed line length --- aio_pika/patterns/rpc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aio_pika/patterns/rpc.py b/aio_pika/patterns/rpc.py index d4c62ee0..c803692f 100644 --- a/aio_pika/patterns/rpc.py +++ b/aio_pika/patterns/rpc.py @@ -99,7 +99,9 @@ def __init__( self.consumer_tags: Dict[Callable[..., Any], ConsumerTag] = {} self.host_exceptions = host_exceptions - def __remove_future(self, correlation_id: str) -> Callable[[asyncio.Future], None]: + def __remove_future( + self, correlation_id: str + ) -> Callable[[asyncio.Future], None]: def remove_future(future: asyncio.Future) -> None: log.debug("Remove done future %r", future) self.futures.pop(correlation_id, None) From e951c72b7192c8bb4a1f14c914c6a72361fe8d7d Mon Sep 17 00:00:00 2001 From: Tyler Hoyt Date: Sat, 12 Aug 2023 18:13:47 -0400 Subject: [PATCH 3/3] Rename to do_remove --- aio_pika/patterns/rpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aio_pika/patterns/rpc.py b/aio_pika/patterns/rpc.py index c803692f..000aaf42 100644 --- a/aio_pika/patterns/rpc.py +++ b/aio_pika/patterns/rpc.py @@ -102,10 +102,10 @@ def __init__( def __remove_future( self, correlation_id: str ) -> Callable[[asyncio.Future], None]: - def remove_future(future: asyncio.Future) -> None: + def do_remove(future: asyncio.Future) -> None: log.debug("Remove done future %r", future) self.futures.pop(correlation_id, None) - return remove_future + return do_remove def create_future(self) -> Tuple[asyncio.Future, str]: future = self.loop.create_future()