diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index f026fce03fb3f..c26be9a77ed1a 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -439,14 +439,16 @@ def create_batch_export_run(
     return run
 
 
-def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun:
+def update_batch_export_run_status(
+    run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0
+) -> BatchExportRun:
     """Update the status of an BatchExportRun with given id.
 
     Arguments:
         id: The id of the BatchExportRun to update.
     """
     model = BatchExportRun.objects.filter(id=run_id)
-    updated = model.update(status=status, latest_error=latest_error)
+    updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed)
 
     if not updated:
         raise ValueError(f"BatchExportRun with id {run_id} not found.")
diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index 8fa61a370d5c3..c776e1f245ef3 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs:
     status: str
     team_id: int
     latest_error: str | None = None
+    records_completed: int = 0
 
 
 @activity.defn
@@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
         run_id=uuid.UUID(inputs.id),
         status=inputs.status,
         latest_error=inputs.latest_error,
+        records_completed=inputs.records_completed,
     )
 
     if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
@@ -664,13 +666,14 @@ async def execute_batch_export_insert_activity(
     )
 
     try:
-        await workflow.execute_activity(
+        records_completed = await workflow.execute_activity(
             activity,
             inputs,
             start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds),
             heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None,
             retry_policy=retry_policy,
         )
+        update_inputs.records_completed = records_completed
 
     except exceptions.ActivityError as e:
         if isinstance(e.cause, exceptions.CancelledError):
@@ -690,6 +693,7 @@ async def execute_batch_export_insert_activity(
 
     finally:
         get_export_finished_metric(status=update_inputs.status.lower()).add(1)
+
         await workflow.execute_activity(
             update_export_run_status,
             update_inputs,
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index 9f39a302e9365..a0469de79bb9e 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]:
 
 
 @activity.defn
-async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
+async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int:
     """Activity streams data from ClickHouse to BigQuery."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery")
     logger.info(
@@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows", count)
 
@@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema):
 
                     jsonl_file.reset()
 
+        return jsonl_file.records_total
+
 
 @workflow.defn(name="bigquery-export")
 class BigQueryBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py
index fde06c6c48d85..8aca65c80ff38 100644
--- a/posthog/temporal/batch_exports/http_batch_export.py
+++ b/posthog/temporal/batch_exports/http_batch_export.py
@@ -152,7 +152,7 @@ async def post_json_file_to_url(url, batch_file, session: aiohttp.ClientSession)
 
 
 @activity.defn
-async def insert_into_http_activity(inputs: HttpInsertInputs):
+async def insert_into_http_activity(inputs: HttpInsertInputs) -> int:
     """Activity streams data from ClickHouse to an HTTP Endpoint."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP")
     logger.info(
@@ -180,7 +180,7 @@ async def insert_into_http_activity(inputs: HttpInsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows", count)
 
@@ -303,6 +303,8 @@ async def flush_batch_to_http_endpoint(last_uploaded_timestamp: str, session: ai
             last_uploaded_timestamp = str(inserted_at)
             await flush_batch_to_http_endpoint(last_uploaded_timestamp, session)
 
+        return batch_file.records_total
+
 
 @workflow.defn(name="http-export")
 class HttpBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py
index ef528b96f35ad..5dbfc6faa4acf 100644
--- a/posthog/temporal/batch_exports/postgres_batch_export.py
+++ b/posthog/temporal/batch_exports/postgres_batch_export.py
@@ -234,7 +234,7 @@ class PostgresInsertInputs:
 
 
 @activity.defn
-async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
+async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int:
     """Activity streams data from ClickHouse to Postgres."""
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL")
     logger.info(
@@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows", count)
 
@@ -359,6 +359,8 @@ async def flush_to_postgres():
                 if pg_file.tell() > 0:
                     await flush_to_postgres()
 
+    return pg_file.records_total
+
 
 @workflow.defn(name="postgres-export")
 class PostgresBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py
index e47dcd2924517..bc1549cef838f 100644
--- a/posthog/temporal/batch_exports/redshift_batch_export.py
+++ b/posthog/temporal/batch_exports/redshift_batch_export.py
@@ -171,7 +171,7 @@ async def insert_records_to_redshift(
     schema: str | None,
     table: str,
     batch_size: int = 100,
-):
+) -> int:
     """Execute an INSERT query with given Redshift connection.
 
     The recommended way to insert multiple values into Redshift is using a COPY statement (see:
@@ -206,15 +206,20 @@ async def insert_records_to_redshift(
     template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns)))
     rows_exported = get_rows_exported_metric()
 
+    total_rows_exported = 0
+
     async with async_client_cursor_from_connection(redshift_connection) as cursor:
         batch = []
         pre_query_str = pre_query.as_string(cursor).encode("utf-8")
 
         async def flush_to_redshift(batch):
+            nonlocal total_rows_exported
+
             values = b",".join(batch).replace(b" E'", b" '")
 
             await cursor.execute(pre_query_str + values)
             rows_exported.add(len(batch))
+            total_rows_exported += len(batch)
             # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating
             # the byte size of each batch the way things are currently written. We can revisit this
             # in the future if we decide it's useful enough.
@@ -230,6 +235,8 @@ async def flush_to_redshift(batch):
         if len(batch) > 0:
             await flush_to_redshift(batch)
 
+    return total_rows_exported
+
 
 @contextlib.asynccontextmanager
 async def async_client_cursor_from_connection(
@@ -264,7 +271,7 @@ class RedshiftInsertInputs(PostgresInsertInputs):
 
 
 @activity.defn
-async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
+async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> int:
     """Activity to insert data from ClickHouse to Redshift.
 
     This activity executes the following steps:
@@ -306,7 +313,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows", count)
 
@@ -383,13 +390,15 @@ def map_to_record(row: dict) -> dict:
         return record
 
     async with postgres_connection(inputs) as connection:
-        await insert_records_to_redshift(
+        records_completed = await insert_records_to_redshift(
            (map_to_record(record) for record_batch in record_iterator for record in record_batch.to_pylist()),
             connection,
             inputs.schema,
             inputs.table_name,
         )
 
+    return records_completed
+
 
 @workflow.defn(name="redshift-export")
 class RedshiftBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 365613fd49ae4..4d99cbeffd7c3 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -388,7 +388,7 @@ def s3_default_fields() -> list[BatchExportField]:
 
 
 @activity.defn
-async def insert_into_s3_activity(inputs: S3InsertInputs):
+async def insert_into_s3_activity(inputs: S3InsertInputs) -> int:
     """Activity to batch export data from PostHog's ClickHouse to S3.
 
     It currently only creates a single file per run, and uploads as a multipart upload.
@@ -424,7 +424,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows to S3", count)
 
@@ -509,6 +509,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False):
 
         await s3_upload.complete()
 
+    return local_results_file.records_total
+
 
 @workflow.defn(name="s3-export")
 class S3BatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py
index c97b495979a2b..be94eca89a799 100644
--- a/posthog/temporal/batch_exports/snowflake_batch_export.py
+++ b/posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table(
 
 
 @activity.defn
-async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
+async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int:
     """Activity streams data from ClickHouse to Snowflake.
 
     TODO: We're using JSON here, it's not the most efficient way to do this.
@@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs):
             inputs.data_interval_start,
             inputs.data_interval_end,
         )
-        return
+        return 0
 
     logger.info("BatchExporting %s rows", count)
 
@@ -553,6 +553,8 @@ async def worker_shutdown_handler():
 
         await copy_loaded_files_to_snowflake_table(connection, inputs.table_name)
 
+    return local_results_file.records_total
+
 
 @workflow.defn(name="snowflake-export")
 class SnowflakeBatchExportWorkflow(PostHogWorkflow):
diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index a7629309d940a..b2c46f6344dbc 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -453,6 +453,7 @@ async def test_bigquery_export_workflow(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     ingested_timestamp = frozen_time().replace(tzinfo=dt.timezone.utc)
     assert_clickhouse_records_in_bigquery(
@@ -566,6 +567,7 @@ class RefreshError(Exception):
     run = runs[0]
     assert run.status == "Failed"
     assert run.latest_error == "RefreshError: A useful error message"
+    assert run.records_completed == 0
 
 
 async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
index c2aa9646b6ddb..6267577472125 100644
--- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
@@ -370,6 +370,7 @@ async def test_http_export_workflow(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     await assert_clickhouse_records_in_mock_server(
         mock_server=mock_server,
@@ -425,6 +426,7 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str:
     run = runs[0]
     assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
+    assert run.records_completed == 0
 
 
 async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
index e19e9c391dcfc..c486cc2747fcc 100644
--- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -385,6 +385,7 @@ async def test_postgres_export_workflow(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     await assert_clickhouse_records_in_postgres(
         postgres_connection=postgres_connection,
@@ -494,6 +495,7 @@ class InsufficientPrivilege(Exception):
     run = runs[0]
     assert run.status == "Failed"
     assert run.latest_error == "InsufficientPrivilege: A useful error message"
+    assert run.records_completed == 0
 
 
 async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
index 301183f2998f4..173bed3a69bb3 100644
--- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
@@ -433,6 +433,7 @@ async def test_redshift_export_workflow(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     await assert_clickhouse_records_in_redshfit(
         redshift_connection=psycopg_connection,
@@ -559,3 +560,4 @@ class InsufficientPrivilege(Exception):
     run = runs[0]
     assert run.status == "Failed"
     assert run.latest_error == "InsufficientPrivilege: A useful error message"
+    assert run.records_completed == 0
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index 5e741f9223321..e04e345d11245 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -791,6 +791,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     await assert_clickhouse_records_in_s3(
         s3_compatible_client=minio_client,
@@ -875,6 +876,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 100
 
     expected_key_prefix = s3_key_prefix.format(
         table="events",
@@ -950,6 +952,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
     run = runs[0]
     assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
+    assert run.records_completed == 0
 
 
 async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
index f43803d46cbd4..f8c12a3d1369f 100644
--- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
@@ -455,6 +455,7 @@ async def test_snowflake_export_workflow_exports_events(
 
     run = runs[0]
     assert run.status == "Completed"
+    assert run.records_completed == 10
 
 
 @pytest.mark.parametrize("interval", ["hour", "day"], indirect=True)
@@ -695,6 +696,7 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str
     run = runs[0]
     assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
+    assert run.records_completed == 0
 
 
 async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export):