Skip to content

Commit

Permalink
Set up a Kafka consumer for Slack messages
Browse files Browse the repository at this point in the history
For testing, set up a local Kafka broker with Docker compose.
  • Loading branch information
jonathansick committed Feb 15, 2024
1 parent 5c87555 commit f7fb7a1
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 2 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ jobs:
steps:
- uses: actions/checkout@v4

- name: Start Kafka
run: docker compose -f kafka-compose.yaml up -d

- name: Run tox
uses: lsst-sqre/run-tox@v1
with:
python-version: ${{ matrix.python }}
tox-envs: "py,coverage-report,typing"

- name: Stop Kafka
run: docker compose -f kafka-compose.yaml down

build:
runs-on: ubuntu-latest
needs: [lint, test]
Expand Down
55 changes: 55 additions & 0 deletions kafka-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# From https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
#
# docker compose -f kafka-compose.yaml up
# docker compose -f kafka-compose.yaml down
#
# nox -s test is configured to use this Kafka broker

version: '2.1'

services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888

kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1

kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19092"
depends_on:
- "kafka1"
4 changes: 4 additions & 0 deletions src/unfurlbot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ class Config(BaseSettings):

slack_app_id: str = Field(title="Slack app ID")

consumer_group_id: str = Field(
"unfurlbot", title="Kafka consumer group ID"
)

app_mention_topic: str = Field(
"squarebot.app_mention",
title="app_mention Kafka topic",
Expand Down
40 changes: 40 additions & 0 deletions src/unfurlbot/handlers/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Kafka router and consumers."""

from __future__ import annotations

from faststream.kafka import KafkaMessage
from faststream.kafka.fastapi import KafkaRouter
from faststream.security import BaseSecurity
from rubin.squarebot.models.kafka import SquarebotSlackMessageValue
from structlog import get_logger

from ..config import config

__all__ = ["kafka_router", "handle_slack_message"]


kafka_security = BaseSecurity(ssl_context=config.kafka.ssl_context)
kafka_router = KafkaRouter(
config.kafka.bootstrap_servers, security=kafka_security
)


@kafka_router.subscriber(
config.message_channels_topic,
config.message_groups_topic,
config.message_im_topic,
config.message_mpim_topic,
group_id=config.consumer_group_id,
)
async def handle_slack_message(
slack_message: SquarebotSlackMessageValue, kafka_message: KafkaMessage
) -> None:
"""Handle a Slack message."""
logger = get_logger(
kafka={
"topic": kafka_message.topic, # type: ignore [attr-defined]
"partition": kafka_message.partition, # type: ignore [attr-defined]
"offset": kafka_message.offset, # type: ignore [attr-defined]
}
)
logger.debug("Received a Slack message", text=slack_message.text)
8 changes: 7 additions & 1 deletion src/unfurlbot/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from safir.dependencies.http_client import http_client_dependency
from safir.logging import configure_logging, configure_uvicorn_logging
from safir.middleware.x_forwarded import XForwardedMiddleware
from structlog import get_logger

from .config import config
from .handlers.internal import internal_router
from .handlers.kafka import kafka_router

__all__ = ["app", "config"]

Expand All @@ -26,8 +28,11 @@
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
"""Set up and tear down the application."""
# Any code here will be run when the application starts up.
logger = get_logger()

yield
async with kafka_router.lifespan_context(app):
logger.info("Unfurlbot start up complete.")
yield

# Any code here will be run when the application shuts down.
await http_client_dependency.aclose()
Expand All @@ -53,6 +58,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:

# Attach the routers.
app.include_router(internal_router)
app.include_router(kafka_router)

# Add middleware.
app.add_middleware(XForwardedMiddleware)
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ setenv =
KAFKA_SECURITY_PROTOCOL = PLAINTEXT
UNFURLBOT_SLACK_SIGNING = 1234
UNFURLBOT_SLACK_TOKEN = 1234
UNFURLBOT_SLACK_APP_ID = 1234l
UNFURLBOT_SLACK_APP_ID = 1234
commands =
pytest --cov=unfurlbot --cov-branch --cov-report= {posargs}

Expand Down

0 comments on commit f7fb7a1

Please sign in to comment.