Skip to content

Commit

Permalink
chore: Extend docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 2, 2023
1 parent 52adedd commit 6c60f36
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions posthog/temporal/workflows/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
async def bind_batch_exports_logger(
team_id: int, destination: str | None = None
) -> structlog.types.FilteringBoundLogger:
"""Return a logger for BatchExports."""
"""Return a bound logger for BatchExports."""
if not structlog.is_configured():
await configure_logger()

Expand All @@ -32,9 +32,18 @@ async def configure_logger(
) -> tuple:
"""Configure a StructLog logger for batch exports.
Configuring the logger involves:
* Setting up processors.
* Spawning a task to listen for Kafka logs.
* Spawning a task to shutdown gracefully on worker shutdown.
Args:
logger_factory: Optionally, override the logger_factory.
extra_processors: Optionally, add any processors at the end of the chain.
queue: Optionally, bring your own log queue.
producer: Optionally, bring your own Kafka producer.
cache_logger_on_first_use: Set whether to cache logger for performance.
Should always be True except in tests.
"""
log_queue = queue if queue is not None else asyncio.Queue(maxsize=-1)
put_in_queue = PutInBatchExportsLogQueueProcessor(log_queue)
Expand Down Expand Up @@ -82,7 +91,7 @@ async def worker_shutdown_handler():
class PutInBatchExportsLogQueueProcessor:
"""A StructLog processor that puts event_dict into a queue.
The idea is that event_dicts can be processed later by any queue listeners.
We format event_dict as a message to be sent to Kafka by a queue listener.
"""

def __init__(self, queue: asyncio.Queue):
Expand All @@ -91,6 +100,11 @@ def __init__(self, queue: asyncio.Queue):
def __call__(
self, logger: logging.Logger, method_name: str, event_dict: structlog.types.EventDict
) -> structlog.types.EventDict:
"""Put a message into the queue, if we have all the necessary details.
Always return event_dict so that processors that come later in the chain can do
their own thing.
"""
try:
message_dict = {
"instance_id": event_dict["workflow_run_id"],
Expand Down Expand Up @@ -234,7 +248,11 @@ def __init__(
)

async def listen_and_produce(self):
"""Listen to messages in queue and produce them to Kafka as they come."""
"""Listen to messages in queue and produce them to Kafka as they come.
This is designed to be run as an asyncio.Task, as it will wait forever for the queue
to have messages.
"""
await self.producer.start()

try:
Expand Down

0 comments on commit 6c60f36

Please sign in to comment.