Skip to content

Commit

Permalink
fix: Complete S3 batch export with parts in sorted order (#26645)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Dec 4, 2024
1 parent f79cb71 commit 8eef695
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
12 changes: 7 additions & 5 deletions posthog/temporal/batch_exports/redshift_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
start_batch_export_run,
start_produce_batch_export_record_batches,
)
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.metrics import get_rows_exported_metric
from posthog.temporal.batch_exports.postgres_batch_export import (
Fields,
Expand All @@ -47,11 +52,6 @@
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import configure_temporal_worker_logger
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)


def remove_escaped_whitespace_recursive(value):
Expand Down Expand Up @@ -715,6 +715,8 @@ async def run(self, inputs: RedshiftBatchExportInputs):
"StringDataRightTruncation",
# Raised by our PostgreSQL client when failing to connect after several attempts.
"PostgreSQLConnectionError",
# Column missing in Redshift, likely the schema was altered.
"UndefinedColumn",
],
finish_inputs=finish_inputs,
)
4 changes: 3 additions & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import datetime as dt
import io
import json
import operator
import posixpath
import typing

Expand Down Expand Up @@ -285,12 +286,13 @@ async def complete(self) -> str:
if self.is_upload_in_progress() is False:
raise NoUploadInProgressError()

sorted_parts = sorted(self.parts, key=operator.itemgetter("PartNumber"))
async with self.s3_client() as s3_client:
response = await s3_client.complete_multipart_upload(
Bucket=self.bucket_name,
Key=self.key,
UploadId=self.upload_id,
MultipartUpload={"Parts": self.parts},
MultipartUpload={"Parts": sorted_parts},
)

self.upload_id = None
Expand Down

0 comments on commit 8eef695

Please sign in to comment.