From 079fc6729c685c6af10e540b29d087e46c1fea4a Mon Sep 17 00:00:00 2001 From: hawang-wish <130547790+hawang-wish@users.noreply.github.com> Date: Thu, 29 Aug 2024 14:27:22 +0800 Subject: [PATCH] Fix InMemoryBroker to use startup and shutdown middleware hooks --- taskiq/brokers/inmemory_broker.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/taskiq/brokers/inmemory_broker.py b/taskiq/brokers/inmemory_broker.py index 544289f..5e96a6f 100644 --- a/taskiq/brokers/inmemory_broker.py +++ b/taskiq/brokers/inmemory_broker.py @@ -173,13 +173,23 @@ def listen(self) -> AsyncGenerator[bytes, None]: async def startup(self) -> None: """Runs startup events for client and worker side.""" - for event in (TaskiqEvents.CLIENT_STARTUP, TaskiqEvents.WORKER_STARTUP): - for handler in self.event_handlers.get(event, []): - await maybe_awaitable(handler(self.state)) + for handler in self.event_handlers.get( + TaskiqEvents.CLIENT_STARTUP + if self.is_worker_process + else TaskiqEvents.WORKER_STARTUP, + [], + ): + await maybe_awaitable(handler(self.state)) + await super().startup() async def shutdown(self) -> None: """Runs shutdown events for client and worker side.""" - for event in (TaskiqEvents.CLIENT_SHUTDOWN, TaskiqEvents.WORKER_SHUTDOWN): - for handler in self.event_handlers.get(event, []): - await maybe_awaitable(handler(self.state)) + for handler in self.event_handlers.get( + TaskiqEvents.CLIENT_SHUTDOWN + if self.is_worker_process + else TaskiqEvents.WORKER_SHUTDOWN, + [], + ): + await maybe_awaitable(handler(self.state)) self.executor.shutdown() + await super().shutdown()