From 8e92a4bae30eb9069d8fac9e80f2c12d430fd8ec Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Mon, 19 Feb 2024 03:40:09 -0700 Subject: [PATCH] feat(batch-exports): Add HTTP Batch Export destination (#20318) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(batch-exports): Add HTTP Batch Export destination To possibly be reused in the future, but for now it only submits payloads in the PostHog /batch format. * add geoip_disable, don't toJSONString elements_chain, and mark some HTTP status codes as non-retryable * Add heartbeating to HTTP batch export * Update query snapshots * Update query snapshots * fix: Re-use client session * refactor: Rename last_uploaded_part_timestamp to last_uploaded_timestamp --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Tomás Farías Santana --- latest_migrations.manifest | 2 +- .../__snapshots__/test_dashboard.ambr | 27 + posthog/batch_exports/models.py | 2 + posthog/batch_exports/service.py | 16 + .../0389_alter_batchexportdestination_type.py | 29 + posthog/settings/temporal.py | 3 +- posthog/temporal/batch_exports/__init__.py | 6 + .../temporal/batch_exports/batch_exports.py | 8 + .../batch_exports/http_batch_export.py | 363 ++++++++++++ .../test_http_batch_export_workflow.py | 551 ++++++++++++++++++ requirements-dev.in | 1 + requirements-dev.txt | 30 + 12 files changed, 1036 insertions(+), 2 deletions(-) create mode 100644 posthog/migrations/0389_alter_batchexportdestination_type.py create mode 100644 posthog/temporal/batch_exports/http_batch_export.py create mode 100644 posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 762dc2114fb123..78d33f21cf315c 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0388_add_schema_to_batch_exports +posthog: 0389_alter_batchexportdestination_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr b/posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr index 27dc42ed1593d4..8d97a95c8bb749 100644 --- a/posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr +++ b/posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr @@ -1235,6 +1235,33 @@ 5 /* ... */)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/ ''' # --- +# name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.32 + ''' + SELECT "posthog_dashboard"."id", + "posthog_dashboard"."name", + "posthog_dashboard"."description", + "posthog_dashboard"."team_id", + "posthog_dashboard"."pinned", + "posthog_dashboard"."created_at", + "posthog_dashboard"."created_by_id", + "posthog_dashboard"."deleted", + "posthog_dashboard"."last_accessed_at", + "posthog_dashboard"."filters", + "posthog_dashboard"."creation_mode", + "posthog_dashboard"."restriction_level", + "posthog_dashboard"."deprecated_tags", + "posthog_dashboard"."tags", + "posthog_dashboard"."share_token", + "posthog_dashboard"."is_shared" + FROM "posthog_dashboard" + WHERE (NOT ("posthog_dashboard"."deleted") + AND "posthog_dashboard"."id" IN (1, + 2, + 3, + 4, + 5 /* ... 
*/)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/ + ''' +# --- # name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.4 ''' SELECT "posthog_dashboard"."id", diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index 6d00cf38f389e1..8cb95e44827ac8 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -28,6 +28,7 @@ class Destination(models.TextChoices): POSTGRES = "Postgres" REDSHIFT = "Redshift" BIGQUERY = "BigQuery" + HTTP = "HTTP" NOOP = "NoOp" secret_fields = { @@ -36,6 +37,7 @@ class Destination(models.TextChoices): "Postgres": set("password"), "Redshift": set("password"), "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"}, + "HTTP": set("token"), "NoOp": set(), } diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 6d84a9b28823e9..ad0812031d2a7a 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -157,6 +157,21 @@ class BigQueryBatchExportInputs: batch_export_schema: BatchExportSchema | None = None +@dataclass +class HttpBatchExportInputs: + """Inputs for Http export workflow.""" + + batch_export_id: str + team_id: int + url: str + token: str + interval: str = "hour" + data_interval_end: str | None = None + exclude_events: list[str] | None = None + include_events: list[str] | None = None + batch_export_schema: BatchExportSchema | None = None + + @dataclass class NoOpInputs: """NoOp Workflow is used for testing, it takes a single argument to echo back.""" @@ -174,6 +189,7 @@ class NoOpInputs: "Postgres": ("postgres-export", PostgresBatchExportInputs), "Redshift": ("redshift-export", RedshiftBatchExportInputs), "BigQuery": ("bigquery-export", BigQueryBatchExportInputs), + "HTTP": ("http-export", HttpBatchExportInputs), "NoOp": ("no-op", NoOpInputs), } diff --git a/posthog/migrations/0389_alter_batchexportdestination_type.py b/posthog/migrations/0389_alter_batchexportdestination_type.py new file mode 100644 index 00000000000000..1236a9a24cdbf9 --- /dev/null +++ b/posthog/migrations/0389_alter_batchexportdestination_type.py @@ -0,0 +1,29 @@ +# Generated by Django 4.1.13 on 2024-02-13 18:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0388_add_schema_to_batch_exports"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("Redshift", "Redshift"), + ("BigQuery", "Bigquery"), + ("HTTP", "Http"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), + ] diff --git a/posthog/settings/temporal.py b/posthog/settings/temporal.py index c1d78fa0e9a2f2..6881541bf9fcb7 100644 --- a/posthog/settings/temporal.py +++ b/posthog/settings/temporal.py @@ -15,6 +15,7 @@ BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB - +BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 10 # 10MB +BATCH_EXPORT_HTTP_BATCH_SIZE = 1000 UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", "")) diff --git 
a/posthog/temporal/batch_exports/__init__.py b/posthog/temporal/batch_exports/__init__.py index 777f0ae50a8cf6..0973a6b8c8974a 100644 --- a/posthog/temporal/batch_exports/__init__.py +++ b/posthog/temporal/batch_exports/__init__.py @@ -32,6 +32,10 @@ SnowflakeBatchExportWorkflow, insert_into_snowflake_activity, ) +from posthog.temporal.batch_exports.http_batch_export import ( + HttpBatchExportWorkflow, + insert_into_http_activity, +) from posthog.temporal.batch_exports.squash_person_overrides import * WORKFLOWS = [ @@ -42,6 +46,7 @@ RedshiftBatchExportWorkflow, S3BatchExportWorkflow, SnowflakeBatchExportWorkflow, + HttpBatchExportWorkflow, SquashPersonOverridesWorkflow, ] @@ -58,6 +63,7 @@ insert_into_redshift_activity, insert_into_s3_activity, insert_into_snowflake_activity, + insert_into_http_activity, noop_activity, prepare_dictionary, prepare_person_overrides, diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 2708bc3b72204b..c5b70f6af8d24f 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -380,6 +380,14 @@ def write(self, content: bytes | str): return result + def write_record_as_bytes(self, record: bytes): + result = self.write(record) + + self.records_total += 1 + self.records_since_last_reset += 1 + + return result + def write_records_to_jsonl(self, records): """Write records to a temporary file as JSONL.""" if len(records) == 1: diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py new file mode 100644 index 00000000000000..28651eff72ad63 --- /dev/null +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -0,0 +1,363 @@ +import asyncio +import datetime as dt +import json +from dataclasses import dataclass + +import aiohttp +from django.conf import settings +from temporalio import activity, workflow +from temporalio.common import RetryPolicy + +from posthog.batch_exports.service import BatchExportField, BatchExportSchema, HttpBatchExportInputs +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( + BatchExportTemporaryFile, + CreateBatchExportRunInputs, + UpdateBatchExportRunStatusInputs, + create_export_run, + execute_batch_export_insert_activity, + get_data_interval, + get_rows_count, + iter_records, + json_dumps_bytes, +) +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.metrics import ( + get_bytes_exported_metric, + get_rows_exported_metric, +) +from posthog.temporal.common.logger import bind_temporal_worker_logger + + +class RetryableResponseError(Exception): + """Error for HTTP status >=500 (plus 429).""" + + def __init__(self, status): + super().__init__(f"RetryableResponseError status: {status}") + + +class NonRetryableResponseError(Exception): + """Error for HTTP status >= 400 and < 500 (excluding 429).""" + + def __init__(self, status): + super().__init__(f"NonRetryableResponseError status: {status}") + + +def raise_for_status(response: aiohttp.ClientResponse): + """Like aiohttp raise_for_status, but it distinguishes between retryable and non-retryable + errors.""" + if not response.ok: + if response.status >= 500 or response.status == 429: + raise RetryableResponseError(response.status) + else: + raise NonRetryableResponseError(response.status) + + +def http_default_fields() -> list[BatchExportField]: + """Return default fields used in HTTP batch 
export, currently supporting only migrations."""
+    return [
+        BatchExportField(expression="toString(uuid)", alias="uuid"),
+        BatchExportField(expression="timestamp", alias="timestamp"),
+        BatchExportField(expression="COALESCE(inserted_at, _timestamp)", alias="_inserted_at"),
+        BatchExportField(expression="event", alias="event"),
+        BatchExportField(expression="nullIf(properties, '')", alias="properties"),
+        BatchExportField(expression="toString(distinct_id)", alias="distinct_id"),
+        BatchExportField(expression="elements_chain", alias="elements_chain"),
+    ]
+
+
+class HeartbeatDetails:
+    """This class allows us to enforce a schema on the Heartbeat details.
+
+    Attributes:
+        last_uploaded_timestamp: The timestamp of the last batch we managed to upload.
+    """
+
+    last_uploaded_timestamp: str
+
+    def __init__(self, last_uploaded_timestamp: str):
+        self.last_uploaded_timestamp = last_uploaded_timestamp
+
+    @classmethod
+    def from_activity_details(cls, details) -> "HeartbeatDetails":
+        last_uploaded_timestamp = details[0]
+        return HeartbeatDetails(last_uploaded_timestamp)
+
+
+@dataclass
+class HttpInsertInputs:
+    """Inputs for HTTP insert activity."""
+
+    team_id: int
+    url: str
+    token: str
+    data_interval_start: str
+    data_interval_end: str
+    exclude_events: list[str] | None = None
+    include_events: list[str] | None = None
+    batch_export_schema: BatchExportSchema | None = None
+
+
+async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str:
+    """Returns the `interval_start` to use, either resuming from previous heartbeat data or
+    using the `data_interval_start` from the inputs."""
+    logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")
+
+    interval_start = inputs.data_interval_start
+    details = activity.info().heartbeat_details
+
+    if not details:
+        # No heartbeat found, so we start from the beginning.
+        return interval_start
+
+    try:
+        interval_start = HeartbeatDetails.from_activity_details(details).last_uploaded_timestamp
+    except IndexError:
+        # This is the error we expect when there are no activity details, as the sequence will be
+        # empty.
+        logger.debug(
+            "Did not receive details from previous activity Execution. Export will start from the beginning %s",
+            interval_start,
+        )
+    except Exception:
+        # We still start from the beginning, but we make a point to log unexpected errors. Ideally,
+        # any new exceptions should be added to the previous block after the first time and we will
+        # never land here.
+        logger.warning(
+            "Did not receive details from previous activity Execution due to an unexpected error. 
Export will start from the beginning %s", + interval_start, + ) + + return interval_start + + +async def post_json_file_to_url(url, batch_file, session: aiohttp.ClientSession): + batch_file.seek(0) + + headers = {"Content-Type": "application/json"} + async with session.post(url, data=batch_file, headers=headers) as response: + raise_for_status(response) + return response + + +@activity.defn +async def insert_into_http_activity(inputs: HttpInsertInputs): + """Activity streams data from ClickHouse to an HTTP Endpoint.""" + logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP") + logger.info( + "Exporting batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + + async with get_client() as client: + if not await client.is_alive(): + raise ConnectionError("Cannot establish connection to ClickHouse") + + count = await get_rows_count( + client=client, + team_id=inputs.team_id, + interval_start=inputs.data_interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + ) + + if count == 0: + logger.info( + "Nothing to export in batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + return + + logger.info("BatchExporting %s rows", count) + + if inputs.batch_export_schema is not None: + raise NotImplementedError("Batch export schema is not supported for HTTP export") + + fields = http_default_fields() + columns = [field["alias"] for field in fields] + + interval_start = await maybe_resume_from_heartbeat(inputs) + + record_iterator = iter_records( + client=client, + team_id=inputs.team_id, + interval_start=interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + fields=fields, + extra_query_parameters=None, + ) + + last_uploaded_timestamp: str | None = None + + async def worker_shutdown_handler(): + """Handle the Worker shutting down by heart-beating our latest status.""" + await activity.wait_for_worker_shutdown() + logger.warn( + f"Worker shutting down! Reporting back latest exported part {last_uploaded_timestamp}", + ) + if last_uploaded_timestamp is None: + # Don't heartbeat if worker shuts down before we could even send anything + # Just start from the beginning again. + return + + activity.heartbeat(last_uploaded_timestamp) + + asyncio.create_task(worker_shutdown_handler()) + + rows_exported = get_rows_exported_metric() + bytes_exported = get_bytes_exported_metric() + + # The HTTP destination currently only supports the PostHog batch capture endpoint. In the + # future we may support other endpoints, but we'll need a way to template the request body, + # headers, etc. + # + # For now, we write the batch out in PostHog capture format, which means each Batch Export + # temporary file starts with a header and ends with a footer. + # + # For example: + # + # Header written when temp file is opened: {"api_key": "api-key-from-inputs","batch": [ + # Each record is written out as an object: {"event": "foo", ...}, + # Finally, a footer is written out: ]} + # + # Why write to a file at all? Because we need to serialize the data anyway, and it's the + # safest way to stay within batch endpoint payload limits and not waste process memory. 
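+    # For illustration only (values are hypothetical), a fully assembled request body
+    # produced this way looks like:
+    #
+    #   {"api_key": "phc_example","batch": [{"event": "foo", ...},{"event": "bar", ...}]}
+    #
+    # with the real api_key coming from `inputs.token` below.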
+ posthog_batch_header = """{"api_key": "%s","batch": [""" % inputs.token + posthog_batch_footer = "]}" + + with BatchExportTemporaryFile() as batch_file: + + def write_event_to_batch(event): + if batch_file.records_since_last_reset == 0: + batch_file.write(posthog_batch_header) + else: + batch_file.write(",") + + batch_file.write_record_as_bytes(json_dumps_bytes(event)) + + async def flush_batch_to_http_endpoint(last_uploaded_timestamp: str, session: aiohttp.ClientSession): + logger.debug( + "Sending %s records of size %s bytes", + batch_file.records_since_last_reset, + batch_file.bytes_since_last_reset, + ) + + batch_file.write(posthog_batch_footer) + + await post_json_file_to_url(inputs.url, batch_file, session) + + rows_exported.add(batch_file.records_since_last_reset) + bytes_exported.add(batch_file.bytes_since_last_reset) + + activity.heartbeat(last_uploaded_timestamp) + + async with aiohttp.ClientSession() as session: + for record_batch in record_iterator: + for row in record_batch.select(columns).to_pylist(): + # Format result row as PostHog event, write JSON to the batch file. + + properties = row["properties"] + properties = json.loads(properties) if properties else {} + properties["$geoip_disable"] = True + + if row["event"] == "$autocapture" and row["elements_chain"] is not None: + properties["$elements_chain"] = row["elements_chain"] + + capture_event = { + "uuid": row["uuid"], + "distinct_id": row["distinct_id"], + "timestamp": row["timestamp"], + "event": row["event"], + "properties": properties, + } + + inserted_at = row.pop("_inserted_at") + + write_event_to_batch(capture_event) + + if ( + batch_file.tell() > settings.BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES + or batch_file.records_since_last_reset >= settings.BATCH_EXPORT_HTTP_BATCH_SIZE + ): + last_uploaded_timestamp = str(inserted_at) + await flush_batch_to_http_endpoint(last_uploaded_timestamp, session) + batch_file.reset() + + if batch_file.tell() > 0: + last_uploaded_timestamp = str(inserted_at) + await flush_batch_to_http_endpoint(last_uploaded_timestamp, session) + + +@workflow.defn(name="http-export") +class HttpBatchExportWorkflow(PostHogWorkflow): + """A Temporal Workflow to export ClickHouse data to an HTTP endpoint. + + This Workflow is intended to be executed both manually and by a Temporal + Schedule. When ran by a schedule, `data_interval_end` should be set to + `None` so that we will fetch the end of the interval from the Temporal + search attribute `TemporalScheduledStartTime`. 
+ """ + + @staticmethod + def parse_inputs(inputs: list[str]) -> HttpBatchExportInputs: + """Parse inputs from the management command CLI.""" + loaded = json.loads(inputs[0]) + return HttpBatchExportInputs(**loaded) + + @workflow.run + async def run(self, inputs: HttpBatchExportInputs): + """Workflow implementation to export data to an HTTP Endpoint.""" + data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) + + create_export_run_inputs = CreateBatchExportRunInputs( + team_id=inputs.team_id, + batch_export_id=inputs.batch_export_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + ) + run_id = await workflow.execute_activity( + create_export_run, + create_export_run_inputs, + start_to_close_timeout=dt.timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=[ + "NonRetryableResponseError", + ], + ), + ) + + update_inputs = UpdateBatchExportRunStatusInputs( + id=run_id, + status="Completed", + team_id=inputs.team_id, + ) + + insert_inputs = HttpInsertInputs( + team_id=inputs.team_id, + url=inputs.url, + token=inputs.token, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + batch_export_schema=inputs.batch_export_schema, + ) + + await execute_batch_export_insert_activity( + insert_into_http_activity, + insert_inputs, + non_retryable_error_types=[ + "NonRetryableResponseError", + ], + update_inputs=update_inputs, + # Disable heartbeat timeout until we add heartbeat support. + heartbeat_timeout_seconds=None, + ) diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py new file mode 100644 index 00000000000000..95547342cd77dd --- /dev/null +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -0,0 +1,551 @@ +import asyncio +import datetime as dt +import json +from random import randint +from uuid import uuid4 + +import pytest +import pytest_asyncio +from aioresponses import aioresponses +from django.conf import settings +from django.test import override_settings +from temporalio import activity +from temporalio.client import WorkflowFailureError +from temporalio.common import RetryPolicy +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import UnsandboxedWorkflowRunner, Worker + +from posthog.temporal.batch_exports.batch_exports import ( + create_export_run, + iter_records, + update_export_run_status, +) +from posthog.temporal.batch_exports.clickhouse import ClickHouseClient +from posthog.temporal.batch_exports.http_batch_export import ( + HeartbeatDetails, + HttpBatchExportInputs, + HttpBatchExportWorkflow, + HttpInsertInputs, + NonRetryableResponseError, + RetryableResponseError, + http_default_fields, + insert_into_http_activity, +) +from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse +from posthog.temporal.tests.utils.models import ( + acreate_batch_export, + adelete_batch_export, + afetch_batch_export_runs, +) + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.django_db, +] + +TEST_URL = "http://example.com/batch" +TEST_TOKEN = "abcdef123456" + + +@pytest.fixture +def http_config(): + return { + "url": TEST_URL, + "token": 
TEST_TOKEN, + } + + +class MockServer: + def __init__(self): + self.records = [] + + def post(self, url, data, **kwargs): + data = json.loads(data.read()) + assert data["api_key"] == TEST_TOKEN + self.records.extend(data["batch"]) + + +async def assert_clickhouse_records_in_mock_server( + mock_server, + clickhouse_client: ClickHouseClient, + team_id: int, + data_interval_start: dt.datetime, + data_interval_end: dt.datetime, + exclude_events: list[str] | None = None, + include_events: list[str] | None = None, +): + """Assert expected records are written to a MockServer instance.""" + posted_records = mock_server.records + + schema_column_names = [field["alias"] for field in http_default_fields()] + + expected_records = [] + for records in iter_records( + client=clickhouse_client, + team_id=team_id, + interval_start=data_interval_start.isoformat(), + interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + include_events=include_events, + fields=http_default_fields(), + extra_query_parameters=None, + ): + for record in records.select(schema_column_names).to_pylist(): + expected_record = {} + + for k, v in record.items(): + if k == "properties": + expected_record[k] = json.loads(v) if v else {} + elif isinstance(v, dt.datetime): + expected_record[k] = v.replace(tzinfo=dt.timezone.utc).isoformat() + else: + expected_record[k] = v + + expected_record["properties"]["$geoip_disable"] = True + + expected_record.pop("_inserted_at") + elements_chain = expected_record.pop("elements_chain", None) + if expected_record["event"] == "$autocapture" and elements_chain is not None: + expected_record["properties"]["$elements_chain"] = elements_chain + + expected_records.append(expected_record) + + inserted_column_names = [column_name for column_name in posted_records[0].keys()].sort() + expected_column_names = [column_name for column_name in expected_records[0].keys()].sort() + + assert inserted_column_names == expected_column_names + assert posted_records[0] == expected_records[0] + assert posted_records == expected_records + + +@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +async def test_insert_into_http_activity_inserts_data_into_http_endpoint( + clickhouse_client, activity_environment, http_config, exclude_events +): + """Test that the insert_into_http_activity function POSTs data to an HTTP Endpoint. + + We use the generate_test_events_in_clickhouse function to generate several sets + of events. Some of these sets are expected to be exported, and others not. Expected + events are those that: + * Are created for the team_id of the batch export. + * Are created in the date range of the batch export. + * Are not duplicates of other events that are in the same batch. + * Do not have an event name contained in the batch export's exclude_events. + """ + data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) + data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + + # Generate a random team id integer. There's still a chance of a collision, + # but it's very small. 
+ team_id = randint(1, 1000000) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=10000, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + properties=None, + person_properties=None, + event_name="test-no-prop-{i}", + ) + + if exclude_events: + for event_name in exclude_events: + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + event_name=event_name, + ) + + insert_inputs = HttpInsertInputs( + team_id=team_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + batch_export_schema=None, + **http_config, + ) + + mock_server = MockServer() + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + await activity_environment.run(insert_into_http_activity, insert_inputs) + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=team_id, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + exclude_events=exclude_events, + ) + + +async def test_insert_into_http_activity_throws_on_bad_http_status( + clickhouse_client, activity_environment, http_config, exclude_events +): + """Test that the insert_into_http_activity function throws on status >= 400""" + data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) + data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + + # Generate a random team id integer. There's still a chance of a collision, + # but it's very small. 
+ team_id = randint(1, 1000000) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=1, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + insert_inputs = HttpInsertInputs( + team_id=team_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + batch_export_schema=None, + **http_config, + ) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=400, repeat=True) + with pytest.raises(NonRetryableResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=429, repeat=True) + with pytest.raises(RetryableResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=500, repeat=True) + with pytest.raises(RetryableResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + +@pytest_asyncio.fixture +async def http_batch_export(ateam, http_config, interval, exclude_events, temporal_client): + destination_data = { + "type": "HTTP", + "config": {**http_config, "exclude_events": exclude_events}, + } + batch_export_data = { + "name": "my-production-http-export", + "destination": destination_data, + "interval": interval, + } + + batch_export = await acreate_batch_export( + team_id=ateam.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + await adelete_batch_export(batch_export, temporal_client) + + +@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) +@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +async def test_http_export_workflow( + clickhouse_client, + http_batch_export, + interval, + exclude_events, + ateam, + batch_export_schema, +): + """Test HTTP Export Workflow end-to-end by using a mock server. + + The workflow should update the batch export run status to completed and produce the expected + records to the mock server. 
+ """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + data_interval_start = data_interval_end - http_batch_export.interval_time_delta + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=100, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + if exclude_events: + for event_name in exclude_events: + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + event_name=event_name, + ) + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **http_batch_export.destination.config, + ) + + mock_server = MockServer() + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + insert_into_http_activity, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + + await activity_environment.client.execute_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + exclude_events=exclude_events, + ) + + +async def test_http_export_workflow_handles_insert_activity_errors(ateam, http_batch_export, interval): + """Test that HTTP Export Workflow can gracefully handle errors when POSTing to HTTP Endpoint.""" + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + **http_batch_export.destination.config, + ) + + @activity.defn(name="insert_into_http_activity") + async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + insert_into_http_activity_mocked, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with 
pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +async def test_http_export_workflow_handles_cancellation(ateam, http_batch_export, interval): + """Test that HTTP Export Workflow can gracefully handle cancellations when POSTing to HTTP Endpoint.""" + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + **http_batch_export.destination.config, + ) + + @activity.defn(name="insert_into_http_activity") + async def never_finish_activity(_: HttpInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + never_finish_activity, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" + + +async def test_insert_into_http_activity_heartbeats( + clickhouse_client, + ateam, + http_batch_export, + activity_environment, + http_config, +): + """Test that the insert_into_http_activity activity sends heartbeats. + + We use a function that runs on_heartbeat to check and track the heartbeat contents. + """ + data_interval_end = dt.datetime.fromisoformat("2023-04-20T14:30:00.000000+00:00") + data_interval_start = data_interval_end - http_batch_export.interval_time_delta + + num_expected_parts = 3 + + for i in range(1, num_expected_parts + 1): + part_inserted_at = data_interval_end - http_batch_export.interval_time_delta / i + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=1, + count_outside_range=0, + count_other_team=0, + duplicate=False, + # We need to roll over 5MB to trigger a batch flush (and a heartbeat). 
+ properties={"$chonky": ("a" * 5 * 1024**2)}, + inserted_at=part_inserted_at, + ) + + heartbeat_count = 0 + + def assert_heartbeat_details(*raw_details): + """A function to track and assert we are heartbeating.""" + nonlocal heartbeat_count + heartbeat_count += 1 + + details = HeartbeatDetails.from_activity_details(raw_details) + + last_uploaded_dt = dt.datetime.fromisoformat(details.last_uploaded_timestamp) + assert last_uploaded_dt == data_interval_end - http_batch_export.interval_time_delta / heartbeat_count + + activity_environment.on_heartbeat = assert_heartbeat_details + + insert_inputs = HttpInsertInputs( + team_id=ateam.pk, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + **http_config, + ) + + mock_server = MockServer() + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + await activity_environment.run(insert_into_http_activity, insert_inputs) + + # This checks that the assert_heartbeat_details function was actually called. + assert heartbeat_count == num_expected_parts + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + ) diff --git a/requirements-dev.in b/requirements-dev.in index daa1a9b4ee4fe4..43a540277ab7d1 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -46,3 +46,4 @@ python-dateutil>=2.8.2 responses==0.23.1 syrupy~=4.6.0 flaky==3.7.0 +aioresponses==0.7.6 diff --git a/requirements-dev.txt b/requirements-dev.txt index 54229a1f88bb85..63d0cc7a5cb746 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,6 +4,16 @@ # # pip-compile requirements-dev.in # +aiohttp==3.9.0 + # via + # -c requirements.txt + # aioresponses +aioresponses==0.7.6 + # via -r requirements-dev.in +aiosignal==1.2.0 + # via + # -c requirements.txt + # aiohttp annotated-types==0.5.0 # via # -c requirements.txt @@ -17,7 +27,12 @@ asgiref==3.7.2 async-timeout==4.0.2 # via # -c requirements.txt + # aiohttp # redis +attrs==23.2.0 + # via + # -c requirements.txt + # aiohttp black==22.8.0 # via # -r requirements-dev.in @@ -84,6 +99,11 @@ flaky==3.7.0 # via -r requirements-dev.in freezegun==1.2.2 # via -r requirements-dev.in +frozenlist==1.3.0 + # via + # -c requirements.txt + # aiohttp + # aiosignal genson==1.2.2 # via datamodel-code-generator icdiff==2.0.5 @@ -93,6 +113,7 @@ idna==2.8 # -c requirements.txt # email-validator # requests + # yarl inflect==5.6.2 # via datamodel-code-generator iniconfig==1.1.1 @@ -109,6 +130,11 @@ lupa==1.14.1 # via fakeredis markupsafe==1.1.1 # via jinja2 +multidict==6.0.2 + # via + # -c requirements.txt + # aiohttp + # yarl mypy==1.8.0 # via # -r requirements-dev.in @@ -273,6 +299,10 @@ wheel==0.42.0 # via # -c requirements.txt # pip-tools +yarl==1.7.2 + # via + # -c requirements.txt + # aiohttp # The following packages are considered to be unsafe in a requirements file: # pip
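As a companion to the patch above, here is a minimal, self-contained sketch (not part of the diff) of the /batch payload framing that insert_into_http_activity writes into its temporary file: a header carrying the API key, comma-separated JSON event objects, and a closing footer. The build_batch_body helper and the sample event values are illustrative only and do not exist in the codebase.

import json


def build_batch_body(api_key: str, events: list[dict]) -> bytes:
    """Assemble a PostHog /batch payload the same way the batch export writes its
    temporary file: header, comma-separated JSON records, then the closing footer."""
    header = ('{"api_key": "%s","batch": [' % api_key).encode("utf-8")
    footer = b"]}"
    body = header
    for index, event in enumerate(events):
        if index > 0:
            body += b","
        body += json.dumps(event).encode("utf-8")
    return body + footer


# Example: one capture event with GeoIP disabled, mirroring what the activity emits.
events = [
    {
        "uuid": "00000000-0000-0000-0000-000000000000",
        "distinct_id": "user-1",
        "timestamp": "2023-04-25T14:30:00+00:00",
        "event": "pageview",
        "properties": {"$browser": "Chrome", "$geoip_disable": True},
    }
]
print(json.loads(build_batch_body("phc_example", events)))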