From f0a4c12d4675f5b1a3cbc3426c91459b0a5608ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Mar 2024 17:10:15 +0100 Subject: [PATCH 01/11] feat: Start tracking records exported --- posthog/batch_exports/service.py | 6 ++++-- posthog/temporal/batch_exports/batch_exports.py | 6 +++++- posthog/temporal/batch_exports/bigquery_batch_export.py | 6 ++++-- posthog/temporal/batch_exports/postgres_batch_export.py | 6 ++++-- posthog/temporal/batch_exports/redshift_batch_export.py | 9 ++++++++- posthog/temporal/batch_exports/s3_batch_export.py | 6 ++++-- posthog/temporal/batch_exports/snowflake_batch_export.py | 6 ++++-- .../tests/batch_exports/test_s3_batch_export_workflow.py | 3 +++ 8 files changed, 36 insertions(+), 12 deletions(-) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index f3d5715220bf3..50ae6b0fd21f8 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -438,14 +438,16 @@ def create_batch_export_run( return run -def update_batch_export_run_status(run_id: UUID, status: str, latest_error: str | None) -> BatchExportRun: +def update_batch_export_run_status( + run_id: UUID, status: str, latest_error: str | None, records_completed: int = 0 +) -> BatchExportRun: """Update the status of an BatchExportRun with given id. Arguments: id: The id of the BatchExportRun to update. """ model = BatchExportRun.objects.filter(id=run_id) - updated = model.update(status=status, latest_error=latest_error) + updated = model.update(status=status, latest_error=latest_error, records_completed=records_completed) if not updated: raise ValueError(f"BatchExportRun with id {run_id} not found.") diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 8fa61a370d5c3..8c87718ae3590 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -534,6 +534,7 @@ class UpdateBatchExportRunStatusInputs: status: str team_id: int latest_error: str | None = None + records_completed: int = 0 @activity.defn @@ -545,6 +546,7 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) -> run_id=uuid.UUID(inputs.id), status=inputs.status, latest_error=inputs.latest_error, + records_completed=inputs.records_completed, ) if batch_export_run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE): @@ -664,7 +666,7 @@ async def execute_batch_export_insert_activity( ) try: - await workflow.execute_activity( + records_completed = await workflow.execute_activity( activity, inputs, start_to_close_timeout=dt.timedelta(seconds=start_to_close_timeout_seconds), @@ -690,6 +692,8 @@ async def execute_batch_export_insert_activity( finally: get_export_finished_metric(status=update_inputs.status.lower()).add(1) + + update_inputs.records_completed = records_completed await workflow.execute_activity( update_export_run_status, update_inputs, diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 9f39a302e9365..97ee8fef228c3 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -193,7 +193,7 @@ def bigquery_default_fields() -> list[BatchExportField]: @activity.defn -async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): +async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> int: """Activity streams data from ClickHouse to BigQuery.""" logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="BigQuery") logger.info( @@ -230,7 +230,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows", count) @@ -354,6 +354,8 @@ async def flush_to_bigquery(bigquery_table, table_schema): jsonl_file.reset() + return jsonl_file.records_total + @workflow.defn(name="bigquery-export") class BigQueryBatchExportWorkflow(PostHogWorkflow): diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index ef528b96f35ad..ab46cadc6941f 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -234,7 +234,7 @@ class PostgresInsertInputs: @activity.defn -async def insert_into_postgres_activity(inputs: PostgresInsertInputs): +async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> int: """Activity streams data from ClickHouse to Postgres.""" logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="PostgreSQL") logger.info( @@ -262,7 +262,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows", count) @@ -359,6 +359,8 @@ async def flush_to_postgres(): if pg_file.tell() > 0: await flush_to_postgres() + return pg_file.records_total + @workflow.defn(name="postgres-export") class PostgresBatchExportWorkflow(PostHogWorkflow): diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index e47dcd2924517..6a3d97cf6ed58 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -171,7 +171,7 @@ async def insert_records_to_redshift( schema: str | None, table: str, batch_size: int = 100, -): +) -> int: """Execute an INSERT query with given Redshift connection. The recommended way to insert multiple values into Redshift is using a COPY statement (see: @@ -206,15 +206,20 @@ async def insert_records_to_redshift( template = sql.SQL("({})").format(sql.SQL(", ").join(map(sql.Placeholder, columns))) rows_exported = get_rows_exported_metric() + total_rows_exported = 0 + async with async_client_cursor_from_connection(redshift_connection) as cursor: batch = [] pre_query_str = pre_query.as_string(cursor).encode("utf-8") async def flush_to_redshift(batch): + nonlocal total_rows_exported + values = b",".join(batch).replace(b" E'", b" '") await cursor.execute(pre_query_str + values) rows_exported.add(len(batch)) + total_rows_exported += len(batch) # It would be nice to record BYTES_EXPORTED for Redshift, but it's not worth estimating # the byte size of each batch the way things are currently written. We can revisit this # in the future if we decide it's useful enough. @@ -230,6 +235,8 @@ async def flush_to_redshift(batch): if len(batch) > 0: await flush_to_redshift(batch) + return total_rows_exported + @contextlib.asynccontextmanager async def async_client_cursor_from_connection( diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index 29ccf7f9628c1..30fd55b5b9248 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -382,7 +382,7 @@ def s3_default_fields() -> list[BatchExportField]: @activity.defn -async def insert_into_s3_activity(inputs: S3InsertInputs): +async def insert_into_s3_activity(inputs: S3InsertInputs) -> int: """Activity to batch export data from PostHog's ClickHouse to S3. It currently only creates a single file per run, and uploads as a multipart upload. @@ -418,7 +418,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows to S3", count) @@ -503,6 +503,8 @@ async def flush_to_s3(last_uploaded_part_timestamp: str, last=False): await s3_upload.complete() + return local_results_file.records_total + @workflow.defn(name="s3-export") class S3BatchExportWorkflow(PostHogWorkflow): diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py index c97b495979a2b..be94eca89a799 100644 --- a/posthog/temporal/batch_exports/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -388,7 +388,7 @@ async def copy_loaded_files_to_snowflake_table( @activity.defn -async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): +async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs) -> int: """Activity streams data from ClickHouse to Snowflake. TODO: We're using JSON here, it's not the most efficient way to do this. @@ -430,7 +430,7 @@ async def insert_into_snowflake_activity(inputs: SnowflakeInsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows", count) @@ -553,6 +553,8 @@ async def worker_shutdown_handler(): await copy_loaded_files_to_snowflake_table(connection, inputs.table_name) + return local_results_file.records_total + @workflow.defn(name="snowflake-export") class SnowflakeBatchExportWorkflow(PostHogWorkflow): diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 17b1aaaa94d4a..b1691e1a9d7e8 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -805,6 +805,7 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 await assert_clickhouse_records_in_s3( s3_compatible_client=minio_client, @@ -893,6 +894,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 expected_key_prefix = s3_key_prefix.format( table="events", @@ -968,6 +970,7 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: run = runs[0] assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" + assert run.records_completed == 0 async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval): From 04bbd272cd6ad6e4ebbeec8c63dfa1260bb12732 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Mar 2024 17:46:03 +0100 Subject: [PATCH 02/11] fix: Move records_completed inside try --- posthog/temporal/batch_exports/batch_exports.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 8c87718ae3590..c776e1f245ef3 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -673,6 +673,7 @@ async def execute_batch_export_insert_activity( heartbeat_timeout=dt.timedelta(seconds=heartbeat_timeout_seconds) if heartbeat_timeout_seconds else None, retry_policy=retry_policy, ) + update_inputs.records_completed = records_completed except exceptions.ActivityError as e: if isinstance(e.cause, exceptions.CancelledError): @@ -693,7 +694,6 @@ async def execute_batch_export_insert_activity( finally: get_export_finished_metric(status=update_inputs.status.lower()).add(1) - update_inputs.records_completed = records_completed await workflow.execute_activity( update_export_run_status, update_inputs, From c5fcb78f70c6127879b337920aef5b0fb0408ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Mar 2024 17:46:11 +0100 Subject: [PATCH 03/11] test: Add more destination tests --- .../tests/batch_exports/test_bigquery_batch_export_workflow.py | 2 ++ .../tests/batch_exports/test_postgres_batch_export_workflow.py | 2 ++ .../tests/batch_exports/test_redshift_batch_export_workflow.py | 2 ++ .../tests/batch_exports/test_snowflake_batch_export_workflow.py | 2 ++ 4 files changed, 8 insertions(+) diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index a7629309d940a..b2c46f6344dbc 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -453,6 +453,7 @@ async def test_bigquery_export_workflow( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 ingested_timestamp = frozen_time().replace(tzinfo=dt.timezone.utc) assert_clickhouse_records_in_bigquery( @@ -566,6 +567,7 @@ class RefreshError(Exception): run = runs[0] assert run.status == "Failed" assert run.latest_error == "RefreshError: A useful error message" + assert run.records_completed == 0 async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval): diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index e19e9c391dcfc..c486cc2747fcc 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -385,6 +385,7 @@ async def test_postgres_export_workflow( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 await assert_clickhouse_records_in_postgres( postgres_connection=postgres_connection, @@ -494,6 +495,7 @@ class InsufficientPrivilege(Exception): run = runs[0] assert run.status == "Failed" assert run.latest_error == "InsufficientPrivilege: A useful error message" + assert run.records_completed == 0 async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval): diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index 301183f2998f4..173bed3a69bb3 100644 --- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -433,6 +433,7 @@ async def test_redshift_export_workflow( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 await assert_clickhouse_records_in_redshfit( redshift_connection=psycopg_connection, @@ -559,3 +560,4 @@ class InsufficientPrivilege(Exception): run = runs[0] assert run.status == "Failed" assert run.latest_error == "InsufficientPrivilege: A useful error message" + assert run.records_completed == 0 diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index f43803d46cbd4..f8c12a3d1369f 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -455,6 +455,7 @@ async def test_snowflake_export_workflow_exports_events( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 10 @pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) @@ -695,6 +696,7 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str run = runs[0] assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" + assert run.records_completed == 0 async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export): From 063dc3712a486c1660ed3bb69c7ce158ed59e521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Mar 2024 18:14:21 +0100 Subject: [PATCH 04/11] fix: Redshift returns int --- posthog/temporal/batch_exports/redshift_batch_export.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 6a3d97cf6ed58..bc1549cef838f 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -271,7 +271,7 @@ class RedshiftInsertInputs(PostgresInsertInputs): @activity.defn -async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): +async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> int: """Activity to insert data from ClickHouse to Redshift. This activity executes the following steps: @@ -313,7 +313,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows", count) @@ -390,13 +390,15 @@ def map_to_record(row: dict) -> dict: return record async with postgres_connection(inputs) as connection: - await insert_records_to_redshift( + records_completed = await insert_records_to_redshift( (map_to_record(record) for record_batch in record_iterator for record in record_batch.to_pylist()), connection, inputs.schema, inputs.table_name, ) + return records_completed + @workflow.defn(name="redshift-export") class RedshiftBatchExportWorkflow(PostHogWorkflow): From 855d9f03753c5820d8015d8e02dc85f71ef00d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Mar 2024 18:32:10 +0100 Subject: [PATCH 05/11] fix: Also return records_total from HTTP --- posthog/temporal/batch_exports/http_batch_export.py | 6 ++++-- .../tests/batch_exports/test_http_batch_export_workflow.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index fde06c6c48d85..8aca65c80ff38 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -152,7 +152,7 @@ async def post_json_file_to_url(url, batch_file, session: aiohttp.ClientSession) @activity.defn -async def insert_into_http_activity(inputs: HttpInsertInputs): +async def insert_into_http_activity(inputs: HttpInsertInputs) -> int: """Activity streams data from ClickHouse to an HTTP Endpoint.""" logger = await bind_temporal_worker_logger(team_id=inputs.team_id, destination="HTTP") logger.info( @@ -180,7 +180,7 @@ async def insert_into_http_activity(inputs: HttpInsertInputs): inputs.data_interval_start, inputs.data_interval_end, ) - return + return 0 logger.info("BatchExporting %s rows", count) @@ -303,6 +303,8 @@ async def flush_batch_to_http_endpoint(last_uploaded_timestamp: str, session: ai last_uploaded_timestamp = str(inserted_at) await flush_batch_to_http_endpoint(last_uploaded_timestamp, session) + return batch_file.records_total + @workflow.defn(name="http-export") class HttpBatchExportWorkflow(PostHogWorkflow): diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py index c2aa9646b6ddb..6267577472125 100644 --- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -370,6 +370,7 @@ async def test_http_export_workflow( run = runs[0] assert run.status == "Completed" + assert run.records_completed == 100 await assert_clickhouse_records_in_mock_server( mock_server=mock_server, @@ -425,6 +426,7 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str: run = runs[0] assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" + assert run.records_completed == 0 async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval): From 6bcccadfeba30e697f48baa7c4211b541a4223ed Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 17:56:03 +0000 Subject: [PATCH 06/11] Update query snapshots --- .../test/__snapshots__/test_session_recordings.ambr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index fad3c08168d0b..c2982e4f8526f 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -778,7 +778,7 @@ FROM "posthog_persondistinctid" INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id") WHERE ("posthog_persondistinctid"."distinct_id" IN ('user2', - 'user_one_2') + 'user_one_0') AND "posthog_persondistinctid"."team_id" = 2) /*controller='project_session_recordings-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/session_recordings/%3F%24'*/ ''' # --- From 6fc01688152cf15d9e6b8fb5caffe6290d43660f Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 18:31:40 +0000 Subject: [PATCH 07/11] Update query snapshots --- .../test/__snapshots__/test_session_recordings.ambr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index c2982e4f8526f..fad3c08168d0b 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -778,7 +778,7 @@ FROM "posthog_persondistinctid" INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id") WHERE ("posthog_persondistinctid"."distinct_id" IN ('user2', - 'user_one_0') + 'user_one_2') AND "posthog_persondistinctid"."team_id" = 2) /*controller='project_session_recordings-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/session_recordings/%3F%24'*/ ''' # --- From 3f7ae99b7b5a4b94f4facdf22f5ae56118e13e43 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 18:54:51 +0000 Subject: [PATCH 08/11] Update query snapshots --- .../test/__snapshots__/test_session_recordings.ambr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index fad3c08168d0b..c2982e4f8526f 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -778,7 +778,7 @@ FROM "posthog_persondistinctid" INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id") WHERE ("posthog_persondistinctid"."distinct_id" IN ('user2', - 'user_one_2') + 'user_one_0') AND "posthog_persondistinctid"."team_id" = 2) /*controller='project_session_recordings-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/session_recordings/%3F%24'*/ ''' # --- From d31c20bb1807404725c8126b60dc7825978652b1 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 19:13:00 +0000 Subject: [PATCH 09/11] Update query snapshots --- .../test/__snapshots__/test_session_recordings.ambr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index c2982e4f8526f..fad3c08168d0b 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -778,7 +778,7 @@ FROM "posthog_persondistinctid" INNER JOIN "posthog_person" ON ("posthog_persondistinctid"."person_id" = "posthog_person"."id") WHERE ("posthog_persondistinctid"."distinct_id" IN ('user2', - 'user_one_0') + 'user_one_2') AND "posthog_persondistinctid"."team_id" = 2) /*controller='project_session_recordings-list',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/session_recordings/%3F%24'*/ ''' # --- From e27439a9e1d5b7cf298689f3be2eadf6f6f248cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 20 Mar 2024 10:53:26 +0100 Subject: [PATCH 10/11] fix: Return inside context manager Co-authored-by: Brett Hoerner --- posthog/temporal/batch_exports/bigquery_batch_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 97ee8fef228c3..a0469de79bb9e 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -354,7 +354,7 @@ async def flush_to_bigquery(bigquery_table, table_schema): jsonl_file.reset() - return jsonl_file.records_total + return jsonl_file.records_total @workflow.defn(name="bigquery-export") From 852648d9c8223b53b85116e3b8a7dc8061eaa52f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 20 Mar 2024 10:53:36 +0100 Subject: [PATCH 11/11] fix: Return inside context manager Co-authored-by: Brett Hoerner --- posthog/temporal/batch_exports/postgres_batch_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index ab46cadc6941f..5dbfc6faa4acf 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -359,7 +359,7 @@ async def flush_to_postgres(): if pg_file.tell() > 0: await flush_to_postgres() - return pg_file.records_total + return pg_file.records_total @workflow.defn(name="postgres-export")