Skip to content

Commit

Permalink
add geoip_disable, don't toJSONString elements_chain, and mark some H…
Browse files Browse the repository at this point in the history
…TTP status codes as non-retryable
  • Loading branch information
bretthoerner committed Feb 14, 2024
1 parent 23bc659 commit 41ba46f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
41 changes: 36 additions & 5 deletions posthog/temporal/batch_exports/http_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,38 @@
from posthog.temporal.common.logger import bind_temporal_worker_logger


class RetryableResponseError(Exception):
"""Error for HTTP status >=500 (plus 429)."""

def __init__(self, status):
super().__init__(f"RetryableResponseError status: {status}")


class NonRetryableResponseError(Exception):
"""Error for HTTP status >= 400 and < 500 (excluding 429)."""

def __init__(self, status):
super().__init__(f"NonRetryableResponseError status: {status}")


def raise_for_status(response: aiohttp.ClientResponse):
"""Like aiohttp raise_for_status, but it distinguishes between retryable and non-retryable
errors."""
if not response.ok:
if response.status >= 500 or response.status == 429:
raise RetryableResponseError(response.status)
else:
raise NonRetryableResponseError(response.status)


def http_default_fields() -> list[BatchExportField]:
return [
BatchExportField(expression="toString(uuid)", alias="uuid"),
BatchExportField(expression="timestamp", alias="timestamp"),
BatchExportField(expression="event", alias="event"),
BatchExportField(expression="nullIf(properties, '')", alias="properties"),
BatchExportField(expression="toString(distinct_id)", alias="distinct_id"),
BatchExportField(expression="toJSONString(elements_chain)", alias="elements_chain"),
BatchExportField(expression="elements_chain", alias="elements_chain"),
]


Expand All @@ -57,7 +81,8 @@ async def post_json_file_to_url(url, batch_file):
batch_file.seek(0)
async with aiohttp.ClientSession() as session:
headers = {"Content-Type": "application/json"}
async with session.post(url, data=batch_file, headers=headers, raise_for_status=True) as response:
async with session.post(url, data=batch_file, headers=headers) as response:
raise_for_status(response)
return response


Expand Down Expand Up @@ -160,7 +185,9 @@ async def flush_batch():
# Format result row as PostHog event, write JSON to the batch file.

properties = row["properties"]
properties = json.loads(properties) if properties else None
properties = json.loads(properties) if properties else {}
properties["$geoip_disable"] = True

if row["event"] == "$autocapture" and row["elements_chain"] is not None:
properties["$elements_chain"] = row["elements_chain"]

Expand Down Expand Up @@ -220,7 +247,9 @@ async def run(self, inputs: HttpBatchExportInputs):
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=[],
non_retryable_error_types=[
"NonRetryableResponseError",
],
),
)

Expand All @@ -244,7 +273,9 @@ async def run(self, inputs: HttpBatchExportInputs):
await execute_batch_export_insert_activity(
insert_into_http_activity,
insert_inputs,
non_retryable_error_types=[],
non_retryable_error_types=[
"NonRetryableResponseError",
],
update_inputs=update_inputs,
# Disable heartbeat timeout until we add heartbeat support.
heartbeat_timeout_seconds=None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from aiohttp.client_exceptions import ClientResponseError
import asyncio
import datetime as dt
import json
Expand Down Expand Up @@ -26,6 +25,8 @@
HttpBatchExportInputs,
HttpBatchExportWorkflow,
HttpInsertInputs,
NonRetryableResponseError,
RetryableResponseError,
insert_into_http_activity,
http_default_fields,
)
Expand Down Expand Up @@ -92,13 +93,15 @@ async def assert_clickhouse_records_in_mock_server(
expected_record = {}

for k, v in record.items():
if k in {"properties", "set", "set_once", "person_properties"} and v is not None:
expected_record[k] = json.loads(v)
if k == "properties":
expected_record[k] = json.loads(v) if v else {}
elif isinstance(v, dt.datetime):
expected_record[k] = v.replace(tzinfo=dt.timezone.utc).isoformat()
else:
expected_record[k] = v

expected_record["properties"]["$geoip_disable"] = True

elements_chain = expected_record.pop("elements_chain", None)
if expected_record["event"] == "$autocapture" and elements_chain is not None:
expected_record["properties"]["$elements_chain"] = elements_chain
Expand Down Expand Up @@ -236,14 +239,21 @@ async def test_insert_into_http_activity_throws_on_bad_http_status(
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
):
m.post(TEST_URL, status=400, repeat=True)
with pytest.raises(ClientResponseError):
with pytest.raises(NonRetryableResponseError):
await activity_environment.run(insert_into_http_activity, insert_inputs)

with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings(
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
):
m.post(TEST_URL, status=429, repeat=True)
with pytest.raises(RetryableResponseError):
await activity_environment.run(insert_into_http_activity, insert_inputs)

with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings(
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
):
m.post(TEST_URL, status=500, repeat=True)
with pytest.raises(ClientResponseError):
with pytest.raises(RetryableResponseError):
await activity_environment.run(insert_into_http_activity, insert_inputs)


Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ python-dateutil>=2.8.2
responses==0.23.1
syrupy~=4.6.0
flaky==3.7.0
aioresponses==0.7.6
aioresponses==0.7.6

0 comments on commit 41ba46f

Please sign in to comment.