From 56cac0e456e78801d2bc4d318947fc2bb4282893 Mon Sep 17 00:00:00 2001 From: TheByronHimes Date: Wed, 28 Aug 2024 06:31:37 +0000 Subject: [PATCH] Use admin client as context manager --- src/sms/core/events_handler.py | 21 +++++++++++++-------- tests/unit/events/test_events_handler.py | 3 ++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/sms/core/events_handler.py b/src/sms/core/events_handler.py index 5fbc2a9..eba423b 100644 --- a/src/sms/core/events_handler.py +++ b/src/sms/core/events_handler.py @@ -14,6 +14,9 @@ # limitations under the License. """Contains implementation of the EventsHandler class.""" +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager + from aiokafka import TopicPartition from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete @@ -27,9 +30,15 @@ class EventsHandler(EventsHandlerPort): def __init__(self, *, config: Config): self._config = config - def get_admin_client(self) -> AIOKafkaAdminClient: - """Construct and return an instance of AIOKafkaAdminClient.""" - return AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers) + @asynccontextmanager + async def get_admin_client(self) -> AsyncGenerator[AIOKafkaAdminClient, None]: + """Construct and return an instance of AIOKafkaAdminClient that is closed after use.""" + admin_client = AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers) + await admin_client.start() + try: + yield admin_client + finally: + await admin_client.close() async def clear_topics(self, *, topics: list[str], exclude_internal: bool = True): """Clear messages from given topic(s). @@ -37,9 +46,7 @@ async def clear_topics(self, *, topics: list[str], exclude_internal: bool = True If no topics are specified, all topics will be cleared, except internal topics unless otherwise specified. """ - admin_client = self.get_admin_client() - await admin_client.start() - try: + async with self.get_admin_client() as admin_client: if not topics: topics = await admin_client.list_topics() if exclude_internal: @@ -53,5 +60,3 @@ async def clear_topics(self, *, topics: list[str], exclude_internal: bool = True for partition_info in topic_info["partitions"] } await admin_client.delete_records(records_to_delete, timeout_ms=10000) - finally: - await admin_client.close() diff --git a/tests/unit/events/test_events_handler.py b/tests/unit/events/test_events_handler.py index 3ffb708..d43c330 100644 --- a/tests/unit/events/test_events_handler.py +++ b/tests/unit/events/test_events_handler.py @@ -20,6 +20,7 @@ import pytest from aiokafka import TopicPartition from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete +from ghga_service_commons.utils.context import asyncnullcontext from tests.fixtures.config import DEFAULT_TEST_CONFIG from sms.core.events_handler import EventsHandler @@ -72,7 +73,7 @@ async def test_topics_parameter_behavior(topics: list[str], exclude_internal: bo # Create an instance of the EventsHandler and patch with the mock handler = EventsHandler(config=DEFAULT_TEST_CONFIG) - handler.get_admin_client = lambda: mock # type: ignore [method-assign] + handler.get_admin_client = lambda: asyncnullcontext(mock) # type: ignore [method-assign] # Call the clear_topics method await handler.clear_topics(topics=topics, exclude_internal=exclude_internal)