Skip to content

Commit

Permalink
Set the logger context from FastStream context
Browse files Browse the repository at this point in the history
In the ConsumerContextDependency, use the FastStream context to get the
original message, including the aiokafka ConsumerRecord.
  • Loading branch information
jonathansick committed Feb 26, 2024
1 parent 4e16f35 commit 6d0ce19
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 14 deletions.
22 changes: 19 additions & 3 deletions src/unfurlbot/dependencies/consumercontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from dataclasses import dataclass
from typing import Any

from aiokafka import ConsumerRecord
from faststream import context
from faststream.kafka.fastapi import KafkaMessage
from structlog import get_logger
from structlog.stdlib import BoundLogger

Expand All @@ -21,6 +24,9 @@ class ConsumerContext:
factory: Factory
"""Factory for creating services."""

record: ConsumerRecord | None = None
"""The Kafka record being processed."""

def rebind_logger(self, **values: Any) -> None:
"""Add the given values to the logging context.
Expand All @@ -45,11 +51,21 @@ class ConsumerContextDependency:
def __init__(self) -> None:
self._process_context: ProcessContext | None = None

async def __call__(
self,
) -> ConsumerContext:
async def __call__(self) -> ConsumerContext:
"""Create a per-request context and return it."""
# Get the message from the FastStream context
message: KafkaMessage = context.get_local("message")
record = message.raw_message

# Add the Kafka context to the logger
logger = get_logger(__name__) # eventually use a logger dependency
kafka_context = {
"topic": record.topic,
"offset": record.offset,
"partition": record.partition,
}
logger = logger.bind(kafka=kafka_context)

return ConsumerContext(
logger=logger,
factory=Factory(
Expand Down
14 changes: 3 additions & 11 deletions src/unfurlbot/handlers/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Annotated

from fastapi import Depends
from faststream.kafka.fastapi import KafkaMessage, KafkaRouter
from faststream.kafka.fastapi import KafkaRouter
from faststream.security import BaseSecurity
from rubin.squarebot.models.kafka import SquarebotSlackMessageValue
from structlog import get_logger
Expand Down Expand Up @@ -36,18 +36,10 @@
)
async def handle_slack_message(
message: SquarebotSlackMessageValue,
msg: KafkaMessage,
ctx: Annotated[ConsumerContext, Depends(consumer_context_dependency)],
context: Annotated[ConsumerContext, Depends(consumer_context_dependency)],
) -> None:
"""Handle a Slack message."""
record = msg.raw_message
kafka_context = {
"topic": record.topic,
"offset": record.offset,
"partition": record.partition,
}
ctx.rebind_logger(kafka=kafka_context)
logger = ctx.logger
logger = context.logger

logger.info(
"Slack message text",
Expand Down

0 comments on commit 6d0ce19

Please sign in to comment.