Skip to content

Commit

Permalink
fix: Keep strong reference to background tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 10, 2023
1 parent 4e4c222 commit 98cf769
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
13 changes: 6 additions & 7 deletions posthog/temporal/tests/batch_exports/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
TRUNCATE_LOG_ENTRIES_TABLE_SQL,
)
from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES
from posthog.temporal.workflows.logger import bind_batch_exports_logger, configure_logger
from posthog.temporal.workflows.logger import BACKGROUND_LOGGER_TASKS, bind_batch_exports_logger, configure_logger

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -138,16 +138,15 @@ async def configure(log_capture, queue, producer):
* Set the queue and producer to capture messages sent.
* Do not cache logger to ensure each test starts clean.
"""
tasks = configure_logger(
extra_processors=[log_capture], queue=queue, producer=producer, cache_logger_on_first_use=False
)
yield tasks
configure_logger(extra_processors=[log_capture], queue=queue, producer=producer, cache_logger_on_first_use=False)

yield

for task in tasks:
for task in BACKGROUND_LOGGER_TASKS:
# Clean up logger tasks to avoid leaking/warnings.
task.cancel()

await asyncio.wait(tasks)
await asyncio.wait(BACKGROUND_LOGGER_TASKS)


async def test_batch_exports_logger_binds_context(log_capture):
Expand Down
22 changes: 18 additions & 4 deletions posthog/temporal/workflows/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

from posthog.kafka_client.topics import KAFKA_LOG_ENTRIES

# Strong references to background logger tasks (see create_logger_background_task).
# Holding them here prevents the tasks from being garbage collected and
# disappearing mid-execution; entries are discarded when a task finishes.
BACKGROUND_LOGGER_TASKS = set()


async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> FilteringBoundLogger:
"""Return a bound logger for BatchExports."""
Expand All @@ -30,7 +32,7 @@ def configure_logger(
queue: asyncio.Queue | None = None,
producer: aiokafka.AIOKafkaProducer | None = None,
cache_logger_on_first_use: bool = True,
) -> tuple:
) -> None:
"""Configure a StructLog logger for batch exports.
Configuring the logger involves:
Expand Down Expand Up @@ -65,7 +67,8 @@ def configure_logger(
logger_factory=logger_factory(),
cache_logger_on_first_use=cache_logger_on_first_use,
)
listen_task = asyncio.create_task(

listen_task = create_logger_background_task(
KafkaLogProducerFromQueue(queue=log_queue, topic=KAFKA_LOG_ENTRIES, producer=producer).listen()
)

Expand All @@ -82,9 +85,20 @@ async def worker_shutdown_handler():

await asyncio.wait([listen_task])

worker_shutdown_handler_task = asyncio.create_task(worker_shutdown_handler())
create_logger_background_task(worker_shutdown_handler())


def create_logger_background_task(task) -> asyncio.Task:
    """Create an asyncio.Task and add it to BACKGROUND_LOGGER_TASKS.

    Adding the task to BACKGROUND_LOGGER_TASKS keeps a strong reference to it,
    so it won't be garbage collected and disappear mid-execution. The done
    callback removes the reference once the task finishes, so completed tasks
    don't accumulate.

    Args:
        task: A coroutine to schedule on the running event loop.

    Returns:
        The scheduled asyncio.Task.
    """
    new_task = asyncio.create_task(task)
    BACKGROUND_LOGGER_TASKS.add(new_task)
    # Discard on completion so the set doesn't grow without bound.
    new_task.add_done_callback(BACKGROUND_LOGGER_TASKS.discard)

    return new_task


class PutInBatchExportsLogQueueProcessor:
Expand Down

0 comments on commit 98cf769

Please sign in to comment.