Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: S3 batch export with SPMC abstractions #26577

Merged
merged 7 commits into from
Dec 4, 2024

Conversation

tomasfarias
Copy link
Contributor

@tomasfarias tomasfarias commented Dec 2, 2024

Problem

Builds on top of #26575 to refactor S3 batch export to use fully-async SPMC abstractions. Big part of the work was done in the linked PR, so there isn't much to do here.

Changes

  1. Implement new S3Consumer class
  2. Move flush function to flush method of S3Consumer.
  3. Call S3Consumer with run_consumer_loop in S3 batch export.
  4. Implement the new events_recent view in spmc, as that was missing from my other PR.

The implementation does have a potential concurrency conflict when computing the next_part_number that I think to have resolved by using a new pending_parts.

👉 Stay up-to-date with PostHog coding conventions for a smoother review.

Does this work well for both Cloud and self-hosted?

How did you test this code?

@tomasfarias tomasfarias changed the title feat: Add single-producer multiple-consumer module for batch exprots refactor: S3 batch export with SPMC abstractions Dec 2, 2024
@tomasfarias tomasfarias marked this pull request as ready for review December 3, 2024 10:30
@@ -199,6 +211,7 @@ def __init__(
self.kms_key_id = kms_key_id
self.upload_id: str | None = None
self.parts: list[Part] = []
self.pending_parts: list[Part] = []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, we could run into concurrency problems: As we only append a part to self.parts after it has been sent. Imagine the following (very realistic) sequence of events with just 2 consumers:

  1. Consumer 1 starts, gets assigned part 1 by self.part_number+1 as self.parts is empty.
  2. Consumer 1 relinquishes control back to main loop on awaiting self.upload_part_retryable.
  3. Consumer 2 starts, gets assigned part 1 by self.part_number+1 as self.parts is still empty.
  4. Consumer 2 relinquishes control back to main loop on awaiting self.upload_part_retryable.
  5. Consumer 1 resumes, appends part 1 to self.parts, and finishes.
  6. Consumer 2 resumes, appends part 1 to self.parts, and finishes.

We have ended with two parts with part number 1, which means whichever consumer uploaded last would have overridden the other in the upload.

With the addition of self.pending_parts we can "lock" any part numbers in progress so that they don't get repeated when doing self.part_number+1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the explanation 👍

@tomasfarias tomasfarias requested a review from rossgray December 3, 2024 10:37
Copy link
Contributor

@rossgray rossgray left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM :shipit:

Base automatically changed from refactor/spmc-abstractions to master December 4, 2024 10:28
@tomasfarias tomasfarias force-pushed the refactor/s3-batch-export-with-spmc-abstractions branch from 1b49560 to c6534c8 Compare December 4, 2024 10:37
@tomasfarias tomasfarias merged commit 25f0eac into master Dec 4, 2024
89 checks passed
@tomasfarias tomasfarias deleted the refactor/s3-batch-export-with-spmc-abstractions branch December 4, 2024 15:23
Copy link

sentry-io bot commented Dec 4, 2024

Suspect Issues

This pull request was deployed and Sentry observed the following issues:

  • ‼️ RecordBatchTaskError: The record batch consumer encountered an error during execution posthog.temporal.batch_exports.spmc in raise_on... View Issue
  • ‼️ RecordBatchConsumerRetryableExceptionGroup: At least one unhandled retryable errors in a RecordBatch consumer TaskGroup posthog.temporal.batch_exports.spmc in run_cons... View Issue
  • ‼️ RecordBatchTaskError: The record batch consumer encountered an error during execution posthog.temporal.batch_exports.spmc in raise_on... View Issue
  • ‼️ RecordBatchTaskError: The record batch consumer encountered an error during execution posthog.temporal.batch_exports.spmc in raise_on... View Issue

Did you find this useful? React with a 👍 or 👎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants