From f0a4c12d4675f5b1a3cbc3426c91459b0a5608ed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Tue, 19 Mar 2024 17:10:15 +0100
Subject: [PATCH] feat: Start tracking records exported

---
 posthog/batch_exports/service.py                          | 6 ++++--
 posthog/temporal/batch_exports/batch_exports.py           | 6 +++++-
 posthog/temporal/batch_exports/bigquery_batch_export.py   | 6 ++++--
 posthog/temporal/batch_exports/postgres_batch_export.py   | 6 ++++--
 posthog/temporal/batch_exports/redshift_batch_export.py   | 9 ++++++++-
 posthog/temporal/batch_exports/s3_batch_export.py         | 6 ++++--
 posthog/temporal/batch_exports/snowflake_batch_export.py  | 6 ++++--
 .../tests/batch_exports/test_s3_batch_export_workflow.py  | 3 +++
 8 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index f3d5715220bf3..50ae6b0fd21f8 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -438,14 +438,16 @@ def create_batch_export_run(
     return run


-def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
+def update_batch_export_run_status(
+    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
+) -> BatchExportRun:
     """Update the status of an BatchExportRun with given id.

     Arguments:
         id: The id of the BatchExportRun to update.
     """
     model = BatchExportRun.objects.filter(id=run_id)
-    updated = model.update(status=status, latest_error=latest_error)
+    updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed)

     if not updated:
         raise ValueError(f"BatchExportRun with id {run_id} not found.")
diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index 8fa61a370d5c3..8c87718ae3590 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs:
     status: str
     team_id: int
     latest_error: str | None = None
+    records_completed: int = 0


 @activity.defn
@@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
         run_id=uuid.UUID(inputs.id),
         status=inputs.status,
         latest_error=inputs.latest_error,
+        records_completed=inputs.records_completed,
     )

     if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
@@ -664,7 +666,7 @@ async def execute_batch_export_insert_activity(
     )

     try:
-        await workflow.execute_activity(
+        records_completed = await workflow.execute_activity(
             activity,
             inputs,
             start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
@@ -690,6 +692,8 @@ async def execute_batch_export_insert_activity(
     finally:
         get_export_finished_metric(status=update_inputs.status.lower()).add(1)
+
+        update_inputs.records_completed = records_completed
         await workflow.execute_activity(
             update_export_run_status,
             update_inputs,
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index 9f39a302e9365..97ee8fef228c3 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]:


 @activity.defn
-async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
+async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int:
     """Activity streams data from ClickHouse to BigQuery."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
     logger.info(
@@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):

                         jsonl_file.reset()

+    return jsonl_file.records_total
+

 @workflow.defn(name="bigquery-export")
 class BigQueryBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py
index ef528b96f35ad..ab46cadc6941f 100644
--- a/posthog/temporal/batch_exports/postgres_batch_export.py
+++ b/posthog/temporal/batch_exports/postgres_batch_export.py
@@ -234,7 +234,7 @@ class PostgresInsertInputs:


 @activity.defn
-async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
+async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int:
     """Activity streams data from ClickHouse to Postgres."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL")
     logger.info(
@@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -359,6 +359,8 @@ async def flush_to_postgres():
             if pg_file.tell() > 0:
                 await flush_to_postgres()

+    return pg_file.records_total
+

 @workflow.defn(name="postgres-export")
 class PostgresBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py
index e47dcd2924517..6a3d97cf6ed58 100644
--- a/posthog/temporal/batch_exports/redshift_batch_export.py
+++ b/posthog/temporal/batch_exports/redshift_batch_export.py
@@ -171,7 +171,7 @@ async def insert_records_to_redshift(
     schema: str | None,
     table: str,
     batch_size: int = 100,
-):
+) -> int:
     """Execute an INSERT query with given Redshift connection.

     The recommended way to insert multiple values into Redshift is using a COPY statement (see:
@@ -206,15 +206,20 @@ async def insert_records_to_redshift(
     template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))

     rows_exported = get_rows_exported_metric()

+    total_rows_exported = 0
+
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
         batch = []
         pre_query_str = pre_query.as_string(cursor).encode("utf-8")

         async def flush_to_redshift(batch):
+            nonlocal total_rows_exported
+
             values = b",".join(batch).replace(b" E'", b" '")
             await cursor.execute(pre_query_str + values)
             rows_exported.add(len(batch))
+            total_rows_exported += len(batch)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
@@ -230,6 +235,8 @@ async def flush_to_redshift(batch):
         if len(batch) > 0:
             await flush_to_redshift(batch)

+    return total_rows_exported
+

 @contextlib.asynccontextmanager
 async def async_client_cursor_from_connection(
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 29ccf7f9628c1..30fd55b5b9248 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -382,7 +382,7 @@ def s3_default_fields() -> list[BatchExportField]:


 @activity.defn
-async def insert_into_s3_activity(inputs: S3InsertInputs):
+async def insert_into_s3_activity(inputs: S3InsertInputs) -> int:
     """Activity to batch export data from PostHog's ClickHouse to S3.

     It currently only creates a single file per run, and uploads as a multipart upload.
@@ -418,7 +418,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows to S3", count)

@@ -503,6 +503,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):

         await s3_upload.complete()

+    return local_results_file.records_total
+

 @workflow.defn(name="s3-export")
 class S3BatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py
index c97b495979a2b..be94eca89a799 100644
--- a/posthog/temporal/batch_exports/snowflake_batch_export.py
+++ b/posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table(


 @activity.defn
-async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
+async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int:
     """Activity streams data from ClickHouse to Snowflake.

     TODO: We're using JSON here, it's not the most efficient way to do this.
@@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
                 inputs.data_interval_start,
                 inputs.data_interval_end,
             )
-            return
+            return 0

         logger.info("BatchExporting %s rows", count)

@@ -553,6 +553,8 @@ async def worker_shutdown_handler():

         await copy_loaded_files_to_snowflake_table(connection, inputs.table_name)

+    return local_results_file.records_total
+

 @workflow.defn(name="snowflake-export")
 class SnowflakeBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index 17b1aaaa94d4a..b1691e1a9d7e8 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -805,6 +805,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(

     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100

     await assert_clickhouse_records_in_s3(
         s3_compatible_client=minio_client,
@@ -893,6 +894,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(

     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100

     expected_key_prefix = s3_key_prefix.format(
         table="events",
@@ -968,6 +970,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
     run = runs[0]
     assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
+    assert run.records_completed == 0


 async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
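
How the pieces above fit together: each insert_into_*_activity now returns the number of records it wrote (0 when there is nothing to export, otherwise the temporary file's records_total, or the accumulated total_rows_exported in the Redshift case), execute_batch_export_insert_activity copies that return value into UpdateBatchExportRunStatusInputs.records_completed, and update_batch_export_run_status persists it on the BatchExportRun row. The sketch below is a minimal, Temporal-free illustration of that contract, not code from this patch: names such as UpdateRunStatusInputs, insert_into_fake_destination_activity, execute_insert_activity and the in-memory RUNS dict are hypothetical stand-ins for the real model, activities and workflow helper.

import asyncio
from dataclasses import dataclass


@dataclass
class UpdateRunStatusInputs:
    """Trimmed, illustrative mirror of UpdateBatchExportRunStatusInputs."""

    run_id: str
    status: str
    latest_error: str | None = None
    records_completed: int = 0


# Hypothetical in-memory stand-in for the BatchExportRun table.
RUNS: dict[str, dict] = {}


async def insert_into_fake_destination_activity(rows: list[dict]) -> int:
    """Plays the role of insert_into_*_activity: write the rows, return how many were written."""
    if not rows:
        return 0  # nothing to export in this batch
    # ... stream the rows to the destination here ...
    return len(rows)


async def update_export_run_status(inputs: UpdateRunStatusInputs) -> None:
    """Plays the role of update_batch_export_run_status: persist status, error and record count."""
    RUNS[inputs.run_id] = {
        "status": inputs.status,
        "latest_error": inputs.latest_error,
        "records_completed": inputs.records_completed,
    }


async def execute_insert_activity(rows: list[dict], update_inputs: UpdateRunStatusInputs) -> None:
    """Plays the role of execute_batch_export_insert_activity: thread the count into the update."""
    records_completed = 0  # default used in this sketch if the insert fails before returning a count
    try:
        records_completed = await insert_into_fake_destination_activity(rows)
    except Exception as err:
        update_inputs.status = "FailedRetryable"
        update_inputs.latest_error = f"{type(err).__name__}: {err}"
        raise
    finally:
        # Always record how many rows made it out before updating the run.
        update_inputs.records_completed = records_completed
        await update_export_run_status(update_inputs)


if __name__ == "__main__":
    inputs = UpdateRunStatusInputs(run_id="run-1", status="Completed")
    asyncio.run(execute_insert_activity([{"event": "pageview"}] * 100, inputs))
    assert RUNS["run-1"]["records_completed"] == 100

This mirrors the assertions added to the S3 workflow tests: a completed run ends with records_completed equal to the number of exported rows (100 in those tests), while a run that fails is still updated, with records_completed left at 0.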