Skip to content

Commit

Permalink
Add support for clearing Kafka topics
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Aug 26, 2024
1 parent c0837b2 commit b1d8784
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 0 deletions.
41 changes: 41 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,47 @@ paths:
tags:
- StateManagementService
- sms-mongodb
/events/:
delete:
description: If no topics are specified, all topics will be cleared, except
internal topics unless otherwise specified.
operationId: clear_topics
parameters:
- description: The topic(s) to clear.
in: query
name: topics
required: false
schema:
default: []
description: The topic(s) to clear.
items:
type: string
title: Topics
type: array
- description: Whether to exclude internal topics.
in: query
name: exclude_internal
required: false
schema:
default: true
description: Whether to exclude internal topics.
title: Exclude Internal
type: boolean
responses:
'204':
description: Successful Response
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
description: Validation Error
security:
- HTTPBearer: []
summary: Clear events from the given topic(s).
tags:
- StateManagementService
- sms-kafka
/health:
get:
description: Used to test if this service is alive
Expand Down
3 changes: 3 additions & 0 deletions src/sms/adapters/inbound/fastapi_/dummies.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

from sms.config import Config
from sms.ports.inbound.docs_handler import DocsHandlerPort
from sms.ports.inbound.events_handler import EventsHandlerPort
from sms.ports.inbound.objects_handler import ObjectsHandlerPort

config_dummy = DependencyDummy("config_dummy")
docs_handler_port = DependencyDummy("docs_handler_port")
objects_handler_port = DependencyDummy("objects_handler_port")
events_handler_port = DependencyDummy("events_handler_port")

ConfigDummy = Annotated[Config, Depends(config_dummy)]
DocsHandlerPortDummy = Annotated[DocsHandlerPort, Depends(docs_handler_port)]
ObjectsHandlerPortDummy = Annotated[ObjectsHandlerPort, Depends(objects_handler_port)]
EventsHandlerPortDummy = Annotated[EventsHandlerPort, Depends(events_handler_port)]
89 changes: 89 additions & 0 deletions src/sms/adapters/inbound/fastapi_/routers/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""FastAPI routes for Kafka state management."""

import re
from typing import Annotated

from fastapi import APIRouter, HTTPException, Query, status

from sms.adapters.inbound.fastapi_ import dummies
from sms.adapters.inbound.fastapi_.http_authorization import (
TokenAuthContext,
require_token,
)

KAFKA_TOPIC_PATTERN = r"^[a-zA-Z0-9._-]+$"

kafka_router = APIRouter()


def validate_kafka_topic_name(topic: str) -> None:
"""Validate a Kafka topic name, raising a ValueError if invalid."""
if not re.match(KAFKA_TOPIC_PATTERN, topic):
raise ValueError(f"Invalid topic name: {topic}")


@kafka_router.delete(
"/",
operation_id="clear_topics",
summary="Clear events from the given topic(s).",
description=(
"If no topics are specified, all topics will be cleared, except internal"
+ " topics unless otherwise specified."
),
status_code=status.HTTP_204_NO_CONTENT,
)
async def clear_topics(
events_handler: dummies.EventsHandlerPortDummy,
_token: Annotated[TokenAuthContext, require_token],
topics: list[str] = Query(
default=[],
description="The topic(s) to clear.",
),
exclude_internal: bool = Query(
True, description="Whether to exclude internal topics."
),
) -> None:
"""Clear messages from given topic(s).
If no topics are specified, all topics will be cleared, except internal topics
unless otherwise specified.
Args:
- `topics`: The topic(s) to clear.
- `exclude_internal`: Whether to exclude internal topics.
- `events_handler`: The events handler to use.
Raises:
- `HTTPException`: If an error occurs.
"""
try:
for topic in topics:
validate_kafka_topic_name(topic)
except ValueError as err:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(err)
) from err

try:
await events_handler.clear_topics(
topics=topics, exclude_internal=exclude_internal
)
except Exception as err:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(err)
) from err
2 changes: 2 additions & 0 deletions src/sms/adapters/inbound/fastapi_/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
from fastapi import APIRouter

from sms.adapters.inbound.fastapi_.routers.documents import mongodb_router
from sms.adapters.inbound.fastapi_.routers.events import kafka_router
from sms.adapters.inbound.fastapi_.routers.objects import s3_router

router = APIRouter(tags=["StateManagementService"])

router.include_router(mongodb_router, prefix="/documents", tags=["sms-mongodb"])
router.include_router(s3_router, prefix="/objects", tags=["sms-s3"])
router.include_router(kafka_router, prefix="/events", tags=["sms-kafka"])


@router.get(
Expand Down
61 changes: 61 additions & 0 deletions src/sms/core/events_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains implementation of the EventsHandler class."""

from aiokafka import TopicPartition
from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete

from sms.config import Config
from sms.ports.inbound.events_handler import EventsHandlerPort


class EventsHandler(EventsHandlerPort):
"""A class to manage the state of kafka events."""

def __init__(self, *, config: Config):
self._config = config

def get_admin_client(self) -> AIOKafkaAdminClient:
"""Construct and return an instance of AIOKafkaAdminClient."""
return AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers)

async def clear_topics(
self, *, topics: str | list[str], exclude_internal: bool = True
):
"""Clear messages from given topic(s).
If no topics are specified, all topics will be cleared, except internal topics
unless otherwise specified.
"""
admin_client = self.get_admin_client()
await admin_client.start()
try:
if not topics:
topics = await admin_client.list_topics()
elif isinstance(topics, str):
topics = [topics]
if exclude_internal:
topics = [topic for topic in topics if not topic.startswith("__")]
topics_info = await admin_client.describe_topics(topics)
records_to_delete = {
TopicPartition(
topic=topic_info["topic"], partition=partition_info["partition"]
): RecordsToDelete(before_offset=-1)
for topic_info in topics_info
for partition_info in topic_info["partitions"]
}
await admin_client.delete_records(records_to_delete, timeout_ms=10000)
finally:
await admin_client.close()
10 changes: 10 additions & 0 deletions src/sms/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
from sms.adapters.outbound.docs_dao import DocsDao
from sms.config import Config
from sms.core.docs_handler import DocsHandler
from sms.core.events_handler import EventsHandler
from sms.core.objects_handler import ObjectsHandler, S3ObjectStorages
from sms.ports.inbound.docs_handler import DocsHandlerPort
from sms.ports.inbound.events_handler import EventsHandlerPort
from sms.ports.inbound.objects_handler import ObjectsHandlerPort


Expand Down Expand Up @@ -74,6 +76,7 @@ async def prepare_rest_app(
config: Config,
docs_handler_override: DocsHandlerPort | None = None,
objects_handler_override: ObjectsHandlerPort | None = None,
events_handler_override: EventsHandlerPort | None = None,
) -> AsyncGenerator[FastAPI, None]:
"""Construct and initialize a REST API app along with all its dependencies.
By default, the core dependencies are automatically prepared but you can also
Expand All @@ -86,6 +89,13 @@ async def prepare_rest_app(
)
app.dependency_overrides[dummies.objects_handler_port] = lambda: objects_handler

events_handler = (
events_handler_override
if events_handler_override
else EventsHandler(config=config)
)
app.dependency_overrides[dummies.events_handler_port] = lambda: events_handler

async with prepare_docs_handler_with_override(
config=config, docs_handler_override=docs_handler_override
) as docs_handler:
Expand Down
33 changes: 33 additions & 0 deletions src/sms/ports/inbound/events_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
# for the German Human Genome-Phenome Archive (GHGA)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Defines the API of a class that interfaces between inbound requests and kafka."""

from abc import ABC, abstractmethod


class EventsHandlerPort(ABC):
"""A class to manage the state of kafka events."""

@abstractmethod
async def clear_topics(
self, *, topics: str | list[str], exclude_internal: bool = True
):
"""Clear messages from given topic(s).
If no topics are specified, all topics will be cleared, except internal topics
unless otherwise specified.
"""
...

0 comments on commit b1d8784

Please sign in to comment.