fix(batch-exports): Exception handling of intermittent errors (#24416)
tomasfarias authored Aug 23, 2024
1 parent 88003f8 commit 3714d5a
Showing 5 changed files with 116 additions and 51 deletions.
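In brief: upload_part now delegates to a new upload_part_retryable method that retries S3 RequestTimeout errors in place, sleeping with an increasing delay between attempts and raising IntermittentUploadPartTimeoutError only once max_attempts is exhausted. Alongside that, run timestamps become timezone-aware UTC datetimes, the temporary file's __exit__ explicitly returns False and the writer's open_temporary_file re-raises the exception it records, the JSONLines writer respects the max_bytes argument passed to get_batch_export_writer, and the request-timeout test is extended so the fake S3 client times out five consecutive times.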
posthog/batch_exports/service.py (4 changes: 2 additions & 2 deletions)
@@ -537,7 +537,7 @@ def update_batch_export_run(
run_id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
update_at = dt.datetime.now()
update_at = dt.datetime.now(dt.UTC)

updated = model.update(
**kwargs,
@@ -560,7 +560,7 @@ async def aupdate_batch_export_run(
run_id: The id of the BatchExportRun to update.
"""
model = BatchExportRun.objects.filter(id=run_id)
update_at = dt.datetime.now()
update_at = dt.datetime.now(dt.UTC)

updated = await model.aupdate(
**kwargs,
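A side note on the dt.UTC change here (and the matching one in batch_exports.py below): dt.datetime.now() returns a naive local timestamp, while dt.datetime.now(dt.UTC) returns a timezone-aware UTC timestamp; dt.UTC is an alias for dt.timezone.utc added in Python 3.11. A minimal sketch of the difference, not taken from the PostHog code:

    import datetime as dt

    naive = dt.datetime.now()        # local wall-clock time, tzinfo is None
    aware = dt.datetime.now(dt.UTC)  # current UTC time, tzinfo is datetime.timezone.utc

    print(naive.tzinfo)  # None
    print(aware.tzinfo)  # UTC

    # Naive and aware datetimes cannot be mixed in arithmetic or comparisons:
    # aware - naive  # TypeError: can't subtract offset-naive and offset-aware datetimes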
posthog/temporal/batch_exports/batch_exports.py (2 changes: 1 addition & 1 deletion)
@@ -478,7 +478,7 @@ async def finish_batch_export_run(inputs: FinishBatchExportRunInputs) -> None:
}
batch_export_run = await database_sync_to_async(update_batch_export_run)(
run_id=uuid.UUID(inputs.id),
finished_at=dt.datetime.now(),
finished_at=dt.datetime.now(dt.UTC),
**update_params,
)

posthog/temporal/batch_exports/s3_batch_export.py (96 changes: 72 additions & 24 deletions)
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import dataclasses
import datetime as dt
@@ -130,12 +131,12 @@ def __init__(self, part_number: int):
super().__init__(f"An intermittent `RequestTimeout` was raised while attempting to upload part {part_number}")


class S3MultiPartUploadState(typing.NamedTuple):
upload_id: str
parts: list[dict[str, str | int]]
Part = dict[str, str | int]


Part = dict[str, str | int]
class S3MultiPartUploadState(typing.NamedTuple):
upload_id: str
parts: list[Part]


class S3MultiPartUpload:
@@ -274,7 +275,15 @@ async def abort(self):
self.upload_id = None
self.parts = []

async def upload_part(self, body: BatchExportTemporaryFile, rewind: bool = True):
async def upload_part(
self,
body: BatchExportTemporaryFile,
rewind: bool = True,
max_attempts: int = 5,
initial_retry_delay: float | int = 2,
max_retry_delay: float | int = 32,
exponential_backoff_coefficient: int = 2,
):
"""Upload a part of this multi-part upload."""
next_part_number = self.part_number + 1

@@ -286,26 +295,64 @@ async def upload_part(self, body: BatchExportTemporaryFile, rewind: bool = True)
# So we tell mypy to be nice with us.
reader = io.BufferedReader(body) # type: ignore

try:
etag = await self.upload_part_retryable(
reader,
next_part_number,
max_attempts=max_attempts,
initial_retry_delay=initial_retry_delay,
max_retry_delay=max_retry_delay,
exponential_backoff_coefficient=exponential_backoff_coefficient,
)
except Exception:
raise

finally:
reader.detach() # BufferedReader closes the file otherwise.

self.parts.append({"PartNumber": next_part_number, "ETag": etag})

async def upload_part_retryable(
self,
reader: io.BufferedReader,
next_part_number: int,
max_attempts: int = 5,
initial_retry_delay: float | int = 2,
max_retry_delay: float | int = 32,
exponential_backoff_coefficient: int = 2,
) -> str:
"""Attempt to upload a part for this multi-part upload retrying on transient errors."""
response: dict[str, str] | None = None
attempt = 0

async with self.s3_client() as s3_client:
try:
response = await s3_client.upload_part(
Bucket=self.bucket_name,
Key=self.key,
PartNumber=next_part_number,
UploadId=self.upload_id,
Body=reader,
)
except botocore.exceptions.ClientError as err:
error_code = err.response.get("Error", {}).get("Code", None)
while response is None:
try:
response = await s3_client.upload_part(
Bucket=self.bucket_name,
Key=self.key,
PartNumber=next_part_number,
UploadId=self.upload_id,
Body=reader,
)

except botocore.exceptions.ClientError as err:
error_code = err.response.get("Error", {}).get("Code", None)
attempt += 1

if error_code is not None and error_code == "RequestTimeout":
if attempt >= max_attempts:
raise IntermittentUploadPartTimeoutError(part_number=next_part_number) from err

if error_code is not None and error_code == "RequestTimeout":
raise IntermittentUploadPartTimeoutError(part_number=next_part_number) from err
else:
raise
await asyncio.sleep(
min(max_retry_delay, initial_retry_delay * (attempt**exponential_backoff_coefficient))
)

reader.detach() # BufferedReader closes the file otherwise.
continue
else:
raise

self.parts.append({"PartNumber": next_part_number, "ETag": response["ETag"]})
return response["ETag"]

async def __aenter__(self):
"""Asynchronous context manager protocol enter."""
@@ -395,15 +442,15 @@ async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tupl
# This is the error we expect when no details as the sequence will be empty.
interval_start = inputs.data_interval_start
logger.debug(
"Did not receive details from previous activity Excecution. Export will start from the beginning %s",
"Did not receive details from previous activity Execution. Export will start from the beginning %s",
interval_start,
)
except Exception:
# We still start from the beginning, but we make a point to log unexpected errors.
# Ideally, any new exceptions should be added to the previous block after the first time and we will never land here.
interval_start = inputs.data_interval_start
logger.warning(
"Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s",
"Did not receive details from previous activity Execution due to an unexpected error. Export will start from the beginning %s",
interval_start,
)
else:
@@ -526,6 +573,7 @@ async def flush_to_s3(
)

await s3_upload.upload_part(local_results_file)

rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

@@ -585,7 +633,7 @@ def get_batch_export_writer(
)
elif inputs.file_format == "JSONLines":
writer = JSONLBatchExportWriter(
max_bytes=settings.BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES,
max_bytes=max_bytes,
flush_callable=flush_callable,
compression=inputs.compression,
)
posthog/temporal/batch_exports/temporary_file.py (23 changes: 13 additions & 10 deletions)
@@ -85,7 +85,8 @@ def __enter__(self):

def __exit__(self, exc, value, tb):
"""Context-manager protocol exit method."""
return self._file.__exit__(exc, value, tb)
self._file.__exit__(exc, value, tb)
return False

def __iter__(self):
yield from self._file
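Context for the __exit__ change above: Python suppresses an exception raised inside a with block only when __exit__ returns a truthy value, so returning False explicitly guarantees errors propagate instead of depending on whatever the wrapped file's __exit__ returns. A minimal illustration of the protocol, unrelated to the PostHog classes:

    class Suppressing:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return True  # truthy: the exception is swallowed

    class Propagating:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False  # falsy: the exception is re-raised

    with Suppressing():
        raise ValueError("hidden")  # execution continues after this block

    with Propagating():
        raise ValueError("visible")  # this one reaches the caller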
@@ -334,9 +335,11 @@ async def open_temporary_file(self, current_flush_counter: int = 0):

try:
yield
except Exception as e:
self.error = e

except Exception as temp_err:
self.error = temp_err
raise

finally:
self.track_bytes_written(temp_file)

@@ -347,7 +350,7 @@ async def open_temporary_file(self, current_flush_counter: int = 0):
# `write_record_batch`. For example, footer bytes.
await self.flush(self.last_inserted_at, is_last=True)

self._batch_export_file = None
self._batch_export_file = None

@property
def batch_export_file(self):
@@ -445,24 +448,24 @@ def __init__(

self.default = default

def write_dict(self, content: dict[str, typing.Any]) -> int:
def write_dict(self, d: dict[str, typing.Any]) -> int:
"""Write a single row of JSONL."""
try:
n = self.batch_export_file.write(orjson.dumps(content, default=str) + b"\n")
n = self.batch_export_file.write(orjson.dumps(d, default=str) + b"\n")
except orjson.JSONEncodeError:
# orjson is very strict about invalid unicode. This slow path protects us against
# things we've observed in practice, like single surrogate codes, e.g. "\ud83d"
cleaned_content = replace_broken_unicode(content)
cleaned_content = replace_broken_unicode(d)
n = self.batch_export_file.write(orjson.dumps(cleaned_content, default=str) + b"\n")
return n

def _write_record_batch(self, record_batch: pa.RecordBatch) -> None:
"""Write records to a temporary file as JSONL."""
for record in record_batch.to_pylist():
if not record:
for record_dict in record_batch.to_pylist():
if not record_dict:
continue

self.write_dict(record)
self.write_dict(record_dict)


class CSVBatchExportWriter(BatchExportWriter):
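The changes in this hunk are variable renames (content to d, record to record_dict); the orjson fallback itself is unchanged: orjson refuses strings containing lone surrogate code points, so the writer catches orjson.JSONEncodeError and serializes a cleaned copy. A rough standalone sketch of that slow path, with a simple clean helper standing in for PostHog's replace_broken_unicode and handling only top-level string fields:

    import orjson

    def clean(value: str) -> str:
        # Stand-in for replace_broken_unicode: replace unpaired surrogates so the
        # string becomes valid UTF-8.
        return value.encode("utf-8", errors="replace").decode("utf-8")

    def dump_jsonl_line(record: dict) -> bytes:
        try:
            return orjson.dumps(record, default=str) + b"\n"
        except orjson.JSONEncodeError:
            # orjson rejects lone surrogates such as "\ud83d" because they are not valid UTF-8.
            cleaned = {k: clean(v) if isinstance(v, str) else v for k, v in record.items()}
            return orjson.dumps(cleaned, default=str) + b"\n"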
Tests for the S3 batch export workflow (fifth changed file)
@@ -1407,7 +1407,7 @@ async def test_s3_export_workflow_with_request_timeouts(
elif model is not None:
batch_export_schema = model

raised = False
raised = 0

class FakeSession(aioboto3.Session):
@contextlib.asynccontextmanager
@@ -1420,8 +1420,8 @@ async def client(self, *args, **kwargs):
async def faulty_upload_part(*args, **kwargs):
nonlocal raised

if not raised:
raised = True
if raised < 5:
raised = raised + 1
raise botocore.exceptions.ClientError(
error_response={
"Error": {"Code": "RequestTimeout", "Message": "Oh no!"},
@@ -1436,6 +1436,11 @@ async def faulty_upload_part(*args, **kwargs):

yield client

class DoNotRetryPolicy(RetryPolicy):
def __init__(self, *args, **kwargs):
kwargs["maximum_attempts"] = 1
super().__init__(*args, **kwargs)

workflow_id = str(uuid.uuid4())
inputs = S3BatchExportInputs(
team_id=ateam.pk,
@@ -1447,8 +1452,9 @@ async def faulty_upload_part(*args, **kwargs):
**s3_batch_export.destination.config,
)

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
async with (
await WorkflowEnvironment.start_time_skipping() as activity_environment,
Worker(
activity_environment.client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[S3BatchExportWorkflow],
@@ -1458,16 +1464,20 @@ async def faulty_upload_part(*args, **kwargs):
finish_batch_export_run,
],
workflow_runner=UnsandboxedWorkflowRunner(),
),
):
with (
mock.patch("posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session", FakeSession),
mock.patch("posthog.temporal.batch_exports.batch_exports.RetryPolicy", DoNotRetryPolicy),
):
with mock.patch("posthog.temporal.batch_exports.s3_batch_export.aioboto3.Session", FakeSession):
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=2),
execution_timeout=dt.timedelta(seconds=10),
)
await activity_environment.client.execute_workflow(
S3BatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=2),
execution_timeout=dt.timedelta(minutes=2),
)

runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
assert len(runs) == 2
@@ -1478,6 +1488,10 @@ async def faulty_upload_part(*args, **kwargs):
(events_to_export_created, persons_to_export_created) = generate_test_data
assert run.status == "FailedRetryable"
assert run.records_completed is None
assert (
run.latest_error
== "IntermittentUploadPartTimeoutError: An intermittent `RequestTimeout` was raised while attempting to upload part 1"
)

run = runs[1]
(events_to_export_created, persons_to_export_created) = generate_test_data
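The updated test makes faulty_upload_part raise RequestTimeout five times, enough to exhaust the default max_attempts=5 inside upload_part_retryable, while DoNotRetryPolicy pins the activity retry policy to a single attempt so the failure surfaces directly in the first run. The same fail-N-times pattern, reduced to a self-contained sketch; FakeClient and upload_with_retries are illustrative stand-ins, not the PostHog or boto3 APIs:

    import asyncio

    import pytest

    class FakeClient:
        """Stand-in S3 client that times out a fixed number of times, then succeeds."""

        def __init__(self, failures: int):
            self.failures = failures
            self.calls = 0

        async def upload_part(self, **kwargs):
            self.calls += 1
            if self.calls <= self.failures:
                raise TimeoutError("RequestTimeout")  # stand-in for botocore's ClientError
            return {"ETag": "abc123"}

    async def upload_with_retries(client, max_attempts: int = 5) -> dict:
        # Mirrors the shape of upload_part_retryable's loop, without real backoff.
        attempt = 0
        while True:
            try:
                return await client.upload_part()
            except TimeoutError:
                attempt += 1
                if attempt >= max_attempts:
                    raise
                await asyncio.sleep(0)

    @pytest.mark.asyncio  # requires the pytest-asyncio plugin
    async def test_gives_up_after_max_attempts():
        client = FakeClient(failures=5)
        with pytest.raises(TimeoutError):
            await upload_with_retries(client, max_attempts=5)
        assert client.calls == 5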