diff --git a/openapi.yaml b/openapi.yaml index c11c223..90e435b 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -214,6 +214,47 @@ paths: tags: - StateManagementService - sms-mongodb + /events/: + delete: + description: If no topics are specified, all topics will be cleared, except + internal topics unless otherwise specified. + operationId: clear_topics + parameters: + - description: The topic(s) to clear. + in: query + name: topics + required: false + schema: + default: [] + description: The topic(s) to clear. + items: + type: string + title: Topics + type: array + - description: Whether to exclude internal topics. + in: query + name: exclude_internal + required: false + schema: + default: true + description: Whether to exclude internal topics. + title: Exclude Internal + type: boolean + responses: + '204': + description: Successful Response + '422': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + security: + - HTTPBearer: [] + summary: Clear events from the given topic(s). + tags: + - StateManagementService + - sms-kafka /health: get: description: Used to test if this service is alive diff --git a/src/sms/adapters/inbound/fastapi_/dummies.py b/src/sms/adapters/inbound/fastapi_/dummies.py index 43c1f77..f5bfb65 100644 --- a/src/sms/adapters/inbound/fastapi_/dummies.py +++ b/src/sms/adapters/inbound/fastapi_/dummies.py @@ -23,12 +23,15 @@ from sms.config import Config from sms.ports.inbound.docs_handler import DocsHandlerPort +from sms.ports.inbound.events_handler import EventsHandlerPort from sms.ports.inbound.objects_handler import ObjectsHandlerPort config_dummy = DependencyDummy("config_dummy") docs_handler_port = DependencyDummy("docs_handler_port") objects_handler_port = DependencyDummy("objects_handler_port") +events_handler_port = DependencyDummy("events_handler_port") ConfigDummy = Annotated[Config, Depends(config_dummy)] DocsHandlerPortDummy = Annotated[DocsHandlerPort, Depends(docs_handler_port)] ObjectsHandlerPortDummy = Annotated[ObjectsHandlerPort, Depends(objects_handler_port)] +EventsHandlerPortDummy = Annotated[EventsHandlerPort, Depends(events_handler_port)] diff --git a/src/sms/adapters/inbound/fastapi_/routers/events.py b/src/sms/adapters/inbound/fastapi_/routers/events.py new file mode 100644 index 0000000..a0bf556 --- /dev/null +++ b/src/sms/adapters/inbound/fastapi_/routers/events.py @@ -0,0 +1,89 @@ +# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""FastAPI routes for Kafka state management.""" + +import re +from typing import Annotated + +from fastapi import APIRouter, HTTPException, Query, status + +from sms.adapters.inbound.fastapi_ import dummies +from sms.adapters.inbound.fastapi_.http_authorization import ( + TokenAuthContext, + require_token, +) + +KAFKA_TOPIC_PATTERN = r"^[a-zA-Z0-9._-]+$" + +kafka_router = APIRouter() + + +def validate_kafka_topic_name(topic: str) -> None: + """Validate a Kafka topic name, raising a ValueError if invalid.""" + if not re.match(KAFKA_TOPIC_PATTERN, topic): + raise ValueError(f"Invalid topic name: {topic}") + + +@kafka_router.delete( + "/", + operation_id="clear_topics", + summary="Clear events from the given topic(s).", + description=( + "If no topics are specified, all topics will be cleared, except internal" + + " topics unless otherwise specified." + ), + status_code=status.HTTP_204_NO_CONTENT, +) +async def clear_topics( + events_handler: dummies.EventsHandlerPortDummy, + _token: Annotated[TokenAuthContext, require_token], + topics: list[str] = Query( + default=[], + description="The topic(s) to clear.", + ), + exclude_internal: bool = Query( + True, description="Whether to exclude internal topics." + ), +) -> None: + """Clear messages from given topic(s). + + If no topics are specified, all topics will be cleared, except internal topics + unless otherwise specified. + + Args: + - `topics`: The topic(s) to clear. + - `exclude_internal`: Whether to exclude internal topics. + - `events_handler`: The events handler to use. + + Raises: + - `HTTPException`: If an error occurs. + """ + try: + for topic in topics: + validate_kafka_topic_name(topic) + except ValueError as err: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(err) + ) from err + + try: + await events_handler.clear_topics( + topics=topics, exclude_internal=exclude_internal + ) + except Exception as err: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(err) + ) from err diff --git a/src/sms/adapters/inbound/fastapi_/routes.py b/src/sms/adapters/inbound/fastapi_/routes.py index ba17195..b540dc3 100644 --- a/src/sms/adapters/inbound/fastapi_/routes.py +++ b/src/sms/adapters/inbound/fastapi_/routes.py @@ -17,12 +17,14 @@ from fastapi import APIRouter from sms.adapters.inbound.fastapi_.routers.documents import mongodb_router +from sms.adapters.inbound.fastapi_.routers.events import kafka_router from sms.adapters.inbound.fastapi_.routers.objects import s3_router router = APIRouter(tags=["StateManagementService"]) router.include_router(mongodb_router, prefix="/documents", tags=["sms-mongodb"]) router.include_router(s3_router, prefix="/objects", tags=["sms-s3"]) +router.include_router(kafka_router, prefix="/events", tags=["sms-kafka"]) @router.get( diff --git a/src/sms/core/events_handler.py b/src/sms/core/events_handler.py new file mode 100644 index 0000000..97198f5 --- /dev/null +++ b/src/sms/core/events_handler.py @@ -0,0 +1,61 @@ +# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains implementation of the EventsHandler class.""" + +from aiokafka import TopicPartition +from aiokafka.admin import AIOKafkaAdminClient, RecordsToDelete + +from sms.config import Config +from sms.ports.inbound.events_handler import EventsHandlerPort + + +class EventsHandler(EventsHandlerPort): + """A class to manage the state of kafka events.""" + + def __init__(self, *, config: Config): + self._config = config + + def get_admin_client(self) -> AIOKafkaAdminClient: + """Construct and return an instance of AIOKafkaAdminClient.""" + return AIOKafkaAdminClient(bootstrap_servers=self._config.kafka_servers) + + async def clear_topics( + self, *, topics: str | list[str], exclude_internal: bool = True + ): + """Clear messages from given topic(s). + + If no topics are specified, all topics will be cleared, except internal topics + unless otherwise specified. + """ + admin_client = self.get_admin_client() + await admin_client.start() + try: + if not topics: + topics = await admin_client.list_topics() + elif isinstance(topics, str): + topics = [topics] + if exclude_internal: + topics = [topic for topic in topics if not topic.startswith("__")] + topics_info = await admin_client.describe_topics(topics) + records_to_delete = { + TopicPartition( + topic=topic_info["topic"], partition=partition_info["partition"] + ): RecordsToDelete(before_offset=-1) + for topic_info in topics_info + for partition_info in topic_info["partitions"] + } + await admin_client.delete_records(records_to_delete, timeout_ms=10000) + finally: + await admin_client.close() diff --git a/src/sms/inject.py b/src/sms/inject.py index 0300e1f..fa285e4 100644 --- a/src/sms/inject.py +++ b/src/sms/inject.py @@ -25,8 +25,10 @@ from sms.adapters.outbound.docs_dao import DocsDao from sms.config import Config from sms.core.docs_handler import DocsHandler +from sms.core.events_handler import EventsHandler from sms.core.objects_handler import ObjectsHandler, S3ObjectStorages from sms.ports.inbound.docs_handler import DocsHandlerPort +from sms.ports.inbound.events_handler import EventsHandlerPort from sms.ports.inbound.objects_handler import ObjectsHandlerPort @@ -74,6 +76,7 @@ async def prepare_rest_app( config: Config, docs_handler_override: DocsHandlerPort | None = None, objects_handler_override: ObjectsHandlerPort | None = None, + events_handler_override: EventsHandlerPort | None = None, ) -> AsyncGenerator[FastAPI, None]: """Construct and initialize a REST API app along with all its dependencies. By default, the core dependencies are automatically prepared but you can also @@ -86,6 +89,13 @@ async def prepare_rest_app( ) app.dependency_overrides[dummies.objects_handler_port] = lambda: objects_handler + events_handler = ( + events_handler_override + if events_handler_override + else EventsHandler(config=config) + ) + app.dependency_overrides[dummies.events_handler_port] = lambda: events_handler + async with prepare_docs_handler_with_override( config=config, docs_handler_override=docs_handler_override ) as docs_handler: diff --git a/src/sms/ports/inbound/events_handler.py b/src/sms/ports/inbound/events_handler.py new file mode 100644 index 0000000..736d7e8 --- /dev/null +++ b/src/sms/ports/inbound/events_handler.py @@ -0,0 +1,33 @@ +# Copyright 2021 - 2024 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln +# for the German Human Genome-Phenome Archive (GHGA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines the API of a class that interfaces between inbound requests and kafka.""" + +from abc import ABC, abstractmethod + + +class EventsHandlerPort(ABC): + """A class to manage the state of kafka events.""" + + @abstractmethod + async def clear_topics( + self, *, topics: str | list[str], exclude_internal: bool = True + ): + """Clear messages from given topic(s). + + If no topics are specified, all topics will be cleared, except internal topics + unless otherwise specified. + """ + ...