diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index e4ef84317701d2..09cf8a809f9487 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -28,6 +28,30 @@ from posthog.temporal.common.logger import bind_temporal_worker_logger +class RetryableResponseError(Exception): + """Error for HTTP status >=500 (plus 429).""" + + def __init__(self, status): + super().__init__(f"RetryableResponseError status: {status}") + + +class NonRetryableResponseError(Exception): + """Error for HTTP status >= 400 and < 500 (excluding 429).""" + + def __init__(self, status): + super().__init__(f"NonRetryableResponseError status: {status}") + + +def raise_for_status(response: aiohttp.ClientResponse): + """Like aiohttp raise_for_status, but it distinguishes between retryable and non-retryable + errors.""" + if not response.ok: + if response.status >= 500 or response.status == 429: + raise RetryableResponseError(response.status) + else: + raise NonRetryableResponseError(response.status) + + def http_default_fields() -> list[BatchExportField]: return [ BatchExportField(expression="toString(uuid)", alias="uuid"), @@ -35,7 +59,7 @@ def http_default_fields() -> list[BatchExportField]: BatchExportField(expression="event", alias="event"), BatchExportField(expression="nullIf(properties, '')", alias="properties"), BatchExportField(expression="toString(distinct_id)", alias="distinct_id"), - BatchExportField(expression="toJSONString(elements_chain)", alias="elements_chain"), + BatchExportField(expression="elements_chain", alias="elements_chain"), ] @@ -57,7 +81,8 @@ async def post_json_file_to_url(url, batch_file): batch_file.seek(0) async with aiohttp.ClientSession() as session: headers = {"Content-Type": "application/json"} - async with session.post(url, data=batch_file, headers=headers, raise_for_status=True) as response: + async with session.post(url, data=batch_file, headers=headers) as response: + raise_for_status(response) return response @@ -160,7 +185,9 @@ async def flush_batch(): # Format result row as PostHog event, write JSON to the batch file. properties = row["properties"] - properties = json.loads(properties) if properties else None + properties = json.loads(properties) if properties else {} + properties["$geoip_disable"] = True + if row["event"] == "$autocapture" and row["elements_chain"] is not None: properties["$elements_chain"] = row["elements_chain"] @@ -220,7 +247,9 @@ async def run(self, inputs: HttpBatchExportInputs): initial_interval=dt.timedelta(seconds=10), maximum_interval=dt.timedelta(seconds=60), maximum_attempts=0, - non_retryable_error_types=[], + non_retryable_error_types=[ + "NonRetryableResponseError", + ], ), ) @@ -244,7 +273,9 @@ async def run(self, inputs: HttpBatchExportInputs): await execute_batch_export_insert_activity( insert_into_http_activity, insert_inputs, - non_retryable_error_types=[], + non_retryable_error_types=[ + "NonRetryableResponseError", + ], update_inputs=update_inputs, # Disable heartbeat timeout until we add heartbeat support. heartbeat_timeout_seconds=None, diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py index e78e2b32938b56..6d31f72112eaff 100644 --- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -1,4 +1,3 @@ -from aiohttp.client_exceptions import ClientResponseError import asyncio import datetime as dt import json @@ -26,6 +25,8 @@ HttpBatchExportInputs, HttpBatchExportWorkflow, HttpInsertInputs, + NonRetryableResponseError, + RetryableResponseError, insert_into_http_activity, http_default_fields, ) @@ -92,13 +93,15 @@ async def assert_clickhouse_records_in_mock_server( expected_record = {} for k, v in record.items(): - if k in {"properties", "set", "set_once", "person_properties"} and v is not None: - expected_record[k] = json.loads(v) + if k == "properties": + expected_record[k] = json.loads(v) if v else {} elif isinstance(v, dt.datetime): expected_record[k] = v.replace(tzinfo=dt.timezone.utc).isoformat() else: expected_record[k] = v + expected_record["properties"]["$geoip_disable"] = True + elements_chain = expected_record.pop("elements_chain", None) if expected_record["event"] == "$autocapture" and elements_chain is not None: expected_record["properties"]["$elements_chain"] = elements_chain @@ -236,14 +239,21 @@ async def test_insert_into_http_activity_throws_on_bad_http_status( BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 ): m.post(TEST_URL, status=400, repeat=True) - with pytest.raises(ClientResponseError): + with pytest.raises(NonRetryableResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=429, repeat=True) + with pytest.raises(RetryableResponseError): await activity_environment.run(insert_into_http_activity, insert_inputs) with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 ): m.post(TEST_URL, status=500, repeat=True) - with pytest.raises(ClientResponseError): + with pytest.raises(RetryableResponseError): await activity_environment.run(insert_into_http_activity, insert_inputs) diff --git a/requirements-dev.in b/requirements-dev.in index 69676d92a66fd5..43a540277ab7d1 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -46,4 +46,4 @@ python-dateutil>=2.8.2 responses==0.23.1 syrupy~=4.6.0 flaky==3.7.0 -aioresponses==0.7.6 \ No newline at end of file +aioresponses==0.7.6