Skip to content

Commit

Permalink
Use admin client as context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Aug 28, 2024
1 parent ef99175 commit 56cac0e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
21 changes: 13 additions & 8 deletions src/sms/core/events_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# limitations under the License.
"""Contains implementation of the EventsHandler class."""

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

from aiokafka import TopicPartition
from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete

Expand All @@ -27,19 +30,23 @@ class EventsHandler(EventsHandlerPort):
def __init__(self, *, config: Config):
self._config = config

def get_admin_client(self) -> AIOKafkaAdminClient:
"""Construct and return an instance of AIOKafkaAdminClient."""
return AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers)
@asynccontextmanager
async def get_admin_client(self) -> AsyncGenerator[AIOKafkaAdminClient, None]:
"""Construct and return an instance of AIOKafkaAdminClient that is closed after use."""
admin_client = AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers)
await admin_client.start()
try:
yield admin_client
finally:
await admin_client.close()

async def clear_topics(self, *, topics: list[str], exclude_internal: bool = True):
"""Clear messages from given topic(s).
If no topics are specified, all topics will be cleared, except internal topics
unless otherwise specified.
"""
admin_client = self.get_admin_client()
await admin_client.start()
try:
async with self.get_admin_client() as admin_client:
if not topics:
topics = await admin_client.list_topics()
if exclude_internal:
Expand All @@ -53,5 +60,3 @@ async def clear_topics(self, *, topics: list[str], exclude_internal: bool = True
for partition_info in topic_info["partitions"]
}
await admin_client.delete_records(records_to_delete, timeout_ms=10000)
finally:
await admin_client.close()
3 changes: 2 additions & 1 deletion tests/unit/events/test_events_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pytest
from aiokafka import TopicPartition
from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete
from ghga_service_commons.utils.context import asyncnullcontext
from tests.fixtures.config import DEFAULT_TEST_CONFIG

from sms.core.events_handler import EventsHandler
Expand Down Expand Up @@ -72,7 +73,7 @@ async def test_topics_parameter_behavior(topics: list[str], exclude_internal: bo

# Create an instance of the EventsHandler and patch with the mock
handler = EventsHandler(config=DEFAULT_TEST_CONFIG)
handler.get_admin_client = lambda: mock # type: ignore [method-assign]
handler.get_admin_client = lambda: asyncnullcontext(mock) # type: ignore [method-assign]

# Call the clear_topics method
await handler.clear_topics(topics=topics, exclude_internal=exclude_internal)
Expand Down

0 comments on commit 56cac0e

Please sign in to comment.