refactor: Structlog batch exports logging #18458
Conversation
Force-pushed from 1838321 to 288a97c
@@ -83,7 +83,7 @@ runs:
   - uses: syphar/restore-virtualenv@v1
     id: cache-backend-tests
     with:
-      custom_cache_key_element: v1
+      custom_cache_key_element: v2
Not sure if this is necessary, but CI was insisting on re-using a virtual environment with an old version of structlog and hogql_parser.
    yield dataset

    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
Instead of hardcoding the test dataset, I'd rather create a new one and then clean it up.
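For reference, the fixture pattern being described looks roughly like this; a hedged sketch, where the fixture name and the bigquery_client fixture are assumptions for illustration, not the PR's exact code:

```python
import uuid

import pytest
from google.cloud import bigquery


@pytest.fixture
def bigquery_dataset(bigquery_client: bigquery.Client):
    """Create a throwaway dataset for the test and clean it up afterwards."""
    # Unique name so concurrent test runs don't collide (illustrative only).
    dataset_id = f"BatchExportsTest_{uuid.uuid4().hex}"
    dataset = bigquery_client.create_dataset(dataset_id)

    yield dataset

    bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
```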
def producer(self) -> aiokafka.AIOKafkaProducer:
    if self._producer is None:
        self._producer = aiokafka.AIOKafkaProducer(
            bootstrap_servers=settings.KAFKA_HOSTS + ["localhost:9092"],
Not including extra hosts here was making CI tests fail due to a connection error with kafka 🤷.
Huh, weird, it should be ["kafka:9092"], which works in CI for other Kafka clients.
Force-pushed from 288a97c to 4189919
Rebased on master to resolve conflicts, will keep an eye out for tests failing.
Looks great! I mostly have some async-related questions and one worry about the Kafka producer dying.
-    def flush_to_bigquery():
+    async def flush_to_bigquery():
         logger.info(
             "Copying %s records of size %s bytes to BigQuery",
It's interesting to me that this became async but nothing in it is awaited? Is the new logger async, and if so is it OK that we don't await? (Sorry, I'm still adjusting to Python async.) I see a similar thing happened elsewhere in other destination implementations.
Probably an artifact of rebasing; this doesn't need to be async.
Probably once we migrate BigQuery to async; for now, I'll remove it. Changed my mind: I kept it to do async logging in case any handlers happen to block.
The logger is not async (structlog has async methods, but they just wrap the log call in a thread, and you pay the overhead of doing that, which doesn't seem worth it, especially since the queue's put_nowait method returns immediately anyway).
That being said, the queue listener that produces logs to Kafka is async, and it will release the event loop while it's waiting for work (logs) to arrive on the queue.
I mean, we might as well use the async log methods. There is some overhead from spawning the thread like I said, but it means we can release the event loop while printing logs, and we never know when a handler might block. This just makes it safer, and overall we are IO-bound enough not to care much about the thread-spawn overhead.
Actually, I tried this again and remembered why we cannot use the async logging methods (e.g. logger.ainfo, logger.adebug): Temporal runs on its own custom event loop that disallows a lot of thread-related methods (like run_in_executor, to_thread), which the logging library uses.
So, for now, I'm going back to sync logging, and I've removed this await. This shouldn't block any more than the current sync handlers anyway...
Ah, okay: I did some more digging. The Workflow is running in a custom event loop; however, activities run in the normal asyncio loop. So, we can do async logging in activities; I was just seeing failures from using async logging everywhere.
This PR is getting quite long, so if it's okay with you I'll revisit async logging as we move each destination to async. We have to work on unblocking everything for BigQuery anyway, so we might as well add logging to the list.
Sounds good to me.
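To illustrate the distinction settled on above, a minimal sketch, assuming a structlog version that provides the async log methods; the function and event names here are made up:

```python
import structlog

logger = structlog.get_logger()


def log_from_workflow() -> None:
    # Sync logging is safe everywhere, including Temporal's sandboxed
    # workflow event loop.
    logger.info("copying records to BigQuery", records=100)


async def log_from_activity() -> None:
    # structlog's async methods (ainfo/adebug) offload the log call to a
    # thread, which relies on run_in_executor/to_thread -- allowed in
    # regular asyncio activities, but not in the custom workflow event loop.
    await logger.ainfo("copying records to BigQuery", records=100)
```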
posthog/temporal/workflows/logger.py (Outdated)
async def bind_batch_exports_logger(team_id: int, destination: str | None = None) -> FilteringBoundLogger:
    """Return a bound logger for BatchExports."""
    if not structlog.is_configured():
        await configure_logger()
Couldn't this be called concurrently while another task is waiting on configure_logger to complete? I wouldn't worry about the race so much normally, but doing things like creating multiple listen_tasks makes me a bit nervous. In JS I'd stash the configure promise so any future caller would await the same instance.
Maybe I'm missing something about Python async though?
Also, I guess leaving these promises dangling is cool in Python? (Related to the other async question I asked about loggers, I guess.)
return (listen_task, worker_shutdown_handler_task)
Aah, good catch, yeah, we should keep a reference to these tasks as they could be gc'd otherwise. The crab compiler would have probably warned us about this one!
I've added a BACKGROUND_LOGGER_TASKS set to hold references to these tasks. This way, they won't be garbage collected under us.
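For illustration, the strong-reference pattern looks something like this; BACKGROUND_LOGGER_TASKS comes from the comment above, while the helper function name is an assumption:

```python
import asyncio
from collections.abc import Coroutine
from typing import Any

# Hold strong references: asyncio itself only keeps weak references to
# running tasks, so an otherwise un-referenced task can be garbage collected.
BACKGROUND_LOGGER_TASKS: set[asyncio.Task] = set()


def create_background_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    task = asyncio.create_task(coro)
    BACKGROUND_LOGGER_TASKS.add(task)
    # Allow the set to shrink again once the task finishes.
    task.add_done_callback(BACKGROUND_LOGGER_TASKS.discard)
    return task
```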
Couldn't this be called concurrently while another task is waiting on configure_logger to complete?
Yeah, but configure_logger is actually all sync. I probably got confused because it's spawning an async task, but it's not async itself. So, we can just make this function sync and get rid of any potential race conditions.
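A minimal sketch of what a fully synchronous, guarded configuration could look like; the processor chain shown here is illustrative, not the PR's actual pipeline:

```python
import logging

import structlog


def configure_logger() -> None:
    """Configure structlog exactly once.

    This is a plain sync call, so there is no await point where a
    concurrent caller could slip in between the check and the configure.
    """
    if structlog.is_configured():
        return

    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )
```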
try:
    while True:
        msg = await self.queue.get()
        await self.produce(msg)
So if the brokers are unhealthy (or something) and this throws, it seems like our Kafka listen task is forever dead and there is no recovery or shutdown?
And if so, I guess we'd lose logs and slowly OOM via the asyncio.Queue(maxsize=-1)?
I don't think it needs to be super robust or perfect, but it does make me a little nervous that there's no real recovery (or I'm missing it!).
Yeah, fair point, we can do the same as the current approach and catch exceptions so that, worst case, at least we will keep consuming from the queue and we won't OOM. It means we will miss some logs, but I think we can live with that (at least for now).
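A hedged sketch of that catch-and-continue behaviour; the class, queue, and produce() here are stand-ins, not the PR's exact code:

```python
import asyncio


class KafkaLogListener:
    def __init__(self) -> None:
        # Unbounded queue, matching the asyncio.Queue(maxsize=-1) mentioned above.
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=-1)

    async def produce(self, msg: bytes) -> None:
        ...  # Send the log message to Kafka (stand-in).

    async def listen(self) -> None:
        while True:
            msg = await self.queue.get()
            try:
                await self.produce(msg)
            except Exception:
                # Worst case we drop this log line, but we keep draining the
                # queue so it cannot grow without bound and OOM the worker.
                pass
            finally:
                self.queue.task_done()
```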
Perhaps we should also catch any errors when initializing the producer, just in case Kafka is unhealthy right as we are coming up.
EDIT: I did just that. We won't put any logs on the queue if the producer fails to start for whatever reason.
Thank you for making our code more reliable!
Force-pushed from 909137e to 98cf769
Force-pushed from 7c2d28c to 28ca083
Force-pushed from 28ca083 to 403c87d
Problem
The existing logging implementation for batch exports is a bit clunky:
"batch_exports"
, but it would be nice to distinguish from backfills.Changes
- Use structlog (structlog is already a dependency from the django-structlog requirement, so nothing new here, but I did bump the version, which should be supported). This allows us to set context variables (like team_id and destination) once and for all.

Overall, the logging system now looks easier to understand and, eventually, extend. The line diff may be deceptive, as a lot of it is in docstrings to aid with understanding and more than half is in unit tests (which we didn't properly have much of before).
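As a sketch of what "set context variables once and for all" means in structlog terms (the function and event names are illustrative, and merge_contextvars is assumed to be in the configured processor chain):

```python
import structlog
from structlog.contextvars import bind_contextvars, clear_contextvars

logger = structlog.get_logger()


async def run_batch_export(team_id: int, destination: str) -> None:
    # Bind once; every later log call in this task automatically carries
    # team_id and destination.
    clear_contextvars()
    bind_contextvars(team_id=team_id, destination=destination)

    logger.info("starting batch export")
    logger.info("finished batch export")
```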
👉 Stay up-to-date with PostHog coding conventions for a smoother review.
How did you test this code?
Added unit tests.