Add heartbeating to HTTP batch export
bretthoerner committed Feb 14, 2024
1 parent 41ba46f commit a3e89d8
Showing 2 changed files with 153 additions and 4 deletions.
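The change follows Temporal's heartbeat-and-resume pattern: the activity periodically reports the timestamp of the last uploaded part via activity.heartbeat(...), and a retried attempt can read that value back from activity.info().heartbeat_details to skip work it already finished. As a minimal, self-contained sketch of that pattern (the export_window activity, its arguments, and the part timestamps below are illustrative, not taken from this commit):

import asyncio

from temporalio import activity


@activity.defn
async def export_window(start: str, end: str) -> None:
    # On a retry, Temporal hands back whatever the previous attempt last heartbeated.
    details = activity.info().heartbeat_details
    cursor = details[0] if details else start

    for part_timestamp in ("2024-02-14T00:05:00+00:00", "2024-02-14T00:10:00+00:00"):
        if part_timestamp <= cursor:
            continue  # this part was already exported by a previous attempt
        await asyncio.sleep(0)  # stand-in for the actual upload
        activity.heartbeat(part_timestamp)  # recorded so the next attempt can resume here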
82 changes: 78 additions & 4 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -1,6 +1,8 @@
import aiohttp
import asyncio
import datetime as dt
import json
import typing
from dataclasses import dataclass

from django.conf import settings
@@ -56,13 +58,29 @@ def http_default_fields() -> list[BatchExportField]:
return [
BatchExportField(expression="toString(uuid)", alias="uuid"),
BatchExportField(expression="timestamp", alias="timestamp"),
BatchExportField(expression="COALESCE(inserted_at, _timestamp)", alias="_inserted_at"),
BatchExportField(expression="event", alias="event"),
BatchExportField(expression="nullIf(properties, '')", alias="properties"),
BatchExportField(expression="toString(distinct_id)", alias="distinct_id"),
BatchExportField(expression="elements_chain", alias="elements_chain"),
]


class HeartbeatDetails(typing.NamedTuple):
"""This tuple allows us to enforce a schema on the Heartbeat details.
Attributes:
last_uploaded_part_timestamp: The timestamp of the last part we managed to upload.
"""

last_uploaded_part_timestamp: str

@classmethod
def from_activity_details(cls, details):
last_uploaded_part_timestamp = details[0]
return cls(last_uploaded_part_timestamp)
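As a quick illustration (not part of the diff; the timestamp is made up), the helper simply wraps the first positional heartbeat detail:

details = HeartbeatDetails.from_activity_details(("2023-04-20T14:20:00+00:00",))
assert details.last_uploaded_part_timestamp == "2023-04-20T14:20:00+00:00"

# An empty sequence (first attempt, nothing heartbeated yet) raises IndexError,
# which maybe_resume_from_heartbeat below treats as "start from the beginning".
HeartbeatDetails.from_activity_details(())  # raises IndexError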


@dataclass
class HttpInsertInputs:
"""Inputs for HTTP insert activity."""
@@ -77,6 +95,36 @@ class HttpInsertInputs:
batch_export_schema: BatchExportSchema | None = None


async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str:
"""Returns the `interval_start` to use, either resuming from previous heartbeat data or
using the `data_interval_start` from the inputs."""
logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")

details = activity.info().heartbeat_details

try:
interval_start = HeartbeatDetails.from_activity_details(details).last_uploaded_part_timestamp
except IndexError:
# This is the error we expect when there are no activity details, as the sequence will be
# empty.
interval_start = inputs.data_interval_start
logger.debug(
"Did not receive details from previous activity Execution. Export will start from the beginning: %s",
interval_start,
)
except Exception:
# We still start from the beginning, but we make a point of logging unexpected errors. Ideally,
# any new exception types get added to the previous block after their first occurrence, so we
# never land here again.
interval_start = inputs.data_interval_start
logger.warning(
"Did not receive details from previous activity Execution due to an unexpected error. Export will start from the beginning: %s",
interval_start,
)

return interval_start
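Resuming from heartbeats only matters if the workflow schedules this activity with a retry policy, and a heartbeat timeout is what lets the server detect a worker that died mid-export. A hedged sketch of what that call could look like (the timeouts and retry settings are illustrative assumptions, not the values used by the actual HttpBatchExportWorkflow):

from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


@workflow.defn(name="http-export-sketch")
class HttpExportSketchWorkflow:
    @workflow.run
    async def run(self, inputs: HttpInsertInputs) -> None:
        await workflow.execute_activity(
            insert_into_http_activity,
            inputs,
            start_to_close_timeout=timedelta(hours=1),
            heartbeat_timeout=timedelta(minutes=2),  # lets the server notice a dead worker and retry
            retry_policy=RetryPolicy(maximum_attempts=0),  # 0 means unlimited attempts
        )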


async def post_json_file_to_url(url, batch_file):
batch_file.seek(0)
async with aiohttp.ClientSession() as session:
@@ -125,17 +173,36 @@ async def insert_into_http_activity(inputs: HttpInsertInputs):
fields = http_default_fields()
columns = [field["alias"] for field in fields]

interval_start = await maybe_resume_from_heartbeat(inputs)

record_iterator = iter_records(
client=client,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_start=interval_start,
interval_end=inputs.data_interval_end,
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=None,
)

last_uploaded_part_timestamp: str | None = None

async def worker_shutdown_handler():
"""Handle the Worker shutting down by heart-beating our latest status."""
await activity.wait_for_worker_shutdown()
logger.warn(
f"Worker shutting down! Reporting back latest exported part {last_uploaded_part_timestamp}",
)
if last_uploaded_part_timestamp is None:
# Don't heartbeat if worker shuts down before we could even send anything
# Just start from the beginning again.
return

activity.heartbeat(last_uploaded_part_timestamp)

asyncio.create_task(worker_shutdown_handler())
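One worker-side detail worth noting: activity.wait_for_worker_shutdown() resolves when the worker begins shutting down, and the handler above only gets a useful window to heartbeat if the worker is started with a graceful shutdown period. A sketch of that wiring, assuming the temporalio Worker API; the server address, task queue name, and timeout are illustrative, not PostHog's actual configuration:

from datetime import timedelta

from temporalio.client import Client
from temporalio.worker import Worker


async def run_worker() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="batch-exports-task-queue",
        workflows=[HttpBatchExportWorkflow],
        activities=[insert_into_http_activity],
        # Gives shutdown handlers time to send a final heartbeat before activities are cancelled.
        graceful_shutdown_timeout=timedelta(minutes=5),
    )
    await worker.run()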

rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()

@@ -167,19 +234,22 @@ def write_event_to_batch(event):

batch_file.write_record_as_bytes(json_dumps_bytes(event))

async def flush_batch():
async def flush_batch_to_http_endpoint(last_uploaded_part_timestamp: str):
logger.debug(
"Sending %s records of size %s bytes",
batch_file.records_since_last_reset,
batch_file.bytes_since_last_reset,
)

batch_file.write(posthog_batch_footer)

await post_json_file_to_url(inputs.url, batch_file)

rows_exported.add(batch_file.records_since_last_reset)
bytes_exported.add(batch_file.bytes_since_last_reset)

activity.heartbeat(last_uploaded_part_timestamp)

for record_batch in record_iterator:
for row in record_batch.select(columns).to_pylist():
# Format result row as PostHog event, write JSON to the batch file.
@@ -199,17 +269,21 @@ async def flush_batch():
"properties": properties,
}

inserted_at = row.pop("_inserted_at")

write_event_to_batch(capture_event)

if (
batch_file.tell() > settings.BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES
or batch_file.records_since_last_reset >= settings.BATCH_EXPORT_HTTP_BATCH_SIZE
):
await flush_batch()
last_uploaded_part_timestamp = str(inserted_at)
await flush_batch_to_http_endpoint(last_uploaded_part_timestamp)
batch_file.reset()

if batch_file.tell() > 0:
await flush_batch()
last_uploaded_part_timestamp = str(inserted_at)
await flush_batch_to_http_endpoint(last_uploaded_part_timestamp)


@workflow.defn(name="http-export")
75 changes: 75 additions & 0 deletions posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
@@ -22,6 +22,7 @@
)
from posthog.temporal.batch_exports.clickhouse import ClickHouseClient
from posthog.temporal.batch_exports.http_batch_export import (
HeartbeatDetails,
HttpBatchExportInputs,
HttpBatchExportWorkflow,
HttpInsertInputs,
@@ -102,6 +103,7 @@ async def assert_clickhouse_records_in_mock_server(

expected_record["properties"]["$geoip_disable"] = True

expected_record.pop("_inserted_at")
elements_chain = expected_record.pop("elements_chain", None)
if expected_record["event"] == "$autocapture" and elements_chain is not None:
expected_record["properties"]["$elements_chain"] = elements_chain
@@ -474,3 +476,76 @@ async def never_finish_activity(_: HttpInsertInputs) -> str:
run = runs[0]
assert run.status == "Cancelled"
assert run.latest_error == "Cancelled"


async def test_insert_into_http_activity_heartbeats(
clickhouse_client,
ateam,
http_batch_export,
activity_environment,
http_config,
):
"""Test that the insert_into_http_activity activity sends heartbeats.
We use a function that runs on_heartbeat to check and track the heartbeat contents.
"""
data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00")
data_interval_start = data_interval_end - http_batch_export.interval_time_delta

num_expected_parts = 3

for i in range(1, num_expected_parts + 1):
part_inserted_at = data_interval_end - http_batch_export.interval_time_delta / i

await generate_test_events_in_clickhouse(
client=clickhouse_client,
team_id=ateam.pk,
start_time=data_interval_start,
end_time=data_interval_end,
count=1,
count_outside_range=0,
count_other_team=0,
duplicate=False,
# We need to roll over 5MB to trigger a batch flush (and a heartbeat).
properties={"$chonky": ("a" * 5 * 1024**2)},
inserted_at=part_inserted_at,
)

heartbeat_count = 0

def assert_heartbeat_details(*details):
"""A function to track and assert we are heartbeating."""
nonlocal heartbeat_count
heartbeat_count += 1

heartbeat_details = HeartbeatDetails.from_activity_details(details)

last_uploaded_part_dt = dt.datetime.fromisoformat(heartbeat_details.last_uploaded_part_timestamp)

assert last_uploaded_part_dt == data_interval_end - http_batch_export.interval_time_delta / heartbeat_count

activity_environment.on_heartbeat = assert_heartbeat_details

insert_inputs = HttpInsertInputs(
team_id=ateam.pk,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
**http_config,
)

mock_server = MockServer()
with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings(
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2
):
m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True)
await activity_environment.run(insert_into_http_activity, insert_inputs)

# This checks that the assert_heartbeat_details function was actually called.
assert heartbeat_count == num_expected_parts

await assert_clickhouse_records_in_mock_server(
mock_server=mock_server,
clickhouse_client=clickhouse_client,
team_id=ateam.pk,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
)
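For reference, the on_heartbeat hook used in the test above boils down to the pattern below; a pared-down, runnable sketch assuming temporalio.testing.ActivityEnvironment, with a throwaway activity and made-up detail values:

import asyncio

from temporalio import activity
from temporalio.testing import ActivityEnvironment


@activity.defn
async def heartbeating_activity() -> None:
    activity.heartbeat("part-1")
    activity.heartbeat("part-2")


async def main() -> None:
    seen: list[str] = []
    env = ActivityEnvironment()
    # Each activity.heartbeat(*details) call inside env.run invokes this callback.
    env.on_heartbeat = lambda *details: seen.append(details[0])
    await env.run(heartbeating_activity)
    assert seen == ["part-1", "part-2"]


asyncio.run(main())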
