From 4037a3e4161001a5fa8a96995b24012ab66b8004 Mon Sep 17 00:00:00 2001 From: Brett Hoerner Date: Tue, 13 Feb 2024 11:36:10 -0700 Subject: [PATCH] feat(batch-exports): Add HTTP Batch Export destination To possibly be reused in the future, but for now it only submits payloads in the PostHog /batch format. --- latest_migrations.manifest | 2 +- posthog/batch_exports/models.py | 2 + posthog/batch_exports/service.py | 16 + .../0389_alter_batchexportdestination_type.py | 29 ++ posthog/settings/temporal.py | 3 +- posthog/temporal/batch_exports/__init__.py | 6 + .../temporal/batch_exports/batch_exports.py | 8 + .../batch_exports/http_batch_export.py | 251 ++++++++++ .../test_http_batch_export_workflow.py | 466 ++++++++++++++++++ requirements-dev.in | 1 + requirements-dev.txt | 30 ++ 11 files changed, 812 insertions(+), 2 deletions(-) create mode 100644 posthog/migrations/0389_alter_batchexportdestination_type.py create mode 100644 posthog/temporal/batch_exports/http_batch_export.py create mode 100644 posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 762dc2114fb123..78d33f21cf315c 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0388_add_schema_to_batch_exports +posthog: 0389_alter_batchexportdestination_type sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index 6d00cf38f389e1..8cb95e44827ac8 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -28,6 +28,7 @@ class Destination(models.TextChoices): POSTGRES = "Postgres" REDSHIFT = "Redshift" BIGQUERY = "BigQuery" + HTTP = "HTTP" NOOP = "NoOp" secret_fields = { @@ -36,6 +37,7 @@ class Destination(models.TextChoices): "Postgres": set("password"), "Redshift": set("password"), "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"}, + "HTTP": set("token"), "NoOp": set(), } diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 6d84a9b28823e9..ad0812031d2a7a 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -157,6 +157,21 @@ class BigQueryBatchExportInputs: batch_export_schema: BatchExportSchema | None = None +@dataclass +class HttpBatchExportInputs: + """Inputs for Http export workflow.""" + + batch_export_id: str + team_id: int + url: str + token: str + interval: str = "hour" + data_interval_end: str | None = None + exclude_events: list[str] | None = None + include_events: list[str] | None = None + batch_export_schema: BatchExportSchema | None = None + + @dataclass class NoOpInputs: """NoOp Workflow is used for testing, it takes a single argument to echo back.""" @@ -174,6 +189,7 @@ class NoOpInputs: "Postgres": ("postgres-export", PostgresBatchExportInputs), "Redshift": ("redshift-export", RedshiftBatchExportInputs), "BigQuery": ("bigquery-export", BigQueryBatchExportInputs), + "HTTP": ("http-export", HttpBatchExportInputs), "NoOp": ("no-op", NoOpInputs), } diff --git a/posthog/migrations/0389_alter_batchexportdestination_type.py b/posthog/migrations/0389_alter_batchexportdestination_type.py new file mode 100644 index 00000000000000..1236a9a24cdbf9 --- /dev/null +++ b/posthog/migrations/0389_alter_batchexportdestination_type.py @@ 
-0,0 +1,29 @@ +# Generated by Django 4.1.13 on 2024-02-13 18:47 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0388_add_schema_to_batch_exports"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("Redshift", "Redshift"), + ("BigQuery", "Bigquery"), + ("HTTP", "Http"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), + ] diff --git a/posthog/settings/temporal.py b/posthog/settings/temporal.py index c1d78fa0e9a2f2..6881541bf9fcb7 100644 --- a/posthog/settings/temporal.py +++ b/posthog/settings/temporal.py @@ -15,6 +15,7 @@ BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB - +BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 10 # 10MB +BATCH_EXPORT_HTTP_BATCH_SIZE = 1000 UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", "")) diff --git a/posthog/temporal/batch_exports/__init__.py b/posthog/temporal/batch_exports/__init__.py index 777f0ae50a8cf6..0973a6b8c8974a 100644 --- a/posthog/temporal/batch_exports/__init__.py +++ b/posthog/temporal/batch_exports/__init__.py @@ -32,6 +32,10 @@ SnowflakeBatchExportWorkflow, insert_into_snowflake_activity, ) +from posthog.temporal.batch_exports.http_batch_export import ( + HttpBatchExportWorkflow, + insert_into_http_activity, +) from posthog.temporal.batch_exports.squash_person_overrides import * WORKFLOWS = [ @@ -42,6 +46,7 @@ RedshiftBatchExportWorkflow, S3BatchExportWorkflow, SnowflakeBatchExportWorkflow, + HttpBatchExportWorkflow, SquashPersonOverridesWorkflow, ] @@ -58,6 +63,7 @@ insert_into_redshift_activity, insert_into_s3_activity, insert_into_snowflake_activity, + insert_into_http_activity, noop_activity, prepare_dictionary, prepare_person_overrides, diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 2708bc3b72204b..c5b70f6af8d24f 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -380,6 +380,14 @@ def write(self, content: bytes | str): return result + def write_record_as_bytes(self, record: bytes): + result = self.write(record) + + self.records_total += 1 + self.records_since_last_reset += 1 + + return result + def write_records_to_jsonl(self, records): """Write records to a temporary file as JSONL.""" if len(records) == 1: diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py new file mode 100644 index 00000000000000..e4ef84317701d2 --- /dev/null +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -0,0 +1,251 @@ +import aiohttp +import datetime as dt +import json +from dataclasses import dataclass + +from django.conf import settings +from temporalio import activity, workflow +from temporalio.common import RetryPolicy + +from posthog.batch_exports.service import BatchExportField, BatchExportSchema, HttpBatchExportInputs +from posthog.temporal.batch_exports.base import PostHogWorkflow +from posthog.temporal.batch_exports.batch_exports import ( + BatchExportTemporaryFile, + CreateBatchExportRunInputs, + 
UpdateBatchExportRunStatusInputs, + create_export_run, + execute_batch_export_insert_activity, + get_data_interval, + get_rows_count, + iter_records, + json_dumps_bytes, +) +from posthog.temporal.batch_exports.clickhouse import get_client +from posthog.temporal.batch_exports.metrics import ( + get_bytes_exported_metric, + get_rows_exported_metric, +) +from posthog.temporal.common.logger import bind_temporal_worker_logger + + +def http_default_fields() -> list[BatchExportField]: + return [ + BatchExportField(expression="toString(uuid)", alias="uuid"), + BatchExportField(expression="timestamp", alias="timestamp"), + BatchExportField(expression="event", alias="event"), + BatchExportField(expression="nullIf(properties, '')", alias="properties"), + BatchExportField(expression="toString(distinct_id)", alias="distinct_id"), + BatchExportField(expression="toJSONString(elements_chain)", alias="elements_chain"), + ] + + +@dataclass +class HttpInsertInputs: + """Inputs for HTTP insert activity.""" + + team_id: int + url: str + token: str + data_interval_start: str + data_interval_end: str + exclude_events: list[str] | None = None + include_events: list[str] | None = None + batch_export_schema: BatchExportSchema | None = None + + +async def post_json_file_to_url(url, batch_file): + batch_file.seek(0) + async with aiohttp.ClientSession() as session: + headers = {"Content-Type": "application/json"} + async with session.post(url, data=batch_file, headers=headers, raise_for_status=True) as response: + return response + + +@activity.defn +async def insert_into_http_activity(inputs: HttpInsertInputs): + """Activity streams data from ClickHouse to an HTTP Endpoint.""" + logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP") + logger.info( + "Exporting batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + + async with get_client() as client: + if not await client.is_alive(): + raise ConnectionError("Cannot establish connection to ClickHouse") + + count = await get_rows_count( + client=client, + team_id=inputs.team_id, + interval_start=inputs.data_interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + ) + + if count == 0: + logger.info( + "Nothing to export in batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + return + + logger.info("BatchExporting %s rows", count) + + if inputs.batch_export_schema is not None: + raise NotImplementedError("Batch export schema is not supported for HTTP export") + + fields = http_default_fields() + columns = [field["alias"] for field in fields] + + record_iterator = iter_records( + client=client, + team_id=inputs.team_id, + interval_start=inputs.data_interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + fields=fields, + extra_query_parameters=None, + ) + + rows_exported = get_rows_exported_metric() + bytes_exported = get_bytes_exported_metric() + + # The HTTP destination currently only supports the PostHog batch capture endpoint. In the + # future we may support other endpoints, but we'll need a way to template the request body, + # headers, etc. + # + # For now, we write the batch out in PostHog capture format, which means each Batch Export + # temporary file starts with a header and ends with a footer. 
+ # + # For example: + # + # Header written when temp file is opened: {"api_key": "api-key-from-inputs","batch": [ + # Each record is written out as an object: {"event": "foo", ...}, + # Finally, a footer is written out: ]} + # + # Why write to a file at all? Because we need to serialize the data anyway, and it's the + # safest way to stay within batch endpoint payload limits and not waste process memory. + posthog_batch_header = """{"api_key": "%s","batch": [""" % inputs.token + posthog_batch_footer = "]}" + + with BatchExportTemporaryFile() as batch_file: + + def write_event_to_batch(event): + if batch_file.records_since_last_reset == 0: + batch_file.write(posthog_batch_header) + else: + batch_file.write(",") + + batch_file.write_record_as_bytes(json_dumps_bytes(event)) + + async def flush_batch(): + logger.debug( + "Sending %s records of size %s bytes", + batch_file.records_since_last_reset, + batch_file.bytes_since_last_reset, + ) + + batch_file.write(posthog_batch_footer) + await post_json_file_to_url(inputs.url, batch_file) + + rows_exported.add(batch_file.records_since_last_reset) + bytes_exported.add(batch_file.bytes_since_last_reset) + + for record_batch in record_iterator: + for row in record_batch.select(columns).to_pylist(): + # Format result row as PostHog event, write JSON to the batch file. + + properties = row["properties"] + properties = json.loads(properties) if properties else None + if row["event"] == "$autocapture" and row["elements_chain"] is not None: + properties["$elements_chain"] = row["elements_chain"] + + capture_event = { + "uuid": row["uuid"], + "distinct_id": row["distinct_id"], + "timestamp": row["timestamp"], + "event": row["event"], + "properties": properties, + } + + write_event_to_batch(capture_event) + + if ( + batch_file.tell() > settings.BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES + or batch_file.records_since_last_reset >= settings.BATCH_EXPORT_HTTP_BATCH_SIZE + ): + await flush_batch() + batch_file.reset() + + if batch_file.tell() > 0: + await flush_batch() + + +@workflow.defn(name="http-export") +class HttpBatchExportWorkflow(PostHogWorkflow): + """A Temporal Workflow to export ClickHouse data to an HTTP endpoint. + + This Workflow is intended to be executed both manually and by a Temporal + Schedule. When ran by a schedule, `data_interval_end` should be set to + `None` so that we will fetch the end of the interval from the Temporal + search attribute `TemporalScheduledStartTime`. 
+ """ + + @staticmethod + def parse_inputs(inputs: list[str]) -> HttpBatchExportInputs: + """Parse inputs from the management command CLI.""" + loaded = json.loads(inputs[0]) + return HttpBatchExportInputs(**loaded) + + @workflow.run + async def run(self, inputs: HttpBatchExportInputs): + """Workflow implementation to export data to an HTTP Endpoint.""" + data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) + + create_export_run_inputs = CreateBatchExportRunInputs( + team_id=inputs.team_id, + batch_export_id=inputs.batch_export_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + ) + run_id = await workflow.execute_activity( + create_export_run, + create_export_run_inputs, + start_to_close_timeout=dt.timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=[], + ), + ) + + update_inputs = UpdateBatchExportRunStatusInputs( + id=run_id, + status="Completed", + team_id=inputs.team_id, + ) + + insert_inputs = HttpInsertInputs( + team_id=inputs.team_id, + url=inputs.url, + token=inputs.token, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + batch_export_schema=inputs.batch_export_schema, + ) + + await execute_batch_export_insert_activity( + insert_into_http_activity, + insert_inputs, + non_retryable_error_types=[], + update_inputs=update_inputs, + # Disable heartbeat timeout until we add heartbeat support. + heartbeat_timeout_seconds=None, + ) diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py new file mode 100644 index 00000000000000..e78e2b32938b56 --- /dev/null +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -0,0 +1,466 @@ +from aiohttp.client_exceptions import ClientResponseError +import asyncio +import datetime as dt +import json +from random import randint +from uuid import uuid4 + +from aioresponses import aioresponses +import pytest +import pytest_asyncio +from django.conf import settings +from django.test import override_settings +from temporalio import activity +from temporalio.client import WorkflowFailureError +from temporalio.common import RetryPolicy +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import UnsandboxedWorkflowRunner, Worker + +from posthog.temporal.batch_exports.batch_exports import ( + create_export_run, + iter_records, + update_export_run_status, +) +from posthog.temporal.batch_exports.clickhouse import ClickHouseClient +from posthog.temporal.batch_exports.http_batch_export import ( + HttpBatchExportInputs, + HttpBatchExportWorkflow, + HttpInsertInputs, + insert_into_http_activity, + http_default_fields, +) +from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse +from posthog.temporal.tests.utils.models import ( + acreate_batch_export, + adelete_batch_export, + afetch_batch_export_runs, +) + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.django_db, +] + +TEST_URL = "http://example.com/batch" +TEST_TOKEN = "abcdef123456" + + +@pytest.fixture +def http_config(): + return { + "url": TEST_URL, + "token": TEST_TOKEN, + } + + +class MockServer: + def __init__(self): + self.records = [] + + def 
post(self, url, data, **kwargs): + data = json.loads(data.read()) + assert data["api_key"] == TEST_TOKEN + self.records.extend(data["batch"]) + + +async def assert_clickhouse_records_in_mock_server( + mock_server, + clickhouse_client: ClickHouseClient, + team_id: int, + data_interval_start: dt.datetime, + data_interval_end: dt.datetime, + exclude_events: list[str] | None = None, + include_events: list[str] | None = None, +): + """Assert expected records are written to a MockServer instance.""" + posted_records = mock_server.records + + schema_column_names = [field["alias"] for field in http_default_fields()] + + expected_records = [] + for records in iter_records( + client=clickhouse_client, + team_id=team_id, + interval_start=data_interval_start.isoformat(), + interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + include_events=include_events, + fields=http_default_fields(), + extra_query_parameters=None, + ): + for record in records.select(schema_column_names).to_pylist(): + expected_record = {} + + for k, v in record.items(): + if k in {"properties", "set", "set_once", "person_properties"} and v is not None: + expected_record[k] = json.loads(v) + elif isinstance(v, dt.datetime): + expected_record[k] = v.replace(tzinfo=dt.timezone.utc).isoformat() + else: + expected_record[k] = v + + elements_chain = expected_record.pop("elements_chain", None) + if expected_record["event"] == "$autocapture" and elements_chain is not None: + expected_record["properties"]["$elements_chain"] = elements_chain + + expected_records.append(expected_record) + + inserted_column_names = [column_name for column_name in posted_records[0].keys()].sort() + expected_column_names = [column_name for column_name in expected_records[0].keys()].sort() + + assert inserted_column_names == expected_column_names + assert posted_records[0] == expected_records[0] + assert posted_records == expected_records + + +@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +async def test_insert_into_http_activity_inserts_data_into_http_endpoint( + clickhouse_client, activity_environment, http_config, exclude_events +): + """Test that the insert_into_http_activity function POSTs data to an HTTP Endpoint. + + We use the generate_test_events_in_clickhouse function to generate several sets + of events. Some of these sets are expected to be exported, and others not. Expected + events are those that: + * Are created for the team_id of the batch export. + * Are created in the date range of the batch export. + * Are not duplicates of other events that are in the same batch. + * Do not have an event name contained in the batch export's exclude_events. + """ + data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) + data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + + # Generate a random team id integer. There's still a chance of a collision, + # but it's very small. 
+ team_id = randint(1, 1000000) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=10000, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + properties=None, + person_properties=None, + event_name="test-no-prop-{i}", + ) + + if exclude_events: + for event_name in exclude_events: + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + event_name=event_name, + ) + + insert_inputs = HttpInsertInputs( + team_id=team_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + batch_export_schema=None, + **http_config, + ) + + mock_server = MockServer() + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + await activity_environment.run(insert_into_http_activity, insert_inputs) + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=team_id, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + exclude_events=exclude_events, + ) + + +async def test_insert_into_http_activity_throws_on_bad_http_status( + clickhouse_client, activity_environment, http_config, exclude_events +): + """Test that the insert_into_http_activity function throws on status >= 400""" + data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) + data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + + # Generate a random team id integer. There's still a chance of a collision, + # but it's very small. 
+ team_id = randint(1, 1000000) + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=team_id, + start_time=data_interval_start, + end_time=data_interval_end, + count=1, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + insert_inputs = HttpInsertInputs( + team_id=team_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=exclude_events, + batch_export_schema=None, + **http_config, + ) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=400, repeat=True) + with pytest.raises(ClientResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=500, repeat=True) + with pytest.raises(ClientResponseError): + await activity_environment.run(insert_into_http_activity, insert_inputs) + + +@pytest_asyncio.fixture +async def http_batch_export(ateam, http_config, interval, exclude_events, temporal_client): + destination_data = { + "type": "HTTP", + "config": {**http_config, "exclude_events": exclude_events}, + } + batch_export_data = { + "name": "my-production-http-export", + "destination": destination_data, + "interval": interval, + } + + batch_export = await acreate_batch_export( + team_id=ateam.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + await adelete_batch_export(batch_export, temporal_client) + + +@pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) +@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +async def test_http_export_workflow( + clickhouse_client, + http_batch_export, + interval, + exclude_events, + ateam, + batch_export_schema, +): + """Test HTTP Export Workflow end-to-end by using a mock server. + + The workflow should update the batch export run status to completed and produce the expected + records to the mock server. 
+ """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + data_interval_start = data_interval_end - http_batch_export.interval_time_delta + + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=100, + count_outside_range=10, + count_other_team=10, + duplicate=True, + properties={"$browser": "Chrome", "$os": "Mac OS X"}, + person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + if exclude_events: + for event_name in exclude_events: + await generate_test_events_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=5, + count_outside_range=0, + count_other_team=0, + event_name=event_name, + ) + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **http_batch_export.destination.config, + ) + + mock_server = MockServer() + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + insert_into_http_activity, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with aioresponses(passthrough=[settings.CLICKHOUSE_HTTP_URL]) as m, override_settings( + BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2 + ): + m.post(TEST_URL, status=200, callback=mock_server.post, repeat=True) + + await activity_environment.client.execute_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + + await assert_clickhouse_records_in_mock_server( + mock_server=mock_server, + clickhouse_client=clickhouse_client, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + exclude_events=exclude_events, + ) + + +async def test_http_export_workflow_handles_insert_activity_errors(ateam, http_batch_export, interval): + """Test that HTTP Export Workflow can gracefully handle errors when POSTing to HTTP Endpoint.""" + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + **http_batch_export.destination.config, + ) + + @activity.defn(name="insert_into_http_activity") + async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + insert_into_http_activity_mocked, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with 
pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +async def test_http_export_workflow_handles_cancellation(ateam, http_batch_export, interval): + """Test that HTTP Export Workflow can gracefully handle cancellations when POSTing to HTTP Endpoint.""" + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = HttpBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(http_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + **http_batch_export.destination.config, + ) + + @activity.defn(name="insert_into_http_activity") + async def never_finish_activity(_: HttpInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[HttpBatchExportWorkflow], + activities=[ + create_export_run, + never_finish_activity, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + HttpBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/requirements-dev.in b/requirements-dev.in index daa1a9b4ee4fe4..69676d92a66fd5 100644 --- a/requirements-dev.in +++ b/requirements-dev.in @@ -46,3 +46,4 @@ python-dateutil>=2.8.2 responses==0.23.1 syrupy~=4.6.0 flaky==3.7.0 +aioresponses==0.7.6 \ No newline at end of file diff --git a/requirements-dev.txt b/requirements-dev.txt index 54229a1f88bb85..63d0cc7a5cb746 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,6 +4,16 @@ # # pip-compile requirements-dev.in # +aiohttp==3.9.0 + # via + # -c requirements.txt + # aioresponses +aioresponses==0.7.6 + # via -r requirements-dev.in +aiosignal==1.2.0 + # via + # -c requirements.txt + # aiohttp annotated-types==0.5.0 # via # -c requirements.txt @@ -17,7 +27,12 @@ asgiref==3.7.2 async-timeout==4.0.2 # via # -c requirements.txt + # aiohttp # redis +attrs==23.2.0 + # via + # -c requirements.txt + # aiohttp black==22.8.0 # via # -r requirements-dev.in @@ -84,6 +99,11 @@ flaky==3.7.0 # via -r requirements-dev.in freezegun==1.2.2 # via -r requirements-dev.in +frozenlist==1.3.0 + # via + # -c requirements.txt + # aiohttp + # aiosignal genson==1.2.2 # via datamodel-code-generator icdiff==2.0.5 @@ -93,6 +113,7 @@ idna==2.8 # -c requirements.txt # email-validator # requests + # yarl inflect==5.6.2 # via datamodel-code-generator iniconfig==1.1.1 @@ -109,6 +130,11 @@ lupa==1.14.1 # via fakeredis markupsafe==1.1.1 # via jinja2 +multidict==6.0.2 + # via + # -c requirements.txt + # aiohttp + # 
yarl mypy==1.8.0 # via # -r requirements-dev.in @@ -273,6 +299,10 @@ wheel==0.42.0 # via # -c requirements.txt # pip-tools +yarl==1.7.2 + # via + # -c requirements.txt + # aiohttp # The following packages are considered to be unsafe in a requirements file: # pip
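
For reference, each flush performed by insert_into_http_activity POSTs a standard PostHog /batch capture payload: the posthog_batch_header is written when the temporary file is opened, one JSON object is written per exported row (comma-separated), and the "]}" footer is appended by flush_batch. The snippet below is a minimal standalone sketch of that layout, not part of the patch itself; the token and event values are made-up examples, and the real activity streams rows from ClickHouse through BatchExportTemporaryFile rather than building the body in memory.

import json

# Hypothetical example values; in the activity these come from HttpInsertInputs
# and the rows returned by iter_records.
token = "phc_example_project_api_key"
events = [
    {
        "uuid": "018d9af1-0000-0000-0000-000000000000",
        "distinct_id": "user-1",
        "timestamp": "2024-02-13 18:47:00",
        "event": "$pageview",
        "properties": {"$browser": "Chrome"},
    },
]

# Header (mirrors posthog_batch_header), then comma-separated records, then the "]}" footer.
header = '{"api_key": "%s","batch": [' % token
body = header + ",".join(json.dumps(event) for event in events) + "]}"

# The assembled body parses as a regular /batch capture request.
parsed = json.loads(body)
assert parsed["api_key"] == token
assert parsed["batch"][0]["event"] == "$pageview"

In the activity, a body like this is flushed whenever the temporary file grows past BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES (10MB) or reaches BATCH_EXPORT_HTTP_BATCH_SIZE (1000) records, and once more at the end of the interval if any records remain.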