
feat: Start tracking records exported #21008

Merged
merged 11 commits into master from feat/start-tracking-records-exported on Mar 20, 2024

Conversation

@tomasfarias (Contributor) commented Mar 19, 2024

Problem

Not sure why we weren't doing this already. Anyway, some test cases were failing, but the bug has been squashed and there should be no failures now. Also, BigQuery tests pass when run manually.

Changes

Pass along BatchExportTemporaryFile.records_total to update_batch_export_run_status and use it to update the Django model's records_completed.
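A minimal sketch of what this change amounts to, using an in-memory stand-in for the Django model (the function signature matches the diff reviewed below; everything else here is illustrative, not the actual PostHog code):

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class BatchExportRun:  # stand-in for the Django model
    id: UUID
    status: str = "Running"
    latest_error: str | None = None
    records_completed: int = 0


RUNS: dict[UUID, BatchExportRun] = {}  # stand-in for the database


def update_batch_export_run_status(
    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
) -> BatchExportRun:
    """Signature as in the diff; the body is a toy in-memory update."""
    run = RUNS[run_id]
    run.status = status
    run.latest_error = latest_error
    run.records_completed = records_completed
    return run


run = BatchExportRun(id=uuid4())
RUNS[run.id] = run

# records_total would come from BatchExportTemporaryFile after the export flushes.
records_total = 1234
update_batch_export_run_status(run.id, "Completed", None, records_completed=records_total)
assert run.records_completed == 1234
```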


Does this work well for both Cloud and self-hosted?

Works anywhere that can run batch exports.

How did you test this code?

Added assertions on records_completed to a handful of tests.

@tiina303 requested a review from bretthoerner on March 19, 2024 at 16:24
Comment on lines 221 to +222
rows_exported.add(len(batch))
total_rows_exported += len(batch)
@tomasfarias (Contributor, Author)

Not sure if we could just read the counter instead of having this duplicated.
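For context, a self-contained toy version of the pattern being discussed (the Counter class here is a hypothetical stand-in; the real rows_exported is a metrics counter in the activity). Whether the metric's value can be read back depends on the metrics API, which is why a separate local total exists at all:

```python
class Counter:
    """Toy metrics counter with an .add() method, standing in for the real one."""

    def __init__(self) -> None:
        self.value = 0

    def add(self, n: int) -> None:
        self.value += n


rows_exported = Counter()
total_rows_exported = 0

for batch in (["a", "b"], ["c"]):  # toy batches
    rows_exported.add(len(batch))      # emit the metric
    total_rows_exported += len(batch)  # local copy used as the activity result

assert total_rows_exported == rows_exported.value == 3
```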

Comment on lines +441 to +443
def update_batch_export_run_status(
    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
) -> BatchExportRun:
@tomasfarias (Contributor, Author)

Could just be named update_batch_export_run as it's doing more than setting the status.

@tomasfarias (Contributor, Author) commented Mar 19, 2024

On a broader note, I think we should get rid of the ORM (in batch exports) and move to something like aiosql.
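For illustration only, a rough sketch of the aiosql style being suggested (the table, query name, and sqlite3 driver are invented here just to keep the example self-contained; batch exports would presumably use an async driver such as asyncpg):

```python
import sqlite3

import aiosql

SQL = """
-- name: set_records_completed!
UPDATE runs SET records_completed = :records_completed WHERE id = :run_id;
"""

# Queries are defined as plain SQL and exposed as Python callables.
queries = aiosql.from_str(SQL, "sqlite3")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (id INTEGER PRIMARY KEY, records_completed INTEGER)")
conn.execute("INSERT INTO runs (id, records_completed) VALUES (1, 0)")

# The SQL lives in one place; Python only supplies parameters.
queries.set_records_completed(conn, run_id=1, records_completed=42)
conn.commit()
```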

@bretthoerner (Contributor) left a comment

I keep kicking the tests, they are hanging on Temporal starting and other things. May be related to change to use Depot?

Either way, this looks fine. I have some question about style (using vars outside of the context manager that created them). I cut my teeth on Python but I find it to be so, so ugly now. 🙃

@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):

jsonl_file.reset()

return jsonl_file.records_total
@bretthoerner (Contributor)

This is de-dented outside of the with BatchExportTemporaryFile() as jsonl_file: block. While it may work (which surprises me, but Python loves to leak things from scope), I think we should put it inside?

@tomasfarias (Contributor, Author) commented Mar 19, 2024

Yeah, with statements do not introduce a new scope. I am fine with de-denting this though, and I do plan to address it more in future PRs.
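A quick self-contained illustration of the scoping point (nothing PostHog-specific): names bound inside a with block remain visible after it, so whether this is safe comes down to what state __exit__ leaves behind.

```python
import tempfile


def count_lines() -> int:
    with tempfile.TemporaryFile(mode="w+") as f:
        f.write("a\nb\n")
        f.seek(0)
        line_count = len(f.readlines())
    # `f` and `line_count` are still in scope here; the file is closed,
    # but plain attributes (like a records_total counter) could still be read.
    return line_count


assert count_lines() == 2
```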

posthog/temporal/batch_exports/postgres_batch_export.py (outdated review thread, resolved)
posthog/temporal/batch_exports/bigquery_batch_export.py (outdated review thread, resolved)
@@ -503,6 +503,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):

await s3_upload.complete()

return local_results_file.records_total
@bretthoerner (Contributor)

Also de-dented, but it competes with the s3_upload which happens at an outer scope...

Maybe I'm being un-Pythonic? The use of important information outside of the with gives me the creeps, but maybe this is fine and good. I guess it's fine as long as __exit__ leaves the state we need?

@tomasfarias (Contributor, Author)

await s3_upload.complete() should be in the __exit__ of S3MultiPartUpload. I took it out as I was going crazy trying to debug a batch export completing with an extra number of parts, so I tried being explicit to throw some clarity at the problem.

In the end, I think I randomly fixed the bug in another PR when I wasn't looking for it. Anyway, this remained outside of __exit__ and should be added back in, as it wasn't the cause of the bug.

@tomasfarias (Contributor, Author) commented Mar 19, 2024

When we move it, we do have to account for exceptions: we don't want to complete the upload in case of an exception, and we have to be very careful if we are aborting, as a retry could continue the upload. Maybe the solution is to move this outside of the activity and into the workflow, as starting, completing, or aborting an upload should never fail except for wrong credentials, which we can very precisely wrap in a try/except.

I can open a follow-up PR to deal with this.
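To make the follow-up concrete, a hedged sketch of the __exit__ behaviour discussed above (this is not PostHog's S3MultiPartUpload; the class and its methods are invented for illustration): complete only on a clean exit, and on error do nothing, leaving the complete/abort decision to an outer retry policy.

```python
import asyncio


class MultiPartUpload:
    """Toy stand-in for an S3 multipart upload wrapper."""

    def __init__(self) -> None:
        self.parts: list[bytes] = []

    async def upload_part(self, data: bytes) -> None:
        self.parts.append(data)  # placeholder for the real part upload call

    async def complete(self) -> None:
        print(f"completed upload with {len(self.parts)} part(s)")

    async def __aenter__(self) -> "MultiPartUpload":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            await self.complete()
        # On error we neither complete nor abort here: a retried activity may
        # want to continue the upload, so aborting is left to the caller.
        return False  # never swallow exceptions


async def main() -> None:
    async with MultiPartUpload() as upload:
        await upload.upload_part(b"some data")


asyncio.run(main())
```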

@tomasfarias (Contributor, Author)

> I keep kicking the tests, they are hanging on Temporal starting and other things. May be related to change to use Depot?

Not entirely sure; they've been very flaky in this and other PRs. Eventually they pass, but with the snapshot bot adding commits, all the progress is reset 🙃

@tomasfarias (Contributor, Author)

> Either way, this looks fine. I have some question about style (using vars outside of the context manager that created them). I cut my teeth on Python but I find it to be so, so ugly now. 🙃

Fair point. I think this has to do with our temporary file doing two things: being a file and a writer to a file. The writer itself outlives the context of the file and should be able to report how many records it wrote. I am making this distinction explicit to support more file formats (Parquet for S3, WIP in #20979), as in that case I need different writers to deal with the different formats. That PR should already make things better, as the writer is defined outside the context manager, I think.
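A loose sketch of that file/writer split (names invented here; the real version lives in the batch exports code and #20979): the writer owns records_total, so the count outlives the temporary file's context manager.

```python
import json
import tempfile


class JSONLWriter:
    """Toy writer that counts the records it serializes."""

    def __init__(self, file) -> None:
        self.file = file
        self.records_total = 0

    def write_record(self, record: dict) -> None:
        self.file.write(json.dumps(record) + "\n")
        self.records_total += 1


with tempfile.TemporaryFile(mode="w+") as f:
    writer = JSONLWriter(f)
    for record in ({"event": "pageview"}, {"event": "click"}):
        writer.write_record(record)

# The file is closed, but the writer and its count outlive the with block.
assert writer.records_total == 2
```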

@tomasfarias merged commit 53355af into master on Mar 20, 2024
100 of 101 checks passed
@tomasfarias deleted the feat/start-tracking-records-exported branch on March 20, 2024 at 10:23

sentry-io bot commented Mar 20, 2024

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ ActivityError: Activity task failed temporalio.worker._workflow_instance in run_act... View Issue
  • ‼️ ActivityError: Activity task failed temporalio.worker._workflow_instance in run_act... View Issue
  • ‼️ ActivityError: Activity task failed temporalio.worker._workflow_instance in run_act... View Issue
  • ‼️ ActivityError: Activity task failed temporalio.worker._workflow_instance in run_act... View Issue
  • ‼️ InterfaceError: connection already closed django.db.backends.postgresql.base in create_cu... View Issue

Did you find this useful? React with a 👍 or 👎
