From 098a07388592b239414923b638acc2d315333495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 20 Sep 2023 16:00:15 +0200 Subject: [PATCH] refactor: BatchExport tests for S3 --- posthog/temporal/tests/batch_exports/base.py | 143 ++- .../temporal/tests/batch_exports/fixtures.py | 48 - .../test_s3_batch_export_workflow.py | 991 +++++------------- posthog/temporal/tests/conftest.py | 36 + posthog/temporal/workflows/s3_batch_export.py | 2 +- 5 files changed, 453 insertions(+), 767 deletions(-) delete mode 100644 posthog/temporal/tests/batch_exports/fixtures.py diff --git a/posthog/temporal/tests/batch_exports/base.py b/posthog/temporal/tests/batch_exports/base.py index 88a52fe798426e..4d7808e869cf29 100644 --- a/posthog/temporal/tests/batch_exports/base.py +++ b/posthog/temporal/tests/batch_exports/base.py @@ -1,10 +1,19 @@ import datetime as dt import json +import random import typing +import uuid from asgiref.sync import sync_to_async +from temporalio.client import Client from ee.clickhouse.materialized_columns.columns import materialize +from posthog.batch_exports.models import ( + BatchExport, + BatchExportDestination, + BatchExportRun, +) +from posthog.batch_exports.service import sync_batch_export from posthog.temporal.workflows.clickhouse import ClickHouseClient @@ -25,8 +34,99 @@ class EventValues(typing.TypedDict): uuid: str -async def insert_events(client: ClickHouseClient, events: list[EventValues]): +def date_range(start: dt.datetime, stop: dt.datetime, step: dt.timedelta): + """Generate a range of dates between two dates.""" + current = start + + while current < stop: + yield current + current += step + + +async def insert_events( + client: ClickHouseClient, + team, + start_time, + end_time, + n: int = 100, + n_outside_range: int = 10, + n_other_team: int = 10, + override_values: dict | None = None, + duplicate: bool = False, +) -> tuple[list[EventValues], list[EventValues], list[EventValues]]: """Insert some events into the sharded_events table.""" + possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1))) + if override_values is None: + override_dict = {} + else: + override_dict = override_values + + properties = {"$browser": "Chrome", "$os": "Mac OS X", "super-property": "super-value"} + + events: list[EventValues] = [ + { + "uuid": str(uuid.uuid4()), + "event": f"test-{i}", + "timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"), + "created_at": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"), + "inserted_at": override_dict.get( + "inserted_at", random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f") + ), + "_timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S"), + "person_id": str(uuid.uuid4()), + "person_properties": override_dict.get("properties", properties), + "team_id": team.pk, + "properties": override_dict.get("properties", properties), + "distinct_id": str(uuid.uuid4()), + "elements_chain": "this is a comman, separated, list, of css selectors(?)", + } + for i in range(n) + ] + + duplicate_events = [] + if duplicate is True: + duplicate_events = events + + delta = (end_time - start_time) + dt.timedelta(hours=1) + events_outside_range: list[EventValues] = [ + { + "uuid": str(uuid.uuid4()), + "event": f"test-{i}", + "timestamp": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f"), + "created_at": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f"), + "inserted_at": 
override_dict.get( + "inserted_at", (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f") + ), + "_timestamp": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S"), + "person_id": str(uuid.uuid4()), + "person_properties": override_dict.get("properties", properties), + "team_id": team.pk, + "properties": override_dict.get("properties", properties), + "distinct_id": str(uuid.uuid4()), + "elements_chain": "this is a comman, separated, list, of css selectors(?)", + } + for i in range(n_outside_range) + ] + + events_from_other_team: list[EventValues] = [ + { + "uuid": str(uuid.uuid4()), + "event": f"test-{i}", + "timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"), + "created_at": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"), + "inserted_at": override_dict.get( + "inserted_at", random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f") + ), + "_timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S"), + "person_id": str(uuid.uuid4()), + "person_properties": override_dict.get("properties", properties), + "team_id": team.pk + 1, + "properties": override_dict.get("properties", properties), + "distinct_id": str(uuid.uuid4()), + "elements_chain": "this is a comman, separated, list, of css selectors(?)", + } + for i in range(n_other_team) + ] await client.execute_query( f""" @@ -63,10 +163,12 @@ async def insert_events(client: ClickHouseClient, events: list[EventValues]): if isinstance(event["person_properties"], dict) else event["person_properties"], ) - for event in events + for event in events + events_outside_range + events_from_other_team + duplicate_events ], ) + return (events, events_outside_range, events_from_other_team) + @sync_to_async def amaterialize(table: typing.Literal["events", "person", "groups"], column: str): @@ -79,3 +181,40 @@ def to_isoformat(d: str | None) -> str | None: if d is None: return None return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat() + + +def create_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: + """Create a BatchExport and its underlying Schedule.""" + + destination = BatchExportDestination(**destination_data) + batch_export = BatchExport(team_id=team_id, destination=destination, interval=interval, name=name) + + sync_batch_export(batch_export, created=True) + + destination.save() + batch_export.save() + + return batch_export + + +async def acreate_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: + """Create a BatchExport and its underlying Schedule.""" + return await sync_to_async(create_batch_export)(team_id, interval, name, destination_data) # type: ignore + + +def fetch_batch_export_runs(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportRun]: + """Fetch the BatchExportRuns for a given BatchExport.""" + return list(BatchExportRun.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit]) + + +async def afetch_batch_export_runs(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportRun]: + """Fetch the BatchExportRuns for a given BatchExport.""" + return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore + + +async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None: + """Delete a BatchExport and its underlying Schedule.""" + handle = temporal.get_schedule_handle(str(batch_export.id)) + await handle.delete() + + await 
sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/tests/batch_exports/fixtures.py b/posthog/temporal/tests/batch_exports/fixtures.py deleted file mode 100644 index d54db02304cc53..00000000000000 --- a/posthog/temporal/tests/batch_exports/fixtures.py +++ /dev/null @@ -1,48 +0,0 @@ -from uuid import UUID - -from asgiref.sync import sync_to_async -from temporalio.client import Client - -from posthog.batch_exports.models import ( - BatchExport, - BatchExportDestination, - BatchExportRun, -) -from posthog.batch_exports.service import sync_batch_export - - -def create_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: - """Create a BatchExport and its underlying Schedule.""" - - destination = BatchExportDestination(**destination_data) - batch_export = BatchExport(team_id=team_id, destination=destination, interval=interval, name=name) - - sync_batch_export(batch_export, created=True) - - destination.save() - batch_export.save() - - return batch_export - - -async def acreate_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport: - """Create a BatchExport and its underlying Schedule.""" - return await sync_to_async(create_batch_export)(team_id, interval, name, destination_data) # type: ignore - - -def fetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]: - """Fetch the BatchExportRuns for a given BatchExport.""" - return list(BatchExportRun.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit]) - - -async def afetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]: - """Fetch the BatchExportRuns for a given BatchExport.""" - return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore - - -async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None: - """Delete a BatchExport and its underlying Schedule.""" - handle = temporal.get_schedule_handle(str(batch_export.id)) - await handle.delete() - - await sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 2511580358e72d..2f9cc61f01a8fd 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -1,13 +1,13 @@ import asyncio +import collections import datetime as dt import functools import gzip import itertools import json import os -from random import randint +import uuid from unittest import mock -from uuid import uuid4 import boto3 import botocore.exceptions @@ -16,7 +16,6 @@ import pytest_asyncio from asgiref.sync import sync_to_async from django.conf import settings -from django.test import Client as HttpClient from django.test import override_settings from temporalio import activity from temporalio.client import WorkflowFailureError @@ -24,22 +23,15 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog.api.test.test_organization import acreate_organization -from posthog.api.test.test_team import acreate_team -from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( - EventValues, - amaterialize, - insert_events, - to_isoformat, -) -from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, 
adelete_batch_export, afetch_batch_export_runs, + amaterialize, + insert_events, + to_isoformat, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status -from posthog.temporal.workflows.clickhouse import ClickHouseClient from posthog.temporal.workflows.s3_batch_export import ( S3BatchExportInputs, S3BatchExportWorkflow, @@ -68,7 +60,58 @@ def check_valid_credentials() -> bool: @pytest.fixture def bucket_name() -> str: """Name for a test S3 bucket.""" - return f"{TEST_ROOT_BUCKET}-{str(uuid4())}" + return f"{TEST_ROOT_BUCKET}-{str(uuid.uuid4())}" + + +S3BatchExportTestParameters = collections.namedtuple( + "S3BatchExportTestParameters", ("interval", "compression", "encryption", "exclude_events") +) + +ALL_S3_TEST_PARAMETERS = [ + S3BatchExportTestParameters( + interval=interval, compression=compression, encryption=encryption, exclude_events=exclude_events + ) + for (interval, compression, encryption, exclude_events) in itertools.product( + ["hour", "day"], [None, "gzip", "brotli"], [None, "AES256", "aws:kms"], [None, ["test-1", "test-2"]] + ) +] + + +@pytest_asyncio.fixture(params=ALL_S3_TEST_PARAMETERS) +async def s3_batch_export(request, ateam, temporal_client, bucket_name): + batch_export_test_params = request.param + prefix = f"posthog-events-{str(uuid.uuid4())}" + destination_data = { + "type": "S3", + "config": { + "bucket_name": bucket_name, + "region": "us-east-1", + "prefix": prefix, + "aws_access_key_id": "object_storage_root_user", + "aws_secret_access_key": "object_storage_root_password", + "compression": batch_export_test_params.compression, + "encryption": batch_export_test_params.encryption, + "exclude_events": batch_export_test_params.exclude_events, + "kms_key_id": os.getenv("S3_TEST_KMS_KEY_ID", None), + }, + } + + batch_export_data = { + "name": "my-production-s3-bucket-export", + "destination": destination_data, + "interval": batch_export_test_params.interval, + } + + batch_export = await acreate_batch_export( + team_id=ateam.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + await adelete_batch_export(batch_export, temporal_client) @pytest.fixture @@ -126,7 +169,7 @@ def assert_events_in_s3( json_data = [json.loads(line) for line in data.decode("utf-8").split("\n") if line] # Pull out the fields we inserted only - json_data.sort(key=lambda x: x["timestamp"]) + json_data.sort(key=lambda x: x["event"]) # Remove team_id, _timestamp from events if exclude_events is None: @@ -143,154 +186,50 @@ def to_expected_event(event): } expected_events = list(map(to_expected_event, (event for event in events if event["event"] not in exclude_events))) - - expected_events.sort(key=lambda x: x["timestamp"] if x["timestamp"] is not None else 0) + expected_events.sort(key=lambda x: x["event"]) # First check one event, the first one, so that we can get a nice diff if # the included data is different. 
assert json_data[0] == expected_events[0] - assert json_data == expected_events + assert len(json_data) == len(expected_events) + for event_1, event_2 in zip(json_data, expected_events): + assert event_1 == event_2 @pytest.mark.django_db @pytest.mark.asyncio +@pytest.mark.usefixtures("truncate_events") @pytest.mark.parametrize( "compression,exclude_events", - itertools.product([None, "gzip", "brotli"], [None, ["test-exclude"]]), + itertools.product([None, "gzip", "brotli"], [None, ["test-1", "test-2"]]), ) async def test_insert_into_s3_activity_puts_data_into_s3( - bucket_name, s3_client, activity_environment, compression, exclude_events + bucket_name, s3_client, activity_environment, compression, exclude_events, ch_client, ateam ): """Test that the insert_into_s3_activity function puts data into S3.""" - data_interval_start = "2023-04-20 14:00:00" - data_interval_end = "2023-04-25 15:00:00" - - # Generate a random team id integer. There's still a chance of a collision, - # but it's very small. - team_id = randint(1, 1000000) + data_interval_end = "2023-04-21 15:00:00" - client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, + events, _, _ = await insert_events( + client=ch_client, + team=ateam, + start_time=dt.datetime.fromisoformat(data_interval_start), + end_time=dt.datetime.fromisoformat(data_interval_end), + n=1000, ) - # Add a materialized column such that we can verify that it is NOT included # in the export. await amaterialize("events", "$browser") - # Create enough events to ensure we span more than 5MB, the smallest - # multipart chunk size for multipart uploads to S3. - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "_timestamp": "2023-04-20 14:30:00", - "timestamp": f"2023-04-20 14:30:00.{i:06d}", - "inserted_at": f"2023-04-20 14:30:00.{i:06d}", - "created_at": "2023-04-20 14:30:00.000000", - "distinct_id": str(uuid4()), - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team_id, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "elements_chain": "this that and the other", - } - # NOTE: we have to do a lot here, otherwise we do not trigger a - # multipart upload, and the minimum part chunk size is 5MB. - for i in range(10000) - ] - - events += [ - # Insert an events with an empty string in `properties` and - # `person_properties` to ensure that we handle empty strings correctly. - EventValues( - { - "uuid": str(uuid4()), - "event": "test-exclude", - "_timestamp": "2023-04-20 14:29:00", - "timestamp": "2023-04-20 14:29:00.000000", - "inserted_at": "2023-04-20 14:30:00.000000", - "created_at": "2023-04-20 14:29:00.000000", - "distinct_id": str(uuid4()), - "person_id": str(uuid4()), - "person_properties": None, - "team_id": team_id, - "properties": None, - "elements_chain": "", - } - ) - ] - - # Insert some data into the `sharded_events` table. - await insert_events( - client=client, - events=events, - ) - - # Insert some events before the hour and after the hour, as well as some - # events from another team to ensure that we only export the events from - # the team that the batch export is for. 
- other_team_id = team_id + 1 - await insert_events( - client=client, - events=[ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-20 13:30:00", - "_timestamp": "2023-04-20 13:30:00", - "inserted_at": "2023-04-20 13:30:00.000000", - "created_at": "2023-04-20 13:30:00.000000", - "person_id": str(uuid4()), - "distinct_id": str(uuid4()), - "team_id": team_id, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-20 15:30:00", - "_timestamp": "2023-04-20 13:30:00", - "inserted_at": "2023-04-20 13:30:00.000000", - "created_at": "2023-04-20 13:30:00.000000", - "person_id": str(uuid4()), - "distinct_id": str(uuid4()), - "team_id": team_id, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-20 14:30:00", - "_timestamp": "2023-04-20 14:30:00", - "inserted_at": "2023-04-20 14:30:00.000000", - "created_at": "2023-04-20 14:30:00.000000", - "person_id": str(uuid4()), - "distinct_id": str(uuid4()), - "team_id": other_team_id, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ], - ) - # Make a random string to prefix the S3 keys with. This allows us to ensure # isolation of the test, and also to check that the data is being written. - prefix = str(uuid4()) + prefix = str(uuid.uuid4()) insert_inputs = S3InsertInputs( bucket_name=bucket_name, region="us-east-1", prefix=prefix, - team_id=team_id, + team_id=ateam.pk, data_interval_start=data_interval_start, data_interval_end=data_interval_end, aws_access_key_id="object_storage_root_user", @@ -310,118 +249,35 @@ async def test_insert_into_s3_activity_puts_data_into_s3( @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize( - "interval,compression,exclude_events", - itertools.product(["hour", "day"], [None, "gzip", "brotli"], [None, ["test-exclude"]]), -) -async def test_s3_export_workflow_with_minio_bucket( - client: HttpClient, s3_client, bucket_name, interval, compression, exclude_events -): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_with_minio_bucket(s3_client, s3_batch_export, ch_client): """Test S3 Export Workflow end-to-end by using a local MinIO bucket instead of S3. The workflow should update the batch export run status to completed and produce the expected records to the MinIO bucket. 
""" - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - "exclude_events": exclude_events, - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": interval, - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) + if s3_batch_export.destination.config.get("encryption", None) is not None: + pytest.skip("Encryption is not supported in MinIO") - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 13:30:00.000000", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": "2023-04-25 13:30:00.000000", - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": str(uuid4()), - "event": "test-exclude", - "timestamp": "2023-04-25 14:29:00.000000", - "created_at": "2023-04-25 14:29:00.000000", - "inserted_at": "2023-04-25 14:29:00.000000", - "_timestamp": "2023-04-25 14:29:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ] - - if interval == "day": - # Add an event outside the hour range but within the day range to ensure it's exported too. - events_outside_hour: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 00:30:00.000000", - "created_at": "2023-04-25 00:30:00.000000", - "inserted_at": "2023-04-25 00:30:00.000000", - "_timestamp": "2023-04-25 00:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - } - ] - events += events_outside_hour - - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, ) - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - interval=interval, - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) async with await WorkflowEnvironment.start_time_skipping() as activity_environment: @@ -442,13 +298,20 @@ async def test_s3_export_workflow_with_minio_bucket( execution_timeout=dt.timedelta(seconds=10), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 run = runs[0] assert run.status == "Completed" - assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events) + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + s3_batch_export.destination.config["prefix"], + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], + ) @pytest.mark.skipif( @@ -457,11 +320,8 @@ async def test_s3_export_workflow_with_minio_bucket( ) @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize( - "interval,compression,encryption,exclude_events", - itertools.product(["hour", "day"], [None, "gzip", "brotli"], [None, "AES256", "aws:kms"], [None, ["test-exclude"]]), -) -async def test_s3_export_workflow_with_s3_bucket(interval, compression, encryption, exclude_events): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_with_s3_bucket(s3_batch_export, ch_client): """Test S3 Export Workflow end-to-end by using an S3 bucket. The S3_TEST_BUCKET environment variable is used to set the name of the bucket for this test. @@ -471,110 +331,29 @@ async def test_s3_export_workflow_with_s3_bucket(interval, compression, encrypti The workflow should update the batch export run status to completed and produce the expected records to the S3 bucket. 
""" - bucket_name = os.getenv("S3_TEST_BUCKET") - kms_key_id = os.getenv("S3_TEST_KMS_KEY_ID") - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - "exclude_events": exclude_events, - "encryption": encryption, - "kms_key_id": kms_key_id if encryption == "aws:kms" else None, - }, - } + s3_batch_export.destination.config["bucket_name"] = os.environ["S3_TEST_BUCKET"] + # Update to use real bucket + await sync_to_async(s3_batch_export.save)() # type: ignore - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": interval, - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) - - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 13:30:00.000000", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": "2023-04-25 13:30:00.000000", - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": str(uuid4()), - "event": "test-exclude", - "timestamp": "2023-04-25 14:29:00.000000", - "created_at": "2023-04-25 14:29:00.000000", - "inserted_at": "2023-04-25 14:29:00.000000", - "_timestamp": "2023-04-25 14:29:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ] - - if interval == "day": - # Add an event outside the hour range but within the day range to ensure it's exported too. - events_outside_hour: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 00:30:00.000000", - "created_at": "2023-04-25 00:30:00.000000", - "inserted_at": "2023-04-25 00:30:00.000000", - "_timestamp": "2023-04-25 00:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - } - ] - events += events_outside_hour - - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, ) - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - interval=interval, - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) s3_client = boto3.client("s3") @@ -598,94 +377,64 @@ def create_s3_client(*args, **kwargs): id=workflow_id, task_queue=settings.TEMPORAL_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=10), + execution_timeout=dt.timedelta(seconds=20), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 run = runs[0] assert run.status == "Completed" - assert_events_in_s3(s3_client, bucket_name, prefix, events, compression, exclude_events) + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + s3_batch_export.destination.config["prefix"], + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], + ) @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize("compression", [None, "gzip"]) -async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( - client: HttpClient, s3_client, bucket_name, compression -): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data(s3_client, s3_batch_export, ch_client): """Test the full S3 workflow targetting a MinIO bucket. The workflow should update the batch export run status to completed and produce the expected records to the MinIO bucket. 
""" - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) - - prefix = f"posthog-events-{str(uuid4())}-{{year}}-{{month}}-{{day}}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) + if s3_batch_export.destination.config.get("encryption", None) is not None: + pytest.skip("Encryption is not supported in MinIO") + if s3_batch_export.destination.config.get("compression", "") == "brotli": + pytest.skip("Brotli performs badly with a lot of data") - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": f"test-{i}", - "timestamp": f"2023-04-25 13:30:00.{i:06}", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": f"2023-04-25 13:30:00.{i:06}", - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - } - for i in range(1000000) - ] + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, + n=1000000, ) - workflow_id = str(uuid4()) + prefix = f"posthog-events-{str(uuid.uuid4())}-{{year}}-{{month}}-{{day}}" + s3_batch_export.destination.config["prefix"] = prefix + # Update to use new prefix + await sync_to_async(s3_batch_export.save)() # type: ignore + + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) async with await WorkflowEnvironment.start_time_skipping() as activity_environment: @@ -706,104 +455,61 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( execution_timeout=dt.timedelta(seconds=360), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 run = runs[0] assert run.status == "Completed" - assert_events_in_s3(s3_client, bucket_name, prefix.format(year=2023, month="04", day="25"), events, compression) + parametrized_prefix = prefix.format( + table="events", + year=end_time.year, + month=end_time.strftime("%m"), + day=end_time.strftime("%d"), + ) + + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + parametrized_prefix, + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], + ) @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize("compression", [None, "gzip", "brotli"]) -async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( - client: HttpClient, s3_client, bucket_name, compression -): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(s3_client, s3_batch_export, ch_client): """Test the full S3 workflow targetting a MinIO bucket. In this scenario we assert that when inserted_at is NULL, we default to _timestamp. - This scenario is relevant values inserted before the migration happened. + This scenario is relevant for rows inserted before the migration happened. 
""" - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) - - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - }, - } + if s3_batch_export.destination.config.get("encryption", None) is not None: + pytest.skip("Encryption is not supported in MinIO") - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) - - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 13:30:00.000000", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": None, - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 14:29:00.000000", - "created_at": "2023-04-25 14:29:00.000000", - "inserted_at": None, - "_timestamp": "2023-04-25 14:29:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ] + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, + override_values={"inserted_at": None}, ) - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) async with await WorkflowEnvironment.start_time_skipping() as activity_environment: @@ -824,89 +530,57 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( execution_timeout=dt.timedelta(seconds=10), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 run = runs[0] assert run.status == "Completed" - assert_events_in_s3(s3_client, bucket_name, prefix, events, compression) + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + s3_batch_export.destination.config["prefix"], + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], + ) @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize("compression", [None, "gzip", "brotli"]) -async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( - client: HttpClient, s3_client, bucket_name, compression -): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(s3_client, s3_batch_export, ch_client): """Test the S3BatchExport Workflow utilizing a custom key prefix. We will be asserting that exported events land in the appropiate S3 key according to the prefix. 
""" - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) + if s3_batch_export.destination.config.get("encryption", None) is not None: + pytest.skip("Encryption is not supported in MinIO") prefix = "posthog-{table}/{year}-{month}-{day}/{hour}:{minute}:{second}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) + s3_batch_export.destination.config["prefix"] = prefix + # Update to use new prefix + await sync_to_async(s3_batch_export.save)() # type: ignore - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 13:30:00.000000", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": "2023-04-25 13:31:00.000000", - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ] + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, ) - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) async with await WorkflowEnvironment.start_time_skipping() as activity_environment: @@ -927,130 +601,63 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( execution_timeout=dt.timedelta(seconds=10), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) assert len(runs) == 1 run = runs[0] assert run.status == "Completed" - expected_key_prefix = prefix.format( - table="events", year="2023", month="04", day="25", hour="14", minute="30", second="00" + parametrized_prefix = prefix.format( + table="events", + year=end_time.year, + month=end_time.strftime("%m"), + day=end_time.strftime("%d"), + hour=end_time.strftime("%H"), + minute=end_time.strftime("%M"), + second=end_time.strftime("%S"), ) - objects = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=expected_key_prefix) - key = objects["Contents"][0].get("Key") - assert len(objects.get("Contents", [])) == 1 - assert key.startswith(expected_key_prefix) - assert_events_in_s3(s3_client, bucket_name, expected_key_prefix, events, compression) + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + parametrized_prefix, + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], + ) @pytest.mark.django_db @pytest.mark.asyncio -@pytest.mark.parametrize("compression", [None, "gzip", "brotli"]) -async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates( - client: HttpClient, s3_client, bucket_name, compression -): +@pytest.mark.usefixtures("truncate_events") +async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates(s3_client, s3_batch_export, ch_client): """Test that S3 Export Workflow end-to-end by using a local MinIO bucket instead of S3. In this particular instance of the test, we assert no duplicates are exported to S3. 
""" - ch_client = ClickHouseClient( - url=settings.CLICKHOUSE_HTTP_URL, - user=settings.CLICKHOUSE_USER, - password=settings.CLICKHOUSE_PASSWORD, - database=settings.CLICKHOUSE_DATABASE, - ) - - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": bucket_name, - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": compression, - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) + if s3_batch_export.destination.config.get("encryption", None) is not None: + pytest.skip("Encryption is not supported in MinIO") - duplicate_id = str(uuid4()) - duplicate_distinct_id = str(uuid4()) - duplicate_person_id = str(uuid4()) - events: list[EventValues] = [ - { - "uuid": str(uuid4()), - "event": "test", - "timestamp": "2023-04-25 13:30:00.000000", - "created_at": "2023-04-25 13:30:00.000000", - "inserted_at": f"2023-04-25 13:30:00.000000", - "_timestamp": "2023-04-25 13:30:00", - "person_id": str(uuid4()), - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": str(uuid4()), - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - { - "uuid": duplicate_id, - "event": "test", - "timestamp": "2023-04-25 14:29:00.000000", - "created_at": "2023-04-25 14:29:00.000000", - "inserted_at": f"2023-04-25 14:29:00.000000", - "_timestamp": "2023-04-25 14:29:00", - "person_id": duplicate_person_id, - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": duplicate_distinct_id, - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - }, - ] - events_with_duplicates = events + [ - { - "uuid": duplicate_id, - "event": "test", - "timestamp": "2023-04-25 14:29:00.000000", - "created_at": "2023-04-25 14:29:00.000000", - "inserted_at": f"2023-04-25 14:29:00.000000", - "_timestamp": "2023-04-25 14:29:00", - "person_id": duplicate_person_id, - "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "team_id": team.pk, - "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, - "distinct_id": duplicate_distinct_id, - "elements_chain": "this is a comman, separated, list, of css selectors(?)", - } - ] + if s3_batch_export.interval == "hour": + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 20, 1, 0, 0) + else: + start_time, end_time = dt.datetime(2023, 9, 20, 0, 0, 0), dt.datetime(2023, 9, 21, 0, 0, 0) - # Insert some data into the `sharded_events` table. 
- await insert_events( + events, _, _ = await insert_events( client=ch_client, - events=events_with_duplicates, + team=s3_batch_export.team, + start_time=start_time, + end_time=end_time, + duplicate=True, ) - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, - batch_export_id=str(batch_export.id), - data_interval_end="2023-04-25 14:30:00.000000", - **batch_export.destination.config, + team_id=s3_batch_export.team.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=end_time.isoformat(), + interval=s3_batch_export.interval, + **s3_batch_export.destination.config, ) async with await WorkflowEnvironment.start_time_skipping() as activity_environment: @@ -1070,77 +677,29 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates( retry_policy=RetryPolicy(maximum_attempts=1), ) - runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) - assert len(runs) == 1 - - run = runs[0] - assert run.status == "Completed" - - assert_events_in_s3(s3_client, bucket_name, prefix, events, compression) - - -@pytest_asyncio.fixture -async def organization(): - organization = await acreate_organization("test") - yield organization - await sync_to_async(organization.delete)() # type: ignore - - -@pytest_asyncio.fixture -async def team(organization): - team = await acreate_team(organization=organization) - yield team - await sync_to_async(team.delete)() # type: ignore - - -@pytest_asyncio.fixture -async def batch_export(team): - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": "test-bucket", - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": "gzip", - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) + assert len(runs) == 1 - yield batch_export + run = runs[0] + assert run.status == "Completed" - client = await connect( - settings.TEMPORAL_HOST, - settings.TEMPORAL_PORT, - settings.TEMPORAL_NAMESPACE, - settings.TEMPORAL_CLIENT_ROOT_CA, - settings.TEMPORAL_CLIENT_CERT, - settings.TEMPORAL_CLIENT_KEY, + assert_events_in_s3( + s3_client, + s3_batch_export.destination.config["bucket_name"], + s3_batch_export.destination.config["prefix"], + events, + s3_batch_export.destination.config["compression"], + s3_batch_export.destination.config["exclude_events"], ) - await adelete_batch_export(batch_export, client) @pytest.mark.django_db @pytest.mark.asyncio -async def test_s3_export_workflow_handles_insert_activity_errors(team, batch_export): +async def test_s3_export_workflow_handles_insert_activity_errors(ateam, batch_export): """Test that S3 Export Workflow can gracefully handle errors when inserting S3 data.""" - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, + team_id=ateam.pk, batch_export_id=str(batch_export.id), data_interval_end="2023-04-25 14:30:00.000000", **batch_export.destination.config, @@ -1177,11 +736,11 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: @pytest.mark.django_db 
@pytest.mark.asyncio -async def test_s3_export_workflow_handles_cancellation(team, batch_export): +async def test_s3_export_workflow_handles_cancellation(ateam, batch_export): """Test that S3 Export Workflow can gracefully handle cancellations when inserting S3 data.""" - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( - team_id=team.pk, + team_id=ateam.pk, batch_export_id=str(batch_export.id), data_interval_end="2023-04-25 14:30:00.000000", **batch_export.destination.config, diff --git a/posthog/temporal/tests/conftest.py b/posthog/temporal/tests/conftest.py index 44cf05fe63ce1e..44983b127bcd68 100644 --- a/posthog/temporal/tests/conftest.py +++ b/posthog/temporal/tests/conftest.py @@ -1,7 +1,13 @@ import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async +from django.conf import settings from temporalio.testing import ActivityEnvironment +from posthog.api.test.test_organization import acreate_organization +from posthog.api.test.test_team import acreate_team from posthog.models import Organization, Team +from posthog.temporal.client import connect @pytest.fixture @@ -26,6 +32,36 @@ def team(organization): team.delete() +@pytest_asyncio.fixture +async def aorganization(): + """A test organization in an asynchronous fixture.""" + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def ateam(aorganization): + """A test team in an asynchronous fixture.""" + team = await acreate_team(organization=aorganization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def temporal_client(): + """A Temporal client.""" + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + return client + + @pytest.fixture def activity_environment(): """Return a testing temporal ActivityEnvironment.""" diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index a4987b60243380..8080fbbc687857 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -127,7 +127,7 @@ def start(self) -> str: optional_kwargs = {} if self.encryption: optional_kwargs["ServerSideEncryption"] = self.encryption - if self.kms_key_id: + if self.kms_key_id and self.encryption == "aws:kms": optional_kwargs["SSEKMSKeyId"] = self.kms_key_id multipart_response = self.s3_client.create_multipart_upload(
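
For illustration only (not part of the patch): the final hunk tightens the multipart-upload kwargs so that `SSEKMSKeyId` is only sent when server-side encryption is `aws:kms`, since boto3's `create_multipart_upload` only honors a KMS key id together with that encryption mode. The standalone helper below is a minimal sketch of that guard; the function name and signature are hypothetical, while the kwarg names mirror the patched `S3MultiPartUpload.start`.

```python
import boto3


def start_multipart_upload(
    bucket: str,
    key: str,
    encryption: str | None = None,
    kms_key_id: str | None = None,
) -> str:
    """Hypothetical helper mirroring the guarded encryption kwargs in the patch."""
    s3_client = boto3.client("s3")

    optional_kwargs: dict[str, str] = {}
    if encryption:
        optional_kwargs["ServerSideEncryption"] = encryption
    if kms_key_id and encryption == "aws:kms":
        # SSEKMSKeyId is only valid alongside aws:kms encryption; for AES256 or
        # no encryption the key id is dropped, as in the patched code.
        optional_kwargs["SSEKMSKeyId"] = kms_key_id

    response = s3_client.create_multipart_upload(Bucket=bucket, Key=key, **optional_kwargs)
    return response["UploadId"]
```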