chore(batch-exports): add some Prometheus metrics for batch exports #18467
Conversation
Force-pushed from 6d10ea5 to edefc83
@@ -63,6 +66,9 @@ def handle(self, *args, **options):
    options["client_key"] = "--SECRET--"
    logging.info(f"Starting Temporal Worker with options: {options}")

    port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
    start_http_server(port=port)
I tested this out locally and it works as expected (it runs the metrics server in another thread, so it shouldn't mess with async or anything else). I'll need to wire up the scrapers once we deploy this, of course.
Even if it's running on a separate thread, if the blocking IO doesn't release the GIL we would still be blocking the event loop. But if it's working in tests then the IO should be playing nice and releasing the GIL 🤝
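For context, prometheus_client's start_http_server serves the metrics endpoint from a daemon thread and returns immediately, so the worker's event loop keeps running on the main thread. A minimal sketch of the startup order (the run_worker entrypoint is an assumed placeholder, not the actual worker code):

```python
import asyncio
import os

from prometheus_client import start_http_server


async def run_worker() -> None:
    """Placeholder for the Temporal worker's async entrypoint (assumed name)."""
    ...


def main() -> None:
    # start_http_server spins up the metrics HTTP endpoint in a daemon thread,
    # so this call returns immediately and does not block the event loop below.
    port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
    start_http_server(port=port)

    # The worker (and its asyncio event loop) runs on the main thread as before.
    asyncio.run(run_worker())
```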
Force-pushed from edefc83 to 4e85e54
Lots of good cleanups! Thank you!
@@ -63,6 +66,9 @@ def handle(self, *args, **options):
    options["client_key"] = "--SECRET--"
    logging.info(f"Starting Temporal Worker with options: {options}")

    port = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))
nit: Couldn't this be a setting? Mostly for consistency with the rest of the arguments which come from settings.
Ah, good point. I cargo-culted it from the gunicorn config, but in retrospect that config probably does it this way because Django settings aren't available yet.
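If the port does move to a setting, it might look something like this (a sketch; the settings module path and names are assumptions, not the actual PostHog layout):

```python
# posthog/settings/temporal.py (assumed location)
import os

PROMETHEUS_METRICS_EXPORT_PORT = int(os.environ.get("PROMETHEUS_METRICS_EXPORT_PORT", 8001))

# The management command would then read the value via Django settings
# instead of touching the environment directly:
#
#   from django.conf import settings
#   from prometheus_client import start_http_server
#
#   start_http_server(port=settings.PROMETHEUS_METRICS_EXPORT_PORT)
```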
@@ -707,6 +717,7 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac


async def execute_batch_export_insert_activity(
    destination: str,
nit: You could probably get this from the Workflow context: temporalio.workflow.info().workflow_type. But this is fine.
> workflow_type

Ahh, I was trying to find something like this. I'll try.
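For reference, the Temporal Python SDK exposes the running workflow's registered name via workflow.info().workflow_type; a minimal sketch (the workflow name and logging here are illustrative, not the batch export code):

```python
from temporalio import workflow


@workflow.defn(name="s3-export")  # illustrative name
class ExampleExportWorkflow:
    @workflow.run
    async def run(self) -> None:
        # workflow.info() is only valid inside workflow code; workflow_type
        # holds the registered name, "s3-export" in this example.
        workflow_type = workflow.info().workflow_type
        workflow.logger.info("Running workflow of type %s", workflow_type)
```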
# It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
# the byte size of each batch the way things are currently written. We can revisit this
# in the future if we decide it's useful enough.
Yeah, Redshift is kind of limited as it doesn't support COPY-ing from local files, only from S3 😭. If/when we add COPY from S3 we can start tracking bytes too.
def flush_to_snowflake(lrf: tempfile._TemporaryFileWrapper, rows_in_file: int):
    lrf.flush()
    put_file_to_snowflake_table(cursor, lrf.name, inputs.table_name)
    ROWS_EXPORTED.labels(destination="snowflake").inc(rows_in_file)
I have a PR in draft to add BatchExportTemporaryFile to snowflake batch exports: #18427. This means that tracking can be the same as the other destinations instead of needing a separate rows_in_file. But we can merge this first, and I'll rebase 👍
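For context, a counter like ROWS_EXPORTED used above would be defined with prometheus_client roughly as follows (a sketch; the metric names, help strings, and the BYTES_EXPORTED counterpart are assumptions, not necessarily what this PR uses):

```python
from prometheus_client import Counter

# Assumed definitions; the real names and help text in the PR may differ.
ROWS_EXPORTED = Counter(
    "batch_export_rows_exported",
    "Number of rows exported, labeled by batch export destination.",
    labelnames=["destination"],
)
BYTES_EXPORTED = Counter(
    "batch_export_bytes_exported",
    "Number of bytes exported, labeled by batch export destination.",
    labelnames=["destination"],
)

# Usage mirrors the hunk above:
# ROWS_EXPORTED.labels(destination="snowflake").inc(rows_in_file)
```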
Looks good to me, only a couple of nitpicky comments. @bretthoerner Have you been able to run manual tests for BigQuery/Redshift/S3? I'd say we do that just in case, but overall good to go from my point of view.
No, but I'll try that this morning before any merge.
Force-pushed from 4e85e54 to 48148dd
Problem
We lack observability of batch export jobs.
Changes
Add some Prometheus metrics (and make some minor tweaks I noticed while reading the code).
How did you test this code?
There should be no functional changes, so existing tests should continue to work.