diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index 09cf8a809f948..e9d6ebadcd055 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -1,4 +1,5 @@ import aiohttp +import asyncio import datetime as dt import json from dataclasses import dataclass @@ -56,6 +57,7 @@ def http_default_fields() -> list[BatchExportField]: return [ BatchExportField(expression="toString(uuid)", alias="uuid"), BatchExportField(expression="timestamp", alias="timestamp"), + BatchExportField(expression="COALESCE(inserted_at, _timestamp)", alias="_inserted_at"), BatchExportField(expression="event", alias="event"), BatchExportField(expression="nullIf(properties, '')", alias="properties"), BatchExportField(expression="toString(distinct_id)", alias="distinct_id"), @@ -63,6 +65,24 @@ def http_default_fields() -> list[BatchExportField]: ] +class HeartbeatDetails: + """This class allows us to enforce a schema on the Heartbeat details. + + Attributes: + last_uploaded_part_timestamp: The timestamp of the last part we managed to upload. + """ + + last_uploaded_part_timestamp: str + + def __init__(self, last_uploaded_part_timestamp: str): + self.last_uploaded_part_timestamp = last_uploaded_part_timestamp + + @classmethod + def from_activity_details(cls, details) -> "HeartbeatDetails": + last_uploaded_part_timestamp = details[0] + return HeartbeatDetails(last_uploaded_part_timestamp) + + @dataclass class HttpInsertInputs: """Inputs for HTTP insert activity.""" @@ -77,6 +97,39 @@ class HttpInsertInputs: batch_export_schema: BatchExportSchema | None = None +async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str: + """Returns the `interval_start` to use, either resuming from previous heartbeat data or + using the `data_interval_start` from the inputs.""" + logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP") + + interval_start = inputs.data_interval_start + details = activity.info().heartbeat_details + + if not details: + # No heartbeat found, so we start from the beginning. + return interval_start + + try: + interval_start = HeartbeatDetails.from_activity_details(details).last_uploaded_part_timestamp + except IndexError: + # This is the error we expect when there are no activity details as the sequence will be + # empty. + logger.debug( + "Did not receive details from previous activity Excecution. Export will start from the beginning %s", + interval_start, + ) + except Exception: + # We still start from the beginning, but we make a point to log unexpected errors. Ideally, + # any new exceptions should be added to the previous block after the first time and we will + # never land here. + logger.warning( + "Did not receive details from previous activity Excecution due to an unexpected error. Export will start from the beginning %s", + interval_start, + ) + + return interval_start + + async def post_json_file_to_url(url, batch_file): batch_file.seek(0) async with aiohttp.ClientSession() as session: @@ -125,10 +178,12 @@ async def insert_into_http_activity(inputs: HttpInsertInputs): fields = http_default_fields() columns = [field["alias"] for field in fields] + interval_start = await maybe_resume_from_heartbeat(inputs) + record_iterator = iter_records( client=client, team_id=inputs.team_id, - interval_start=inputs.data_interval_start, + interval_start=interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, @@ -136,6 +191,23 @@ async def insert_into_http_activity(inputs: HttpInsertInputs): extra_query_parameters=None, ) + last_uploaded_part_timestamp: str | None = None + + async def worker_shutdown_handler(): + """Handle the Worker shutting down by heart-beating our latest status.""" + await activity.wait_for_worker_shutdown() + logger.warn( + f"Worker shutting down! Reporting back latest exported part {last_uploaded_part_timestamp}", + ) + if last_uploaded_part_timestamp is None: + # Don't heartbeat if worker shuts down before we could even send anything + # Just start from the beginning again. + return + + activity.heartbeat(last_uploaded_part_timestamp) + + asyncio.create_task(worker_shutdown_handler()) + rows_exported = get_rows_exported_metric() bytes_exported = get_bytes_exported_metric() @@ -167,7 +239,7 @@ def write_event_to_batch(event): batch_file.write_record_as_bytes(json_dumps_bytes(event)) - async def flush_batch(): + async def flush_batch_to_http_endpoint(last_uploaded_part_timestamp: str): logger.debug( "Sending %s records of size %s bytes", batch_file.records_since_last_reset, @@ -175,11 +247,14 @@ async def flush_batch(): ) batch_file.write(posthog_batch_footer) + await post_json_file_to_url(inputs.url, batch_file) rows_exported.add(batch_file.records_since_last_reset) bytes_exported.add(batch_file.bytes_since_last_reset) + activity.heartbeat(last_uploaded_part_timestamp) + for record_batch in record_iterator: for row in record_batch.select(columns).to_pylist(): # Format result row as PostHog event, write JSON to the batch file. @@ -199,17 +274,21 @@ async def flush_batch(): "properties": properties, } + inserted_at = row.pop("_inserted_at") + write_event_to_batch(capture_event) if ( batch_file.tell() > settings.BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES or batch_file.records_since_last_reset >= settings.BATCH_EXPORT_HTTP_BATCH_SIZE ): - await flush_batch() + last_uploaded_part_timestamp = str(inserted_at) + await flush_batch_to_http_endpoint(last_uploaded_part_timestamp) batch_file.reset() if batch_file.tell() > 0: - await flush_batch() + last_uploaded_part_timestamp = str(inserted_at) + await flush_batch_to_http_endpoint(last_uploaded_part_timestamp) @workflow.defn(name="http-export") diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py index 6d31f72112eaf..a6a2613145ed9 100644 --- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -22,6 +22,7 @@ ) from posthog.temporal.batch_exports.clickhouse import ClickHouseClient from posthog.temporal.batch_exports.http_batch_export import ( + HeartbeatDetails, HttpBatchExportInputs, HttpBatchExportWorkflow, HttpInsertInputs, @@ -102,6 +103,7 @@ async def assert_clickhouse_records_in_mock_server( expected_record["properties"]["$geoip_disable"] = True + expected_record.pop("_inserted_at") elements_chain = expected_record.pop("elements_chain", None) if expected_record["event"] == "$autocapture" and elements_chain is not None: expected_record["properties"]["$elements_chain"] = elements_chain @@ -474,3 +476,76 @@ async def never_finish_activity(_: HttpInsertInputs) -> str: run = runs[0] assert run.status == "Cancelled" assert run.latest_error == "Cancelled" + + +async def test_insert_into_http_activity_heartbeats( + clickhouse_client, + ateam, + http_batch_export, + activity_environment, + http_config, +): + """Test that the insert_into_http_activity activity sends heartbeats. + + We use a function that runs on_heartbeat to check and track the heartbeat contents. + """ + data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00") + data_interval_start = data_interval_end - http_batch_export.interval_time_delta + + num_expected_parts = 3 + + for i in range(1, num_expected_parts + 1): + part_inserted_at = data_interval_end - http_batch_export.interval_time_delta / i + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=1, + count_outside_range=0, + count_other_team=0, + duplicate=False, + # We need to roll over 5MB to trigger a batch flush (and a heartbeat). + properties={"$chonky": ("a" * 5 * 1024**2)}, + inserted_at=part_inserted_at, + ) + + heartbeat_count = 0 + + def assert_heartbeat_details(*raw_details): + """A function to track and assert we are heartbeating.""" + nonlocal heartbeat_count + heartbeat_count += 1 + + details = HeartbeatDetails.from_activity_details(raw_details) + + last_uploaded_part_dt = dt.datetime.fromisoformat(details.last_uploaded_part_timestamp) + assert last_uploaded_part_dt == data_interval_end - http_batch_export.interval_time_delta / heartbeat_count + + activity_environment.on_heartbeat = assert_heartbeat_details + + insert_inputs = HttpInsertInputs( + team_id=ateam.pk, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + **http_config, + ) + + mock_server = MockServer() + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + await activity_environment.run(insert_into_http_activity, insert_inputs) + + # This checks that the assert_heartbeat_details function was actually called. + assert heartbeat_count == num_expected_parts + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + )