diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 660357f..e05a884 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -45,12 +45,18 @@ jobs:
     steps:
       - uses: actions/checkout@v4

+      - name: Start Kafka
+        run: docker compose -f kafka-compose.yaml up -d
+
       - name: Run tox
         uses: lsst-sqre/run-tox@v1
         with:
           python-version: ${{ matrix.python }}
           tox-envs: "py,coverage-report,typing"

+      - name: Stop Kafka
+        run: docker compose -f kafka-compose.yaml down
+
   build:
     runs-on: ubuntu-latest
     needs: [lint, test]
diff --git a/kafka-compose.yaml b/kafka-compose.yaml
new file mode 100644
index 0000000..f335e56
--- /dev/null
+++ b/kafka-compose.yaml
@@ -0,0 +1,55 @@
+# From https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
+#
+# docker compose -f kafka-compose.yaml up
+# docker compose -f kafka-compose.yaml down
+#
+# The tox test environment is configured to use this Kafka broker
+
+version: '2.1'
+
+services:
+  zoo1:
+    image: confluentinc/cp-zookeeper:7.3.2
+    hostname: zoo1
+    container_name: zoo1
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_SERVERS: zoo1:2888:3888
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.3.2
+    hostname: kafka1
+    container_name: kafka1
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+      - "9999:9999"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_JMX_PORT: 9999
+      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
+      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
+      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
+    depends_on:
+      - zoo1
+
+  kafdrop:
+    image: obsidiandynamics/kafdrop
+    restart: "no"
+    ports:
+      - "9000:9000"
+    environment:
+      KAFKA_BROKERCONNECT: "kafka1:19092"
+    depends_on:
+      - "kafka1"
diff --git a/src/unfurlbot/config.py b/src/unfurlbot/config.py
index c0616cf..80c2eb1 100644
--- a/src/unfurlbot/config.py
+++ b/src/unfurlbot/config.py
@@ -215,6 +215,10 @@ class Config(BaseSettings):

     slack_app_id: str = Field(title="Slack app ID")

+    consumer_group_id: str = Field(
+        "unfurlbot", title="Kafka consumer group ID"
+    )
+
     app_mention_topic: str = Field(
         "squarebot.app_mention",
         title="app_mention Kafka topic",
diff --git a/src/unfurlbot/handlers/kafka.py b/src/unfurlbot/handlers/kafka.py
new file mode 100644
index 0000000..eeabdc9
--- /dev/null
+++ b/src/unfurlbot/handlers/kafka.py
@@ -0,0 +1,40 @@
+"""Kafka router and consumers."""
+
+from __future__ import annotations
+
+from faststream.kafka import KafkaMessage
+from faststream.kafka.fastapi import KafkaRouter
+from faststream.security import BaseSecurity
+from rubin.squarebot.models.kafka import SquarebotSlackMessageValue
+from structlog import get_logger
+
+from ..config import config
+
+__all__ = ["kafka_router", "handle_slack_message"]
+
+
+kafka_security = BaseSecurity(ssl_context=config.kafka.ssl_context)
+kafka_router = KafkaRouter(
+    config.kafka.bootstrap_servers, security=kafka_security
+)
+
+
+@kafka_router.subscriber(
+    config.message_channels_topic,
+    config.message_groups_topic,
+    config.message_im_topic,
+    config.message_mpim_topic,
+    group_id=config.consumer_group_id,
+)
+async def handle_slack_message(
+    slack_message: SquarebotSlackMessageValue, kafka_message: KafkaMessage
+) -> None:
+    """Handle a Slack message."""
+    logger = get_logger(
+        kafka={
+            "topic": kafka_message.topic,  # type: ignore [attr-defined]
+            "partition": kafka_message.partition,  # type: ignore [attr-defined]
+            "offset": kafka_message.offset,  # type: ignore [attr-defined]
+        }
+    )
+    logger.debug("Received a Slack message", text=slack_message.text)
diff --git a/src/unfurlbot/main.py b/src/unfurlbot/main.py
index fa1170c..69792ca 100644
--- a/src/unfurlbot/main.py
+++ b/src/unfurlbot/main.py
@@ -15,9 +15,11 @@
 from safir.dependencies.http_client import http_client_dependency
 from safir.logging import configure_logging, configure_uvicorn_logging
 from safir.middleware.x_forwarded import XForwardedMiddleware
+from structlog import get_logger

 from .config import config
 from .handlers.internal import internal_router
+from .handlers.kafka import kafka_router

 __all__ = ["app", "config"]

@@ -26,8 +28,11 @@
 async def lifespan(app: FastAPI) -> AsyncIterator[None]:
     """Set up and tear down the application."""
     # Any code here will be run when the application starts up.
+    logger = get_logger()

-    yield
+    async with kafka_router.lifespan_context(app):
+        logger.info("Unfurlbot start up complete.")
+        yield

     # Any code here will be run when the application shuts down.
     await http_client_dependency.aclose()
@@ -53,6 +58,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:

 # Attach the routers.
 app.include_router(internal_router)
+app.include_router(kafka_router)

 # Add middleware.
 app.add_middleware(XForwardedMiddleware)
diff --git a/tox.ini b/tox.ini
index 28794ce..5ecb461 100644
--- a/tox.ini
+++ b/tox.ini
@@ -13,7 +13,7 @@ setenv =
     KAFKA_SECURITY_PROTOCOL = PLAINTEXT
     UNFURLBOT_SLACK_SIGNING = 1234
     UNFURLBOT_SLACK_TOKEN = 1234
-    UNFURLBOT_SLACK_APP_ID = 1234l
+    UNFURLBOT_SLACK_APP_ID = 1234
 commands =
     pytest --cov=unfurlbot --cov-branch --cov-report= {posargs}
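Not part of the diff, but a handy way to exercise the new consumer by hand while the compose stack is running: a rough sketch of a one-off producer that pushes a message at the local broker so the `handle_slack_message` debug log line can be observed. The topic name is a guess at the default for `config.message_channels_topic` (only `app_mention_topic`'s default is visible above), and the payload is a stand-in, since a real message must deserialize into `SquarebotSlackMessageValue`.

```python
"""Ad-hoc local producer sketch (not part of this change).

Assumes the kafka-compose.yaml stack is up on localhost:9092. The topic
name and payload are placeholders: use the configured value of
config.message_channels_topic and a payload that matches
SquarebotSlackMessageValue.
"""

import asyncio

from faststream.kafka import KafkaBroker

TOPIC = "squarebot.message.channels"  # placeholder topic name


async def main() -> None:
    # Connect to the broker started by kafka-compose.yaml.
    async with KafkaBroker("localhost:9092") as broker:
        # FastStream JSON-serializes the dict; a real message needs the
        # full SquarebotSlackMessageValue field set.
        await broker.publish({"text": "Anyone looked at DM-12345?"}, topic=TOPIC)


if __name__ == "__main__":
    asyncio.run(main())
```

With a payload that actually validates as `SquarebotSlackMessageValue`, unfurlbot's debug log should show the "Received a Slack message" entry with the topic, partition, and offset bound by the handler.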