From 82f302265c5a0a994af3e513a18c0fa99e0ccc63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 11 Oct 2023 10:43:14 +0200 Subject: [PATCH 01/18] feat(batch-exports): Add backfill model and service support --- .../0354_add_batch_export_backfill_model.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 posthog/migrations/0354_add_batch_export_backfill_model.py diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py new file mode 100644 index 0000000000000..1428af2d258e1 --- /dev/null +++ b/posthog/migrations/0354_add_batch_export_backfill_model.py @@ -0,0 +1,83 @@ +# Generated by Django 3.2.19 on 2023-10-11 08:17 + +from django.db import migrations, models +import django.db.models.deletion +import posthog.models.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("posthog", "0353_add_5_minute_interval_to_batch_exports"), + ] + + operations = [ + migrations.CreateModel( + name="BatchExportBackfill", + fields=[ + ( + "id", + models.UUIDField( + default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False + ), + ), + ("start_at", models.DateTimeField(help_text="The start of the data interval.")), + ("end_at", models.DateTimeField(help_text="The end of the data interval.")), + ( + "status", + models.CharField( + choices=[ + ("Cancelled", "Cancelled"), + ("Completed", "Completed"), + ("ContinuedAsNew", "Continuedasnew"), + ("Failed", "Failed"), + ("Terminated", "Terminated"), + ("TimedOut", "Timedout"), + ("Running", "Running"), + ("Starting", "Starting"), + ], + help_text="The status of this backfill.", + max_length=64, + ), + ), + ( + "created_at", + models.DateTimeField( + auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." + ), + ), + ( + "finished_at", + models.DateTimeField( + help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.", + null=True, + ), + ), + ( + "last_updated_at", + models.DateTimeField( + auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." 
+ ), + ), + ( + "batch_export", + models.ForeignKey( + help_text="The BatchExport this backfill belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.batchexport", + ), + ), + ( + "team", + models.ForeignKey( + help_text="The team this belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.team", + ), + ), + ], + options={ + "abstract": False, + }, + ), + ] From b27b75d418f9e3ea9a11481721f81f7d7077dd2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 11 Oct 2023 15:04:14 +0200 Subject: [PATCH 02/18] feat(batch-export-backfills): Account for potential restarts while backfilling --- posthog/temporal/workflows/backfill_batch_export.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index b47cbf5d8cb30..17f55ae1d8b54 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -216,6 +216,11 @@ async def wait_for_schedule_backfill_in_range( f'AND StartTime >= "{now.isoformat()}"' ) + workflows = [workflow async for workflow in client.list_workflows(query=query)] + + if workflows and check_workflow_executions_not_running(workflows) is True: + return + done = False while not done: await asyncio.sleep(wait_delay) From 97d03b75e0dc03fa701cf5057b4828680037b70a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 13 Oct 2023 11:59:59 +0200 Subject: [PATCH 03/18] test(batch-exports-backfills): Add Workflow test --- .../0354_add_batch_export_backfill_model.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py index 1428af2d258e1..6f7d2c3e99f3f 100644 --- a/posthog/migrations/0354_add_batch_export_backfill_model.py +++ b/posthog/migrations/0354_add_batch_export_backfill_model.py @@ -1,4 +1,4 @@ -# Generated by Django 3.2.19 on 2023-10-11 08:17 +# Generated by Django 3.2.19 on 2023-10-13 09:13 from django.db import migrations, models import django.db.models.deletion @@ -12,6 +12,21 @@ class Migration(migrations.Migration): ] operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), migrations.CreateModel( name="BatchExportBackfill", fields=[ From b611bc8505f66068ca43d05e09a83186873db184 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 17 Oct 2023 11:29:35 +0200 Subject: [PATCH 04/18] chore(batch-exports-backfill): Bump migration --- .../0354_add_batch_export_backfill_model.py | 98 ------------------- 1 file changed, 98 deletions(-) delete mode 100644 posthog/migrations/0354_add_batch_export_backfill_model.py diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py deleted file mode 100644 index 6f7d2c3e99f3f..0000000000000 --- a/posthog/migrations/0354_add_batch_export_backfill_model.py +++ /dev/null @@ -1,98 +0,0 @@ -# Generated by Django 3.2.19 on 2023-10-13 09:13 - -from django.db import migrations, models -import django.db.models.deletion -import 
posthog.models.utils - - -class Migration(migrations.Migration): - - dependencies = [ - ("posthog", "0353_add_5_minute_interval_to_batch_exports"), - ] - - operations = [ - migrations.AlterField( - model_name="batchexportdestination", - name="type", - field=models.CharField( - choices=[ - ("S3", "S3"), - ("Snowflake", "Snowflake"), - ("Postgres", "Postgres"), - ("BigQuery", "Bigquery"), - ("NoOp", "Noop"), - ], - help_text="A choice of supported BatchExportDestination types.", - max_length=64, - ), - ), - migrations.CreateModel( - name="BatchExportBackfill", - fields=[ - ( - "id", - models.UUIDField( - default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False - ), - ), - ("start_at", models.DateTimeField(help_text="The start of the data interval.")), - ("end_at", models.DateTimeField(help_text="The end of the data interval.")), - ( - "status", - models.CharField( - choices=[ - ("Cancelled", "Cancelled"), - ("Completed", "Completed"), - ("ContinuedAsNew", "Continuedasnew"), - ("Failed", "Failed"), - ("Terminated", "Terminated"), - ("TimedOut", "Timedout"), - ("Running", "Running"), - ("Starting", "Starting"), - ], - help_text="The status of this backfill.", - max_length=64, - ), - ), - ( - "created_at", - models.DateTimeField( - auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." - ), - ), - ( - "finished_at", - models.DateTimeField( - help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.", - null=True, - ), - ), - ( - "last_updated_at", - models.DateTimeField( - auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." - ), - ), - ( - "batch_export", - models.ForeignKey( - help_text="The BatchExport this backfill belongs to.", - on_delete=django.db.models.deletion.CASCADE, - to="posthog.batchexport", - ), - ), - ( - "team", - models.ForeignKey( - help_text="The team this belongs to.", - on_delete=django.db.models.deletion.CASCADE, - to="posthog.team", - ), - ), - ], - options={ - "abstract": False, - }, - ), - ] From c2ed89077e6a46f44ba20fc42d2aefc5f5c1149c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 13:33:59 +0200 Subject: [PATCH 05/18] feat(batch-exports): Abstract insert activity execution --- posthog/temporal/workflows/batch_exports.py | 81 ++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index e26d8901900b9..8fc3476a7bb68 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -14,7 +14,8 @@ import brotli from asgiref.sync import sync_to_async -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow +from temporalio.common import RetryPolicy from posthog.batch_exports.service import ( BatchExportsInputsProtocol, @@ -703,3 +704,81 @@ class UpdateBatchExportBackfillStatusInputs: async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBackfillStatusInputs): """Activity that updates the status of an BatchExportRun.""" await sync_to_async(update_batch_export_backfill_status)(backfill_id=uuid.UUID(inputs.id), status=inputs.status) # type: ignore + + +async def execute_batch_export_insert_activity( + activity, + inputs, + non_retryable_error_types: list[str], + update_inputs: UpdateBatchExportRunStatusInputs, + start_to_close_timeout_seconds: int = 3600, + 
maximum_attempts: int = 10,
+    initial_retry_interval_seconds: int = 10,
+    maximum_retry_interval_seconds: int = 120,
+) -> None:
+    """Execute the main insert activity of a batch export, handling any errors.
+
+    All batch exports boil down to inserting some data somewhere. They all follow the same error
+    handling patterns: logging and updating run status. For this reason, we have this function
+    to abstract executing the main insert activity of each batch export.
+
+    Args:
+        activity: The 'insert_into_*' activity function to execute.
+        inputs: The inputs to the activity.
+        non_retryable_error_types: A list of errors to not retry on when executing the activity.
+        update_inputs: Inputs for the update_export_run_status activity run at the end.
+        start_to_close_timeout_seconds: A timeout (in seconds) for the 'insert_into_*' activity function.
+        maximum_attempts: Maximum number of retries for the 'insert_into_*' activity function,
+            assuming the error that triggered the retry is not in non_retryable_error_types.
+        initial_retry_interval_seconds: Initial interval (in seconds) between retries of the activity.
+        maximum_retry_interval_seconds: Maximum interval (in seconds) between retries of the activity.
+    """
+    logger = get_batch_exports_logger(inputs=inputs)
+
+    retry_policy = RetryPolicy(
+        initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds),
+        maximum_interval=dt.timedelta(seconds=maximum_retry_interval_seconds),
+        maximum_attempts=maximum_attempts,
+        non_retryable_error_types=non_retryable_error_types,
+    )
+    try:
+        await workflow.execute_activity(
+            activity,
+            inputs,
+            start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
+            retry_policy=retry_policy,
+        )
+    except exceptions.ActivityError as e:
+        if isinstance(e.cause, exceptions.CancelledError):
+            logger.error("BatchExport was cancelled.")
+            update_inputs.status = "Cancelled"
+        else:
+            logger.exception("BatchExport failed.", exc_info=e.cause)
+            update_inputs.status = "Failed"
+
+        update_inputs.latest_error = str(e.cause)
+        raise
+
+    except Exception as e:
+        logger.exception("BatchExport failed with an unexpected error.", exc_info=e)
+        update_inputs.status = "Failed"
+        update_inputs.latest_error = "An unexpected error has occurred"
+        raise
+
+    else:
+        logger.info(
+            "Successfully finished exporting batch %s - %s", inputs.data_interval_start, inputs.data_interval_end
+        )
+
+    finally:
+        await workflow.execute_activity(
+            update_export_run_status,
+            update_inputs,
+            start_to_close_timeout=dt.timedelta(minutes=5),
+            retry_policy=RetryPolicy(
+                initial_interval=dt.timedelta(seconds=10),
+                maximum_interval=dt.timedelta(seconds=60),
+                maximum_attempts=0,
+                non_retryable_error_types=["NotNullViolation", "IntegrityError"],
+            ),
+        )

From 938557b639175a12f978b63dae150c83a74cbc93 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Wed, 18 Oct 2023 13:35:13 +0200
Subject: [PATCH 06/18] feat(batch-exports): Add RedshiftBatchExportWorkflow

---
 posthog/batch_exports/service.py              |   7 ++
 posthog/temporal/workflows/__init__.py        |   4 +
 .../workflows/postgres_batch_export.py        |  37 +++---
 .../workflows/redshift_batch_export.py        | 112 ++++++++++++++++++
 4 files changed, 146 insertions(+), 14 deletions(-)
 create mode 100644 posthog/temporal/workflows/redshift_batch_export.py

diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index 114f9693adec7..bd625c00f6f93 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -102,6 +102,13 @@ class PostgresBatchExportInputs:
     include_events: list[str] | None = None
 
 
+@dataclass
+class RedshiftBatchExportInputs(PostgresBatchExportInputs):
+    """Inputs for Redshift export 
workflow.""" + + pass + + @dataclass class BigQueryBatchExportInputs: """Inputs for BigQuery export workflow.""" diff --git a/posthog/temporal/workflows/__init__.py b/posthog/temporal/workflows/__init__.py index df7356f6ab997..c7909cfefc615 100644 --- a/posthog/temporal/workflows/__init__.py +++ b/posthog/temporal/workflows/__init__.py @@ -20,6 +20,9 @@ PostgresBatchExportWorkflow, insert_into_postgres_activity, ) +from posthog.temporal.workflows.redshift_batch_export import ( + RedshiftBatchExportWorkflow, +) from posthog.temporal.workflows.s3_batch_export import ( S3BatchExportWorkflow, insert_into_s3_activity, @@ -35,6 +38,7 @@ BigQueryBatchExportWorkflow, NoOpWorkflow, PostgresBatchExportWorkflow, + RedshiftBatchExportWorkflow, S3BatchExportWorkflow, SnowflakeBatchExportWorkflow, SquashPersonOverridesWorkflow, diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 3c19c8eae8998..19c0960a9a45a 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -69,7 +69,7 @@ def copy_tsv_to_postgres(tsv_file, postgres_connection, schema: str, table_name: @dataclass class PostgresInsertInputs: - """Inputs for Postgres.""" + """Inputs for Postgres insert activity.""" team_id: int user: str @@ -84,6 +84,19 @@ class PostgresInsertInputs: port: int = 5432 exclude_events: list[str] | None = None include_events: list[str] | None = None + fields: list[tuple[str, str]] = [ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", "JSONB"), + ("elements", "JSONB"), + ("set", "JSONB"), + ("set_once", "JSONB"), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ] @activity.defn @@ -137,21 +150,17 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): result = cursor.execute( sql.SQL( """ - CREATE TABLE IF NOT EXISTS {} ( - "uuid" VARCHAR(200), - "event" VARCHAR(200), - "properties" JSONB, - "elements" JSONB, - "set" JSONB, - "set_once" JSONB, - "distinct_id" VARCHAR(200), - "team_id" INTEGER, - "ip" VARCHAR(200), - "site_url" VARCHAR(200), - "timestamp" TIMESTAMP WITH TIME ZONE + CREATE TABLE IF NOT EXISTS {table} ( + {fields} ) """ - ).format(table_identifier) + ).format( + table=table_identifier, + fields=sql.SQL(",").join( + sql.SQL("{field} {type}").format(field=sql.Identifier(field), type=sql.SQL(field_type)) + for field, field_type in inputs.fields + ), + ) ) schema_columns = [ diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py new file mode 100644 index 0000000000000..8acce1c7aeb07 --- /dev/null +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -0,0 +1,112 @@ +import datetime as dt +import json +from dataclasses import dataclass + +from temporalio import workflow +from temporalio.common import RetryPolicy + +from posthog.batch_exports.service import RedshiftBatchExportInputs +from posthog.temporal.workflows.base import PostHogWorkflow +from posthog.temporal.workflows.batch_exports import ( + CreateBatchExportRunInputs, + UpdateBatchExportRunStatusInputs, + create_export_run, + execute_batch_export_insert_activity, + get_batch_exports_logger, + get_data_interval, +) +from posthog.temporal.workflows.postgres_batch_export import ( + PostgresInsertInputs, + insert_into_postgres_activity, +) + + +@dataclass +class 
RedshiftInsertInputs(PostgresInsertInputs): + """Inputs for Redshift insert activity. + + Inherit from PostgresInsertInputs as they are the same, but + update fields to account for JSONB not being supported in Redshift. + """ + + fields: list[tuple[str, str]] = [ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", "VARCHAR"), + ("elements", "VARCHAR"), + ("set", "VARCHAR"), + ("set_once", "VARCHAR"), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ] + + +@workflow.defn(name="redshift-export") +class RedshiftBatchExportWorkflow(PostHogWorkflow): + """A Temporal Workflow to export ClickHouse data into Postgres. + + This Workflow is intended to be executed both manually and by a Temporal + Schedule. When ran by a schedule, `data_interval_end` should be set to + `None` so that we will fetch the end of the interval from the Temporal + search attribute `TemporalScheduledStartTime`. + + This Workflow executes the same insert activity as the PostgresBatchExportWorkflow, + as Postgres and AWS Redshift are fairly compatible. The only differences are: + * Postgres JSONB fields are VARCHAR in Redshift. + * Non retryable errors can be different between both. + """ + + @staticmethod + def parse_inputs(inputs: list[str]) -> RedshiftBatchExportInputs: + """Parse inputs from the management command CLI.""" + loaded = json.loads(inputs[0]) + return RedshiftBatchExportInputs(**loaded) + + @workflow.run + async def run(self, inputs: RedshiftBatchExportInputs): + logger = get_batch_exports_logger(inputs=inputs) + data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) + logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) + + create_export_run_inputs = CreateBatchExportRunInputs( + team_id=inputs.team_id, + batch_export_id=inputs.batch_export_id, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + ) + run_id = await workflow.execute_activity( + create_export_run, + create_export_run_inputs, + start_to_close_timeout=dt.timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=["NotNullViolation", "IntegrityError"], + ), + ) + + update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") + + insert_inputs = RedshiftInsertInputs( + team_id=inputs.team_id, + user=inputs.user, + password=inputs.password, + host=inputs.host, + port=inputs.port, + database=inputs.database, + schema=inputs.schema, + table_name=inputs.table_name, + has_self_signed_cert=inputs.has_self_signed_cert, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + ) + + await execute_batch_export_insert_activity( + insert_into_postgres_activity, insert_inputs, non_retryable_error_types=[], update_inputs=update_inputs + ) From 86b46014fc191f0e11c3485464822b11a4bdada9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 13:41:29 +0200 Subject: [PATCH 07/18] feat(batch-exports): Add Redshift to BatchExport destinations --- latest_migrations.manifest | 2 +- posthog/batch_exports/models.py | 2 ++ posthog/batch_exports/service.py | 1 + 
...7_add_redshift_batch_export_destination.py | 28 +++++++++++++++++++ 4 files changed, 32 insertions(+), 1 deletion(-) create mode 100644 posthog/migrations/0357_add_redshift_batch_export_destination.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index 814358e2c8565..e74eafecde55d 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0356_add_replay_cost_control +posthog: 0357_add_redshift_batch_export_destination sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index 79a7928fd6b3c..30ad08bc13c86 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -26,6 +26,7 @@ class Destination(models.TextChoices): S3 = "S3" SNOWFLAKE = "Snowflake" POSTGRES = "Postgres" + REDSHIFT = "Redshift" BIGQUERY = "BigQuery" NOOP = "NoOp" @@ -33,6 +34,7 @@ class Destination(models.TextChoices): "S3": {"aws_access_key_id", "aws_secret_access_key"}, "Snowflake": set("password"), "Postgres": set("password"), + "Redshift": set("password"), "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"}, "NoOp": set(), } diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index bd625c00f6f93..26c85d8015fe2 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -142,6 +142,7 @@ class NoOpInputs: "S3": ("s3-export", S3BatchExportInputs), "Snowflake": ("snowflake-export", SnowflakeBatchExportInputs), "Postgres": ("postgres-export", PostgresBatchExportInputs), + "Redshift": ("redshift-export", RedshiftBatchExportInputs), "BigQuery": ("bigquery-export", BigQueryBatchExportInputs), "NoOp": ("no-op", NoOpInputs), } diff --git a/posthog/migrations/0357_add_redshift_batch_export_destination.py b/posthog/migrations/0357_add_redshift_batch_export_destination.py new file mode 100644 index 0000000000000..43ec67ac58073 --- /dev/null +++ b/posthog/migrations/0357_add_redshift_batch_export_destination.py @@ -0,0 +1,28 @@ +# Generated by Django 3.2.19 on 2023-10-18 11:40 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0356_add_replay_cost_control"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("Redshift", "Redshift"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), + ] From fd4172254b13732a087a841f5acf36d5d7ad8039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 18 Oct 2023 14:22:50 +0200 Subject: [PATCH 08/18] feat(batch-exports): Support properties_data_type Redshift plugin parameter --- posthog/batch_exports/service.py | 2 +- .../workflows/redshift_batch_export.py | 24 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 26c85d8015fe2..547255fabd9f1 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -106,7 +106,7 @@ class PostgresBatchExportInputs: class 
RedshiftBatchExportInputs(PostgresBatchExportInputs): """Inputs for Redshift export workflow.""" - pass + properties_data_type: str = "varchar" @dataclass diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 8acce1c7aeb07..2de9d1fdc5fa7 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -32,10 +32,10 @@ class RedshiftInsertInputs(PostgresInsertInputs): fields: list[tuple[str, str]] = [ ("uuid", "VARCHAR(200)"), ("event", "VARCHAR(200)"), - ("properties", "VARCHAR"), - ("elements", "VARCHAR"), - ("set", "VARCHAR"), - ("set_once", "VARCHAR"), + ("properties", "VARCHAR(65535)"), + ("elements", "VARCHAR(65535)"), + ("set", "VARCHAR(65535)"), + ("set_once", "VARCHAR(65535)"), ("distinct_id", "VARCHAR(200)"), ("team_id", "INTEGER"), ("ip", "VARCHAR(200)"), @@ -67,6 +67,7 @@ def parse_inputs(inputs: list[str]) -> RedshiftBatchExportInputs: @workflow.run async def run(self, inputs: RedshiftBatchExportInputs): + """Workflow implementation to export data to Redshift.""" logger = get_batch_exports_logger(inputs=inputs) data_interval_start, data_interval_end = get_data_interval(inputs.interval, inputs.data_interval_end) logger.info("Starting Redshift export batch %s - %s", data_interval_start, data_interval_end) @@ -91,6 +92,8 @@ async def run(self, inputs: RedshiftBatchExportInputs): update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") + properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER" + insert_inputs = RedshiftInsertInputs( team_id=inputs.team_id, user=inputs.user, @@ -105,6 +108,19 @@ async def run(self, inputs: RedshiftBatchExportInputs): data_interval_end=data_interval_end.isoformat(), exclude_events=inputs.exclude_events, include_events=inputs.include_events, + fields=[ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", properties_type), + ("elements", properties_type), + ("set", properties_type), + ("set_once", properties_type), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ], ) await execute_batch_export_insert_activity( From d4c106f26d8ebb8cc9e94ffe6e5e48a9b26c6ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 19 Oct 2023 12:34:26 +0200 Subject: [PATCH 09/18] refactor(batch-exports): Insert rows instead of using COPY --- .../workflows/postgres_batch_export.py | 112 ++++++++---- .../workflows/redshift_batch_export.py | 171 ++++++++++++++---- 2 files changed, 213 insertions(+), 70 deletions(-) diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 19c0960a9a45a..ff6aa2d2df855 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -1,9 +1,11 @@ +import collections.abc import contextlib import datetime as dt import json from dataclasses import dataclass import psycopg2 +import psycopg2.extensions from django.conf import settings from psycopg2 import sql from temporalio import activity, exceptions, workflow @@ -26,7 +28,7 @@ @contextlib.contextmanager -def postgres_connection(inputs): +def postgres_connection(inputs) -> collections.abc.Iterator[psycopg2.extensions.connection]: """Manage a Postgres connection.""" connection = psycopg2.connect( user=inputs.user, @@ 
-52,8 +54,22 @@ def postgres_connection(inputs): connection.close() -def copy_tsv_to_postgres(tsv_file, postgres_connection, schema: str, table_name: str, schema_columns): - """Execute a COPY FROM query with given connection to copy contents of tsv_file.""" +def copy_tsv_to_postgres( + tsv_file, + postgres_connection: psycopg2.extensions.connection, + schema: str, + table_name: str, + schema_columns: list[str], +): + """Execute a COPY FROM query with given connection to copy contents of tsv_file. + + Arguments: + tsv_file: A file-like object to interpret as TSV to copy its contents. + postgres_connection: A connection to Postgres as setup by psycopg2. + schema: An existing schema where to create the table. + table_name: The name of the table to create. + schema_columns: A list of column names. + """ tsv_file.seek(0) with postgres_connection.cursor() as cursor: @@ -67,6 +83,44 @@ def copy_tsv_to_postgres(tsv_file, postgres_connection, schema: str, table_name: ) +Field = tuple[str, str] +Fields = collections.abc.Iterable[Field] + + +def create_table_in_postgres( + postgres_connection: psycopg2.extensions.connection, schema: str | None, table_name: str, fields: Fields +) -> None: + """Create a table in a Postgres database if it doesn't exist already. + + Arguments: + postgres_connection: A connection to Postgres as setup by psycopg2. + schema: An existing schema where to create the table. + table_name: The name of the table to create. + fields: An iterable of (name, type) tuples representing the fields of the table. + """ + if schema: + table_identifier = sql.Identifier(schema, table_name) + else: + table_identifier = sql.Identifier(table_name) + + with postgres_connection.cursor() as cursor: + cursor.execute( + sql.SQL( + """ + CREATE TABLE IF NOT EXISTS {table} ( + {fields} + ) + """ + ).format( + table=table_identifier, + fields=sql.SQL(",").join( + sql.SQL("{field} {type}").format(field=sql.Identifier(field), type=sql.SQL(field_type)) + for field, field_type in fields + ), + ) + ) + + @dataclass class PostgresInsertInputs: """Inputs for Postgres insert activity.""" @@ -84,19 +138,6 @@ class PostgresInsertInputs: port: int = 5432 exclude_events: list[str] | None = None include_events: list[str] | None = None - fields: list[tuple[str, str]] = [ - ("uuid", "VARCHAR(200)"), - ("event", "VARCHAR(200)"), - ("properties", "JSONB"), - ("elements", "JSONB"), - ("set", "JSONB"), - ("set_once", "JSONB"), - ("distinct_id", "VARCHAR(200)"), - ("team_id", "INTEGER"), - ("ip", "VARCHAR(200)"), - ("site_url", "VARCHAR(200)"), - ("timestamp", "TIMESTAMP WITH TIME ZONE"), - ] @activity.defn @@ -141,27 +182,24 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): include_events=inputs.include_events, ) with postgres_connection(inputs) as connection: - with connection.cursor() as cursor: - if inputs.schema: - table_identifier = sql.Identifier(inputs.schema, inputs.table_name) - else: - table_identifier = sql.Identifier(inputs.table_name) - - result = cursor.execute( - sql.SQL( - """ - CREATE TABLE IF NOT EXISTS {table} ( - {fields} - ) - """ - ).format( - table=table_identifier, - fields=sql.SQL(",").join( - sql.SQL("{field} {type}").format(field=sql.Identifier(field), type=sql.SQL(field_type)) - for field, field_type in inputs.fields - ), - ) - ) + create_table_in_postgres( + connection, + schema=inputs.schema, + table_name=inputs.table_name, + fields=[ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", "JSONB"), + ("elements", "JSONB"), + ("set", "JSONB"), + 
("set_once", "JSONB"), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ], + ) schema_columns = [ "uuid", diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 2de9d1fdc5fa7..1bbea08d2a422 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -1,8 +1,13 @@ import datetime as dt import json +import typing from dataclasses import dataclass -from temporalio import workflow +import psycopg2 +import psycopg2.extensions +import psycopg2.extras +from psycopg2 import sql +from temporalio import activity, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import RedshiftBatchExportInputs @@ -14,34 +19,148 @@ execute_batch_export_insert_activity, get_batch_exports_logger, get_data_interval, + get_results_iterator, + get_rows_count, ) +from posthog.temporal.workflows.clickhouse import get_client from posthog.temporal.workflows.postgres_batch_export import ( PostgresInsertInputs, - insert_into_postgres_activity, + create_table_in_postgres, + postgres_connection, ) +def insert_record_to_redshift( + record: dict[str, typing.Any], + redshift_connection: psycopg2.extensions.connection, + schema: str, + table: str, +): + """Execute an INSERT query with given Redshift connection. + + The recommended way to insert multiple values into Redshift is using a COPY statement (see: + https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html). However, Redshift cannot COPY from local + files like Postgres, but only from files in S3 or executing commands in SSH hosts. Setting that up would + be quite complex and require more configuration from the user compared to the old Redshift export plugin. + For this reasons, we are going with basic INSERT statements for now, and we can migrate to COPY from S3 + later if the need arises. + + Arguments: + record: A dictionary representing the record to insert. Each key should correspond to a column + in the destination table. + redshift_connection: A connection to Redshift setup by psycopg2. + schema: The schema that contains the table where to insert the record. + table: The name of the table where to insert the record. + """ + columns = record.keys() + + with redshift_connection.cursor() as cursor: + query = sql.SQL("INSERT INTO {table} {fields} VALUES {placeholder}").format( + table=sql.Identifier(schema, table), + fields=sql.SQL(", ").join(map(sql.Identifier, columns)), + placeholder=sql.Placeholder(), + ) + template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns))) + + psycopg2.extras.execute_values(cursor, query, record, template) + + @dataclass class RedshiftInsertInputs(PostgresInsertInputs): """Inputs for Redshift insert activity. - Inherit from PostgresInsertInputs as they are the same, but - update fields to account for JSONB not being supported in Redshift. + Inherit from PostgresInsertInputs as they are the same, but allow + for setting property_data_type which is unique to Redshift. 
""" - fields: list[tuple[str, str]] = [ - ("uuid", "VARCHAR(200)"), - ("event", "VARCHAR(200)"), - ("properties", "VARCHAR(65535)"), - ("elements", "VARCHAR(65535)"), - ("set", "VARCHAR(65535)"), - ("set_once", "VARCHAR(65535)"), - ("distinct_id", "VARCHAR(200)"), - ("team_id", "INTEGER"), - ("ip", "VARCHAR(200)"), - ("site_url", "VARCHAR(200)"), - ("timestamp", "TIMESTAMP WITH TIME ZONE"), - ] + properties_data_type: str = "varchar" + + +@activity.defn +async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): + """Activity streams data from ClickHouse to Redshift.""" + logger = get_batch_exports_logger(inputs=inputs) + logger.info( + "Running Postgres export batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + + async with get_client() as client: + if not await client.is_alive(): + raise ConnectionError("Cannot establish connection to ClickHouse") + + count = await get_rows_count( + client=client, + team_id=inputs.team_id, + interval_start=inputs.data_interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + ) + + if count == 0: + logger.info( + "Nothing to export in batch %s - %s", + inputs.data_interval_start, + inputs.data_interval_end, + ) + return + + logger.info("BatchExporting %s rows to Postgres", count) + + results_iterator = get_results_iterator( + client=client, + team_id=inputs.team_id, + interval_start=inputs.data_interval_start, + interval_end=inputs.data_interval_end, + exclude_events=inputs.exclude_events, + include_events=inputs.include_events, + ) + properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER" + + with postgres_connection(inputs) as connection: + create_table_in_postgres( + connection, + schema=inputs.schema, + table_name=inputs.table_name, + fields=[ + ("uuid", "VARCHAR(200)"), + ("event", "VARCHAR(200)"), + ("properties", properties_type), + ("elements", properties_type), + ("set", properties_type), + ("set_once", properties_type), + ("distinct_id", "VARCHAR(200)"), + ("team_id", "INTEGER"), + ("ip", "VARCHAR(200)"), + ("site_url", "VARCHAR(200)"), + ("timestamp", "TIMESTAMP WITH TIME ZONE"), + ], + ) + + schema_columns = [ + "uuid", + "event", + "properties", + "elements", + "set", + "set_once", + "distinct_id", + "team_id", + "ip", + "site_url", + "timestamp", + ] + json_columns = ("properties", "elements", "set", "set_once") + + with postgres_connection(inputs) as connection: + for result in results_iterator: + record = { + key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] + for key in schema_columns + } + insert_record_to_redshift(record, connection, inputs.schema, inputs.table_name) @workflow.defn(name="redshift-export") @@ -92,8 +211,6 @@ async def run(self, inputs: RedshiftBatchExportInputs): update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed") - properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER" - insert_inputs = RedshiftInsertInputs( team_id=inputs.team_id, user=inputs.user, @@ -108,21 +225,9 @@ async def run(self, inputs: RedshiftBatchExportInputs): data_interval_end=data_interval_end.isoformat(), exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=[ - ("uuid", "VARCHAR(200)"), - ("event", "VARCHAR(200)"), - ("properties", properties_type), - ("elements", properties_type), - ("set", properties_type), - ("set_once", properties_type), - 
("distinct_id", "VARCHAR(200)"), - ("team_id", "INTEGER"), - ("ip", "VARCHAR(200)"), - ("site_url", "VARCHAR(200)"), - ("timestamp", "TIMESTAMP WITH TIME ZONE"), - ], + properties_data_type=inputs.properties_data_type, ) await execute_batch_export_insert_activity( - insert_into_postgres_activity, insert_inputs, non_retryable_error_types=[], update_inputs=update_inputs + insert_into_redshift_activity, insert_inputs, non_retryable_error_types=[], update_inputs=update_inputs ) From 49e71f44252d29af2eb0dd77a45a25d1fb73046d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 30 Oct 2023 19:32:28 +0100 Subject: [PATCH 10/18] test: Add unit test for insert_into_redshift_activity --- .../test_redshift_batch_export_workflow.py | 310 ++++++++++++++++++ .../workflows/redshift_batch_export.py | 46 ++- 2 files changed, 342 insertions(+), 14 deletions(-) create mode 100644 posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py new file mode 100644 index 0000000000000..8414d79317689 --- /dev/null +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -0,0 +1,310 @@ +from uuid import uuid4 +from random import randint +import json +import datetime as dt +import os + +import psycopg2 +from psycopg2 import sql +import pytest +from django.conf import settings + +from posthog.temporal.tests.batch_exports.base import ( + EventValues, + amaterialize, + insert_events, +) +from posthog.temporal.workflows.clickhouse import ClickHouseClient +from posthog.temporal.workflows.redshift_batch_export import ( + RedshiftInsertInputs, + insert_into_redshift_activity, +) + +REQUIRED_ENV_VARS = ( + "REDSHIFT_USER", + "REDSHIFT_PASSWORD", + "REDSHIFT_HOST", +) + +pytestmark = pytest.mark.skipif( + any(env_var not in os.environ for env_var in REQUIRED_ENV_VARS), + reason="Redshift required env vars are not set", +) + + +def assert_events_in_redshift(connection, schema, table_name, events): + """Assert provided events written to a given Postgres table.""" + + inserted_events = [] + + with connection.cursor() as cursor: + cursor.execute(sql.SQL("SELECT * FROM {} ORDER BY timestamp").format(sql.Identifier(schema, table_name))) + columns = [column.name for column in cursor.description] + + for row in cursor.fetchall(): + event = dict(zip(columns, row)) + event["timestamp"] = dt.datetime.fromisoformat(event["timestamp"].isoformat()) + inserted_events.append(event) + + expected_events = [] + for event in events: + properties = event.get("properties", None) + elements_chain = event.get("elements_chain", None) + expected_event = { + "distinct_id": event.get("distinct_id"), + "elements": json.dumps(elements_chain) if elements_chain else None, + "event": event.get("event"), + "ip": properties.get("$ip", None) if properties else None, + "properties": json.dumps(properties) if properties else None, + "set": properties.get("$set", None) if properties else None, + "set_once": properties.get("$set_once", None) if properties else None, + # Kept for backwards compatibility, but not exported anymore. + "site_url": "", + # For compatibility with CH which doesn't parse timezone component, so we add it here assuming UTC. 
+ "timestamp": dt.datetime.fromisoformat(event.get("timestamp") + "+00:00"), + "team_id": event.get("team_id"), + "uuid": event.get("uuid"), + } + expected_events.append(expected_event) + + expected_events.sort(key=lambda x: x["timestamp"]) + + assert len(inserted_events) == len(expected_events) + # First check one event, the first one, so that we can get a nice diff if + # the included data is different. + assert inserted_events[0] == expected_events[0] + assert inserted_events == expected_events + + +@pytest.fixture +def redshift_config(): + """Fixture to provide a default configuration for Redshift batch exports.""" + user = os.environ["REDSHIFT_USER"] + password = os.environ["REDSHIFT_PASSWORD"] + host = os.environ["REDSHIFT_HOST"] + port = os.environ.get("REDSHIFT_PORT", "5439") + + return { + "user": user, + "password": password, + "database": "exports_test_database", + "schema": "exports_test_schema", + "host": host, + "port": int(port), + } + + +@pytest.fixture +def setup_test_db(redshift_config): + """Fixture to manage a database for Redshift exports.""" + connection = psycopg2.connect( + user=redshift_config["user"], + password=redshift_config["password"], + host=redshift_config["host"], + port=redshift_config["port"], + database="dev", + ) + connection.set_session(autocommit=True) + + with connection.cursor() as cursor: + cursor.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), (redshift_config["database"],)) + + if cursor.fetchone() is None: + cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(redshift_config["database"]))) + + connection.close() + + # We need a new connection to connect to the database we just created. + connection = psycopg2.connect( + user=redshift_config["user"], + password=redshift_config["password"], + host=redshift_config["host"], + port=redshift_config["port"], + database=redshift_config["database"], + ) + connection.set_session(autocommit=True) + + with connection.cursor() as cursor: + cursor.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(redshift_config["schema"]))) + + yield + + with connection.cursor() as cursor: + cursor.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(redshift_config["schema"]))) + + connection.close() + + # We need a new connection to drop the database, as we cannot drop the current database. 
+ connection = psycopg2.connect( + user=redshift_config["user"], + password=redshift_config["password"], + host=redshift_config["host"], + port=redshift_config["port"], + database="dev", + ) + connection.set_session(autocommit=True) + + with connection.cursor() as cursor: + cursor.execute(sql.SQL("DROP DATABASE {}").format(sql.Identifier(redshift_config["database"]))) + + connection.close() + + +@pytest.fixture +def psycopg2_connection(redshift_config, setup_test_db): + """Fixture to manage a psycopg2 connection.""" + connection = psycopg2.connect( + user=redshift_config["user"], + password=redshift_config["password"], + database=redshift_config["database"], + host=redshift_config["host"], + port=redshift_config["port"], + ) + + yield connection + + connection.close() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_insert_into_redshift_activity_inserts_data_into_redshift_table( + activity_environment, psycopg2_connection, redshift_config +): + """Test that the insert_into_postgres_activity function inserts data into a Postgres table.""" + + data_interval_start = "2023-04-20 14:00:00" + data_interval_end = "2023-04-25 15:00:00" + + # Generate a random team id integer. There's still a chance of a collision, + # but it's very small. + team_id = randint(1, 1000000) + + # Add a materialized column such that we can verify that it is NOT included + # in the export. + await amaterialize("events", "$browser") + + # Create enough events to ensure we span more than 5MB, the smallest + # multipart chunk size for multipart uploads to POSTGRES. + events: list[EventValues] = [ + { + "uuid": str(uuid4()), + "event": "test", + "_timestamp": "2023-04-20 14:30:00", + "timestamp": f"2023-04-20 14:30:00.{i:06d}", + "inserted_at": f"2023-04-20 14:30:00.{i:06d}", + "created_at": "2023-04-20 14:30:00.000000", + "distinct_id": str(uuid4()), + "person_id": str(uuid4()), + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "team_id": team_id, + "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + } + # NOTE: we have to do a lot here, otherwise we do not trigger a + # multipart upload, and the minimum part chunk size is 5MB. + for i in range(1000) + ] + + events += [ + # Insert an events with an empty string in `properties` and + # `person_properties` to ensure that we handle empty strings correctly. + EventValues( + { + "uuid": str(uuid4()), + "event": "test", + "_timestamp": "2023-04-20 14:29:00", + "timestamp": "2023-04-20 14:29:00.000000", + "inserted_at": "2023-04-20 14:30:00.000000", + "created_at": "2023-04-20 14:29:00.000000", + "distinct_id": str(uuid4()), + "person_id": str(uuid4()), + "person_properties": None, + "team_id": team_id, + "properties": None, + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + } + ) + ] + + ch_client = ClickHouseClient( + url=settings.CLICKHOUSE_HTTP_URL, + user=settings.CLICKHOUSE_USER, + password=settings.CLICKHOUSE_PASSWORD, + database=settings.CLICKHOUSE_DATABASE, + ) + + # Insert some data into the `sharded_events` table. + await insert_events( + client=ch_client, + events=events, + ) + + # Insert some events before the hour and after the hour, as well as some + # events from another team to ensure that we only export the events from + # the team that the batch export is for. 
+ other_team_id = team_id + 1 + await insert_events( + client=ch_client, + events=[ + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-20 13:30:00", + "_timestamp": "2023-04-20 13:30:00", + "inserted_at": "2023-04-20 13:30:00.000000", + "created_at": "2023-04-20 13:30:00.000000", + "person_id": str(uuid4()), + "distinct_id": str(uuid4()), + "team_id": team_id, + "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + }, + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-20 15:30:00", + "_timestamp": "2023-04-20 13:30:00", + "inserted_at": "2023-04-20 13:30:00.000000", + "created_at": "2023-04-20 13:30:00.000000", + "person_id": str(uuid4()), + "distinct_id": str(uuid4()), + "team_id": team_id, + "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + }, + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-20 14:30:00", + "_timestamp": "2023-04-20 14:30:00", + "inserted_at": "2023-04-20 14:30:00.000000", + "created_at": "2023-04-20 14:30:00.000000", + "person_id": str(uuid4()), + "distinct_id": str(uuid4()), + "team_id": other_team_id, + "properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + }, + ], + ) + + insert_inputs = RedshiftInsertInputs( + team_id=team_id, + table_name="test_table", + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + **redshift_config, + ) + + await activity_environment.run(insert_into_redshift_activity, insert_inputs) + + assert_events_in_redshift( + connection=psycopg2_connection, + schema=redshift_config["schema"], + table_name="test_table", + events=events, + ) diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index 1bbea08d2a422..bd6537930a524 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -30,18 +30,19 @@ ) -def insert_record_to_redshift( - record: dict[str, typing.Any], +def insert_records_to_redshift( + records: list[dict[str, typing.Any]], redshift_connection: psycopg2.extensions.connection, schema: str, table: str, + batch_size: int = 100, ): """Execute an INSERT query with given Redshift connection. The recommended way to insert multiple values into Redshift is using a COPY statement (see: https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html). However, Redshift cannot COPY from local files like Postgres, but only from files in S3 or executing commands in SSH hosts. Setting that up would - be quite complex and require more configuration from the user compared to the old Redshift export plugin. + add complexity and require more configuration from the user compared to the old Redshift export plugin. For this reasons, we are going with basic INSERT statements for now, and we can migrate to COPY from S3 later if the need arises. @@ -51,18 +52,32 @@ def insert_record_to_redshift( redshift_connection: A connection to Redshift setup by psycopg2. 
schema: The schema that contains the table where to insert the record. table: The name of the table where to insert the record. + batch_size: Number of records to insert in batch. Setting this too high could + make us go OOM or exceed Redshift's SQL statement size limit (16MB). """ - columns = record.keys() + batch = [next(records)] + + columns = batch[0].keys() with redshift_connection.cursor() as cursor: - query = sql.SQL("INSERT INTO {table} {fields} VALUES {placeholder}").format( + query = sql.SQL("INSERT INTO {table} ({fields}) VALUES {placeholder}").format( table=sql.Identifier(schema, table), fields=sql.SQL(", ").join(map(sql.Identifier, columns)), placeholder=sql.Placeholder(), ) template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns))) - psycopg2.extras.execute_values(cursor, query, record, template) + for record in records: + batch.append(record) + + if len(batch) < batch_size: + continue + + psycopg2.extras.execute_values(cursor, query, batch, template) + batch = [] + + if len(batch) > 0: + psycopg2.extras.execute_values(cursor, query, batch, template) @dataclass @@ -128,7 +143,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): ("uuid", "VARCHAR(200)"), ("event", "VARCHAR(200)"), ("properties", properties_type), - ("elements", properties_type), + ("elements", "VARCHAR(65535)"), ("set", properties_type), ("set_once", properties_type), ("distinct_id", "VARCHAR(200)"), @@ -152,15 +167,18 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): "site_url", "timestamp", ] - json_columns = ("properties", "elements", "set", "set_once") + json_columns = ("properties", "set", "set_once") + + def map_to_record(result: dict) -> dict: + return { + key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] + for key in schema_columns + } with postgres_connection(inputs) as connection: - for result in results_iterator: - record = { - key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] - for key in schema_columns - } - insert_record_to_redshift(record, connection, inputs.schema, inputs.table_name) + insert_records_to_redshift( + (map_to_record(result) for result in results_iterator), connection, inputs.schema, inputs.table_name + ) @workflow.defn(name="redshift-export") From 09cfb8d8e46f41d57b824d11c9acdffc428dab22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 30 Oct 2023 21:00:52 +0100 Subject: [PATCH 11/18] fix: Address typing issue --- .../test_redshift_batch_export_workflow.py | 26 +++++++++++---- .../workflows/redshift_batch_export.py | 32 +++++++++++++------ 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index 8414d79317689..90ddf571572e9 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -1,13 +1,13 @@ -from uuid import uuid4 -from random import randint -import json import datetime as dt +import json import os +from random import randint +from uuid import uuid4 import psycopg2 -from psycopg2 import sql import pytest from django.conf import settings +from psycopg2 import sql from posthog.temporal.tests.batch_exports.base import ( EventValues, @@ -33,7 +33,7 @@ def assert_events_in_redshift(connection, 
schema, table_name, events): - """Assert provided events written to a given Postgres table.""" + """Assert provided events written to a given Redshift table.""" inserted_events = [] @@ -78,7 +78,10 @@ def assert_events_in_redshift(connection, schema, table_name, events): @pytest.fixture def redshift_config(): - """Fixture to provide a default configuration for Redshift batch exports.""" + """Fixture to provide a default configuration for Redshift batch exports. + + Reads required env vars to construct configuration. + """ user = os.environ["REDSHIFT_USER"] password = os.environ["REDSHIFT_PASSWORD"] host = os.environ["REDSHIFT_HOST"] @@ -96,7 +99,16 @@ def redshift_config(): @pytest.fixture def setup_test_db(redshift_config): - """Fixture to manage a database for Redshift exports.""" + """Fixture to manage a database for Redshift export testing. + + Managing a test database involves the following steps: + 1. Creating a test database. + 2. Initializing a connection to that database. + 3. Creating a test schema. + 4. Yielding the connection to be used in tests. + 5. After tests, drop the test schema and any tables in it. + 6. Drop the test database. + """ connection = psycopg2.connect( user=redshift_config["user"], password=redshift_config["password"], diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py index bd6537930a524..36721fd985822 100644 --- a/posthog/temporal/workflows/redshift_batch_export.py +++ b/posthog/temporal/workflows/redshift_batch_export.py @@ -1,3 +1,4 @@ +import collections.abc import datetime as dt import json import typing @@ -31,7 +32,7 @@ def insert_records_to_redshift( - records: list[dict[str, typing.Any]], + records: collections.abc.Iterator[dict[str, typing.Any]], redshift_connection: psycopg2.extensions.connection, schema: str, table: str, @@ -53,7 +54,8 @@ def insert_records_to_redshift( schema: The schema that contains the table where to insert the record. table: The name of the table where to insert the record. batch_size: Number of records to insert in batch. Setting this too high could - make us go OOM or exceed Redshift's SQL statement size limit (16MB). + make us go OOM or exceed Redshift's SQL statement size limit (16MB). Setting this too low + can significantly affect performance due to Redshift's poor handling of INSERTs. """ batch = [next(records)] @@ -93,7 +95,21 @@ class RedshiftInsertInputs(PostgresInsertInputs): @activity.defn async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): - """Activity streams data from ClickHouse to Redshift.""" + """Activity to insert data from ClickHouse to Redshift. + + This activity executes the following steps: + 1. Check if anything is to be exported. + 2. Create destination table if not present. + 3. Query rows to export. + 4. Insert rows into Redshift. + + Args: + inputs: The dataclass holding inputs for this activity. The inputs + include: connection configuration (e.g. host, user, port), batch export + query parameters (e.g. team_id, data_interval_start, include_events), and + the Redshift-specific properties_data_type to indicate the type of JSON-like + fields. 
+ """ logger = get_batch_exports_logger(inputs=inputs) logger.info( "Running Postgres export batch %s - %s", @@ -169,9 +185,10 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): ] json_columns = ("properties", "set", "set_once") - def map_to_record(result: dict) -> dict: + def map_to_record(row: dict) -> dict: + """Map row to a record to insert to Redshift.""" return { - key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key] + key: json.dumps(row[key]) if key in json_columns and row[key] is not None else row[key] for key in schema_columns } @@ -189,11 +206,6 @@ class RedshiftBatchExportWorkflow(PostHogWorkflow): Schedule. When ran by a schedule, `data_interval_end` should be set to `None` so that we will fetch the end of the interval from the Temporal search attribute `TemporalScheduledStartTime`. - - This Workflow executes the same insert activity as the PostgresBatchExportWorkflow, - as Postgres and AWS Redshift are fairly compatible. The only differences are: - * Postgres JSONB fields are VARCHAR in Redshift. - * Non retryable errors can be different between both. """ @staticmethod From 62591aa6f527b17e74ef0504622419640c876629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 30 Oct 2023 21:21:17 +0100 Subject: [PATCH 12/18] test: Add workflow test --- .../test_redshift_batch_export_workflow.py | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index 90ddf571572e9..223291cbe15b9 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -7,15 +7,31 @@ import psycopg2 import pytest from django.conf import settings +from django.test import override_settings from psycopg2 import sql +from temporalio.common import RetryPolicy +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from posthog.api.test.test_organization import acreate_organization +from posthog.api.test.test_team import acreate_team from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, insert_events, ) +from posthog.temporal.tests.batch_exports.fixtures import ( + acreate_batch_export, + afetch_batch_export_runs, +) +from posthog.temporal.workflows.batch_exports import ( + create_export_run, + update_export_run_status, +) from posthog.temporal.workflows.clickhouse import ClickHouseClient from posthog.temporal.workflows.redshift_batch_export import ( + RedshiftBatchExportInputs, + RedshiftBatchExportWorkflow, RedshiftInsertInputs, insert_into_redshift_activity, ) @@ -320,3 +336,150 @@ async def test_insert_into_redshift_activity_inserts_data_into_redshift_table( table_name="test_table", events=events, ) + + +@pytest.mark.django_db +@pytest.mark.asyncio +@pytest.mark.parametrize("interval", ["hour", "day"]) +async def test_redshift_export_workflow( + redshift_config, + redshift_connection, + interval, +): + """Test Redshift Export Workflow end-to-end.""" + table_name = "test_workflow_table" + destination_data = { + "type": "Redshift", + "config": {**redshift_config, "table_name": table_name}, + } + batch_export_data = { + "name": "my-production-redshift-export", + "destination": destination_data, + "interval": interval, + } + + organization = await 
acreate_organization("test") + team = await acreate_team(organization=organization) + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + events: list[EventValues] = [ + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-25 13:30:00.000000", + "created_at": "2023-04-25 13:30:00.000000", + "inserted_at": "2023-04-25 13:30:00.000000", + "_timestamp": "2023-04-25 13:30:00", + "person_id": str(uuid4()), + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "team_id": team.pk, + "properties": { + "$browser": "Chrome", + "$os": "Mac OS X", + "$ip": "172.16.0.1", + "$current_url": "https://app.posthog.com", + }, + "distinct_id": str(uuid4()), + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + }, + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-25 14:29:00.000000", + "created_at": "2023-04-25 14:29:00.000000", + "inserted_at": "2023-04-25 14:29:00.000000", + "_timestamp": "2023-04-25 14:29:00", + "person_id": str(uuid4()), + "properties": { + "$browser": "Chrome", + "$os": "Mac OS X", + "$current_url": "https://app.posthog.com", + "$ip": "172.16.0.1", + }, + "team_id": team.pk, + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "distinct_id": str(uuid4()), + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + }, + ] + + if interval == "day": + # Add an event outside the hour range but within the day range to ensure it's exported too. + events_outside_hour: list[EventValues] = [ + { + "uuid": str(uuid4()), + "event": "test", + "timestamp": "2023-04-25 00:30:00.000000", + "created_at": "2023-04-25 00:30:00.000000", + "inserted_at": "2023-04-25 00:30:00.000000", + "_timestamp": "2023-04-25 00:30:00", + "person_id": str(uuid4()), + "person_properties": {"$browser": "Chrome", "$os": "Mac OS X"}, + "team_id": team.pk, + "properties": { + "$browser": "Chrome", + "$os": "Mac OS X", + "$current_url": "https://app.posthog.com", + "$ip": "172.16.0.1", + }, + "distinct_id": str(uuid4()), + "elements_chain": 'strong.pricingpage:attr__class="pricingpage"nth-child="1"nth-of-type="1"text="A question?";', + } + ] + events += events_outside_hour + + ch_client = ClickHouseClient( + url=settings.CLICKHOUSE_HTTP_URL, + user=settings.CLICKHOUSE_USER, + password=settings.CLICKHOUSE_PASSWORD, + database=settings.CLICKHOUSE_DATABASE, + ) + + await insert_events( + client=ch_client, + events=events, + ) + + workflow_id = str(uuid4()) + inputs = RedshiftBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + interval=interval, + **batch_export.destination.config, + ) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[RedshiftBatchExportWorkflow], + activities=[ + create_export_run, + insert_into_redshift_activity, + update_export_run_status, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with override_settings(BATCH_EXPORT_REDSHIFT_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2): + await activity_environment.client.execute_workflow( + RedshiftBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + 
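+                    # A single attempt and a short execution timeout: in tests we want a
+                    # failing workflow to raise immediately rather than be retried.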
retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + + assert_events_in_redshift(redshift_connection, redshift_config["schema"], table_name, events) From f6fed219d33af2c40d0f8477288b4a79031856f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 31 Oct 2023 11:19:03 +0100 Subject: [PATCH 13/18] feat: Frontend support for Redshift batch exports --- .../batch_exports/BatchExportEditForm.tsx | 59 ++++++++++++++++++- .../batch_exports/batchExportEditLogic.ts | 22 ++++++- frontend/src/types.ts | 17 ++++++ 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx index d005ed357b72c..8ed7e5cdd50d9 100644 --- a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx +++ b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx @@ -148,7 +148,8 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele + + + + + + + + ) : batchExportConfigForm.destination === 'Redshift' ? ( + <> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + & Partial & + Partial & Partial & Partial & Partial & { - destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery' + destination: 'S3' | 'Snowflake' | 'Postgres' | 'BigQuery' | 'Redshift' start_at: Dayjs | null end_at: Dayjs | null json_config_file?: File[] | null @@ -64,6 +66,19 @@ const formFields = ( exclude_events: '', include_events: '', } + : destination === 'Redshift' + ? { + user: isNew ? (!config.user ? 'This field is required' : '') : '', + password: isNew ? (!config.password ? 'This field is required' : '') : '', + host: !config.host ? 'This field is required' : '', + port: !config.port ? 'This field is required' : '', + database: !config.database ? 'This field is required' : '', + schema: !config.schema ? 'This field is required' : '', + table_name: !config.table_name ? 'This field is required' : '', + properties_data_type: 'varchar', + exclude_events: '', + include_events: '', + } : destination === 'S3' ? { bucket_name: !config.bucket_name ? 'This field is required' : '', @@ -143,6 +158,11 @@ export const batchExportsEditLogic = kea([ type: 'S3', config: config, } as unknown as BatchExportDestinationS3) + : destination === 'Redshift' + ? ({ + type: 'Redshift', + config: config, + } as unknown as BatchExportDestinationRedshift) : destination === 'BigQuery' ? ({ type: 'BigQuery', diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 21c2465b8e0d7..fe163c0105b5b 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3204,11 +3204,28 @@ export type BatchExportDestinationBigQuery = { } } +export type BatchExportDestinationRedshift = { + type: 'Redshift' + config: { + user: string + password: string + host: string + port: number + database: string + schema: string + table_name: string + properties_data_type: boolean + exclude_events: string[] + include_events: string[] + } +} + export type BatchExportDestination = | BatchExportDestinationS3 | BatchExportDestinationSnowflake | BatchExportDestinationPostgres | BatchExportDestinationBigQuery + | BatchExportDestinationRedshift export type BatchExportConfiguration = { // User provided data for the export. 
This is the data that the user
From 550350e7a4899dc75706605932d6917cddd5e6a0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Tue, 31 Oct 2023 11:53:11 +0100
Subject: [PATCH 14/18] docs: Add tests README.md

---
 .../temporal/tests/batch_exports/README.md    | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)
 create mode 100644 posthog/temporal/tests/batch_exports/README.md

diff --git a/posthog/temporal/tests/batch_exports/README.md b/posthog/temporal/tests/batch_exports/README.md
new file mode 100644
index 0000000000000..5d2b631338c4f
--- /dev/null
+++ b/posthog/temporal/tests/batch_exports/README.md
@@ -0,0 +1,38 @@
+# Testing batch exports
+
+This module contains unit tests covering the activities, workflows, and helper functions that power batch exports. Tests are divided by destination, and some destinations require additional setup before their tests can run.
+
+## Testing BigQuery batch exports
+
+BigQuery batch exports can be tested against a real BigQuery instance, but doing so requires additional setup. For this reason, these tests are skipped unless an environment variable pointing to a BigQuery credentials file (`GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json`) is set.
+
+> :warning: Since BigQuery batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect BigQuery batch exports.
+
+To enable testing for BigQuery batch exports, we require:
+1. A BigQuery project and dataset.
+2. A BigQuery ServiceAccount with access to said project and dataset. See the [BigQuery batch export documentation](https://posthog.com/docs/cdp/batch-exports/bigquery#setting-up-bigquery-access) for detailed steps to set up a ServiceAccount.
+
+Then, a [key](https://cloud.google.com/iam/docs/keys-create-delete#creating) can be created for the BigQuery ServiceAccount and saved to a local file. For PostHog employees, this file should already be available under the PostHog password manager.
+
+Tests for BigQuery batch exports can then be run from the root of the `posthog` repo:
+
+```bash
+DEBUG=1 GOOGLE_APPLICATION_CREDENTIALS=/path/to/my/project-credentials.json pytest posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+```
+
+## Testing Redshift batch exports
+
+Redshift batch exports can be tested against a real Redshift (or Redshift Serverless) instance, but doing so requires additional setup. For this reason, these tests are skipped unless Redshift credentials are specified in the environment.
+
+> :warning: Since Redshift batch export tests require additional setup, we skip them by default and they will not be run by automated CI pipelines. Please ensure these tests pass when making changes that affect Redshift batch exports.
+
+To enable testing for Redshift batch exports, we require:
+1. A Redshift (or Redshift Serverless) instance.
+2. Network access to this instance (via a VPN connection or a jumphost; making a Redshift instance publicly available has serious security implications).
+3. User credentials (the user requires the `CREATEDB` permission for testing, but **not** superuser access).
+
+For PostHog employees, check the password manager, as a set of development credentials should already be available.
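+If you need to create such a user yourself, a minimal sketch could look like the following (the admin user, test user, and password are placeholders; any Postgres-compatible client such as `psql` works, since the test suite itself connects with `psycopg2`):
+
+```bash
+# Hypothetical example: create a non-superuser test user that is allowed to create databases.
+psql -h "$REDSHIFT_HOST" -U admin_user -d dev -c "CREATE USER test_user PASSWORD 'TestPassword1' CREATEDB;"
+```
+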
With these credentials, and after connecting to the appropriate VPN, we can run the tests from the root of the `posthog` repo with: + +```bash +DEBUG=1 REDSHIFT_HOST=workgroup.111222333.region.redshift-serverless.amazonaws.com REDSHIFT_USER=test_user REDSHIFT_PASSWORD=test_password pytest posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +``` From c08f8386972a83504e379cb834d25196f51b1a03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 31 Oct 2023 12:10:19 +0100 Subject: [PATCH 15/18] fix: Use correct fixture name in test --- .../batch_exports/test_redshift_batch_export_workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index 223291cbe15b9..812a4f7edd00c 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -343,7 +343,7 @@ async def test_insert_into_redshift_activity_inserts_data_into_redshift_table( @pytest.mark.parametrize("interval", ["hour", "day"]) async def test_redshift_export_workflow( redshift_config, - redshift_connection, + psycopg2_connection, interval, ): """Test Redshift Export Workflow end-to-end.""" @@ -482,4 +482,4 @@ async def test_redshift_export_workflow( run = runs[0] assert run.status == "Completed" - assert_events_in_redshift(redshift_connection, redshift_config["schema"], table_name, events) + assert_events_in_redshift(psycopg2_connection, redshift_config["schema"], table_name, events) From d2934f804469383bc15b61d14eaf95b84b7cdc73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 31 Oct 2023 12:35:14 +0100 Subject: [PATCH 16/18] fix: Set default properties data type --- frontend/src/scenes/batch_exports/BatchExportEditForm.tsx | 3 ++- frontend/src/scenes/batch_exports/batchExportEditLogic.ts | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx index 8ed7e5cdd50d9..78a5d8d4dd2e6 100644 --- a/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx +++ b/frontend/src/scenes/batch_exports/BatchExportEditForm.tsx @@ -415,12 +415,13 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele - + diff --git a/frontend/src/scenes/batch_exports/batchExportEditLogic.ts b/frontend/src/scenes/batch_exports/batchExportEditLogic.ts index 7c04dfcdf69be..5931b444d080d 100644 --- a/frontend/src/scenes/batch_exports/batchExportEditLogic.ts +++ b/frontend/src/scenes/batch_exports/batchExportEditLogic.ts @@ -75,7 +75,7 @@ const formFields = ( database: !config.database ? 'This field is required' : '', schema: !config.schema ? 'This field is required' : '', table_name: !config.table_name ? 
'This field is required' : '', - properties_data_type: 'varchar', + properties_data_type: '', exclude_events: '', include_events: '', } From 16f02670f8e01c0ad80454b1b1678c93680de7ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 1 Nov 2023 00:31:24 +0100 Subject: [PATCH 17/18] fix(batch-exports): Update test --- .../commands/test/test_create_batch_export_from_app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/posthog/management/commands/test/test_create_batch_export_from_app.py b/posthog/management/commands/test/test_create_batch_export_from_app.py index b6832221d2d71..6932f518928d7 100644 --- a/posthog/management/commands/test/test_create_batch_export_from_app.py +++ b/posthog/management/commands/test/test_create_batch_export_from_app.py @@ -506,6 +506,7 @@ def test_create_batch_export_from_app_with_backfill(interval, plugin_config): batch_export_id = str(batch_export_data["id"]) workflows = wait_for_workflow_executions(temporal, query=f'TemporalScheduledById="{batch_export_id}"') - assert len(workflows) == 1 + # In the event the test takes too long, we may spawn more than one run + assert len(workflows) >= 1 workflow_execution = workflows[0] assert workflow_execution.workflow_type == f"{export_type.lower()}-export" From 01412f65f8235dd5f4b0b6b01fd261b47c2d50fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 1 Nov 2023 17:49:00 +0100 Subject: [PATCH 18/18] fix: Add activity to list of supported activities --- posthog/temporal/workflows/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/posthog/temporal/workflows/__init__.py b/posthog/temporal/workflows/__init__.py index c7909cfefc615..4b27b56500be6 100644 --- a/posthog/temporal/workflows/__init__.py +++ b/posthog/temporal/workflows/__init__.py @@ -22,6 +22,7 @@ ) from posthog.temporal.workflows.redshift_batch_export import ( RedshiftBatchExportWorkflow, + insert_into_redshift_activity, ) from posthog.temporal.workflows.s3_batch_export import ( S3BatchExportWorkflow, @@ -54,6 +55,7 @@ get_schedule_frequency, insert_into_bigquery_activity, insert_into_postgres_activity, + insert_into_redshift_activity, insert_into_s3_activity, insert_into_snowflake_activity, noop_activity,