From de17595c1d6b7beb41c0c26c6f85f0ed2929a09f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 13 Jun 2024 11:14:23 +0200 Subject: [PATCH] refactor: Remove early row count and update batch export metrics (#22810) * refactor: Update metrics to fetch counts at request time * refactor: Remove count from batch exports * fix: Move import to method * fix: Add function * fix: Typing fixes * fix: Move early return in main workflow activity * test: Docstring updates and more tests * fix: Actually use include and exclude events * refactor: Switch to counting runs * feat: Frontend display number of runs instead of events for batchexports * Update UI snapshots for `chromium` (2) * Update UI snapshots for `chromium` (2) * Update UI snapshots for `chromium` (2) * Update UI snapshots for `chromium` (2) --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- .../scenes/pipeline/AppMetricSparkLine.tsx | 4 +- posthog/api/app_metrics.py | 73 ++++++++----------- posthog/api/test/test_app_metrics.py | 46 ++++++++++-- .../temporal/batch_exports/batch_exports.py | 50 +------------ .../batch_exports/bigquery_batch_export.py | 29 +++----- .../batch_exports/http_batch_export.py | 17 +---- .../batch_exports/postgres_batch_export.py | 26 ++----- .../batch_exports/redshift_batch_export.py | 26 ++----- .../temporal/batch_exports/s3_batch_export.py | 23 ++---- .../batch_exports/snowflake_batch_export.py | 27 ++----- posthog/temporal/batch_exports/utils.py | 15 +++- .../test_bigquery_batch_export_workflow.py | 61 +++++++++++++++- .../test_http_batch_export_workflow.py | 2 - .../test_postgres_batch_export_workflow.py | 64 +++++++++++++++- .../test_redshift_batch_export_workflow.py | 2 - .../tests/batch_exports/test_run_updates.py | 11 +-- .../test_s3_batch_export_workflow.py | 68 ++++++++++++++++- .../test_snowflake_batch_export_workflow.py | 6 +- posthog/temporal/tests/batch_exports/utils.py | 4 +- 19 files changed, 313 insertions(+), 241 deletions(-) diff --git a/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx b/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx index 51c6eacfe3116..025b632215e95 100644 --- a/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx +++ b/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx @@ -17,14 +17,14 @@ export function AppMetricSparkLine({ pipelineNode }: { pipelineNode: PipelineNod const displayData: SparklineTimeSeries[] = [ { color: 'success', - name: 'Events sent', + name: pipelineNode.backend == 'batch_export' ? 'Runs succeeded' : 'Events sent', values: successes, }, ] if (appMetricsResponse?.metrics.failures.some((failure) => failure > 0)) { displayData.push({ color: 'danger', - name: 'Events dropped', + name: pipelineNode.backend == 'batch_export' ? 
'Runs failed' : 'Events dropped', values: failures, }) } diff --git a/posthog/api/app_metrics.py b/posthog/api/app_metrics.py index 61612980e24f5..45f4f1a48c194 100644 --- a/posthog/api/app_metrics.py +++ b/posthog/api/app_metrics.py @@ -1,9 +1,8 @@ import datetime as dt import uuid from typing import Any - -from django.db.models import Q, Sum -from django.db.models.functions import Coalesce, TruncDay +from django.db.models import Q, Count +from django.db.models.functions import TruncDay from rest_framework import mixins, request, response, viewsets from rest_framework.decorators import action @@ -32,11 +31,8 @@ class AppMetricsViewSet(TeamAndOrgViewSetMixin, mixins.RetrieveModelMixin, views def retrieve(self, request: request.Request, *args: Any, **kwargs: Any) -> response.Response: try: - rows = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"]) + dates, successes, failures = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"]) - dates = [row["dates"].strftime("%Y-%m-%d") for row in rows] - successes = [row["successes"] for row in rows] - failures = [row["failures"] for row in rows] return response.Response( { "metrics": { @@ -83,30 +79,7 @@ def error_details(self, request: request.Request, *args: Any, **kwargs: Any) -> return response.Response({"result": error_details}) def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str): - """Use the Django ORM to fetch app metrics for batch export runs. - - Attempts to (roughly) match the following (much more readable) query: - ``` - select - date_trunc('day', last_updated_at) as dates, - sum(case when status = 'Completed' then coalesce(records_total_count, 0) else 0) as successes, - sum(case when status != 'Completed' then coalesce(records_total_count, 0) else 0) as failures - from - posthog_batchexportrun - where - batch_export_id = :batch_export_id - and last_updated_at between :date_from and :date_to - and status != 'Running' - group by - date_trunc('day', last_updated_at) - order by - dates - ``` - - A truncated 'last_updated_at' is used as the grouping date as it reflects when a particular run - was last updated. It feels easier to explain to users that if they see metrics for today, those - correspond to runs that happened today, even if the runs themselves exported data from a year ago - (because it was a backfill). + """Use the Django ORM and ClickHouse to fetch app metrics for batch export runs. Raises: ValueError: If provided 'batch_export_id' is not a valid UUID. 
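The hunk below replaces the old sum of `records_total_count` with a per-day count of finished runs. In the spirit of the SQL that the removed docstring used to document, here is a rough sketch of the query the new queryset should generate; the constant name is hypothetical and the SQL is inferred from the ORM calls and the status values asserted in the tests, not taken from the patch:

```
# Illustrative only: hand-written approximation of the SQL produced by the
# run-counting queryset in the hunk below (PostgreSQL COUNT ... FILTER syntax).
APPROXIMATE_RUN_METRICS_SQL = """
select
    date_trunc('day', last_updated_at) as day,
    count(data_interval_end) filter (where status = 'Completed') as successes,
    count(data_interval_end) filter (
        where status in ('Failed', 'FailedRetryable')
    ) as failures
from posthog_batchexportrun
where
    batch_export_id = %(batch_export_id)s
    and last_updated_at between %(date_from)s and %(date_to)s
    and status in ('Completed', 'Failed', 'FailedRetryable')
group by day
order by day
"""
```

As a result, `successes` and `failures` are run counts rather than record counts, matching the frontend change from 'Events sent'/'Events dropped' to 'Runs succeeded'/'Runs failed'.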
@@ -120,22 +93,40 @@ def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str): relative_date_parse(before, self.team.timezone_info) if before else dt.datetime.now(dt.timezone.utc) ) date_range = (after_datetime, before_datetime) - return ( - BatchExportRun.objects.filter(batch_export_id=batch_export_uuid, last_updated_at__range=date_range) - .annotate(dates=TruncDay("last_updated_at")) - .values("dates") - .annotate( - successes=Sum( - Coalesce("records_total_count", 0), filter=Q(status=BatchExportRun.Status.COMPLETED), default=0 + runs = ( + BatchExportRun.objects.select_related("batch_export__destination") + .filter( + batch_export_id=batch_export_uuid, + last_updated_at__range=date_range, + status__in=( + BatchExportRun.Status.COMPLETED, + BatchExportRun.Status.FAILED, + BatchExportRun.Status.FAILED_RETRYABLE, ), - failures=Sum( - Coalesce("records_total_count", 0), filter=~Q(status=BatchExportRun.Status.COMPLETED), default=0 + ) + .annotate(day=TruncDay("last_updated_at")) + .values("day") + .annotate( + successes=Count("data_interval_end", filter=Q(status=BatchExportRun.Status.COMPLETED)), + failures=Count( + "data_interval_end", + filter=(Q(status=BatchExportRun.Status.FAILED) | Q(status=BatchExportRun.Status.FAILED_RETRYABLE)), ), ) - .order_by("dates") + .order_by("day") .all() ) + dates = [] + successes = [] + failures = [] + for run in runs: + dates.append(run["day"].strftime("%Y-%m-%d")) + successes.append(run["successes"]) + failures.append(run["failures"]) + + return dates, successes, failures + class HistoricalExportsAppMetricsViewSet( TeamAndOrgViewSetMixin, diff --git a/posthog/api/test/test_app_metrics.py b/posthog/api/test/test_app_metrics.py index dd9c01ba023b6..4fbe4f8bd7efc 100644 --- a/posthog/api/test/test_app_metrics.py +++ b/posthog/api/test/test_app_metrics.py @@ -1,5 +1,6 @@ import datetime as dt import json +import uuid from unittest import mock from freezegun.api import freeze_time @@ -8,6 +9,7 @@ from posthog.api.test.batch_exports.conftest import start_test_worker from posthog.api.test.batch_exports.operations import create_batch_export_ok from posthog.batch_exports.models import BatchExportRun +from posthog.client import sync_execute from posthog.models.activity_logging.activity_log import Detail, Trigger, log_activity from posthog.models.plugin import Plugin, PluginConfig from posthog.models.utils import UUIDT @@ -18,6 +20,20 @@ SAMPLE_PAYLOAD = {"dateRange": ["2021-06-10", "2022-06-12"], "parallelism": 1} +def insert_event(team_id: int, timestamp: dt.datetime, event: str = "test-event"): + sync_execute( + "INSERT INTO `sharded_events` (uuid, team_id, event, timestamp) VALUES", + [ + { + "uuid": uuid.uuid4(), + "team_id": team_id, + "event": event, + "timestamp": timestamp, + } + ], + ) + + @freeze_time("2021-12-05T13:23:00Z") class TestAppMetricsAPI(ClickhouseTestMixin, APIBaseTest): maxDiff = None @@ -88,6 +104,7 @@ def test_retrieve_batch_export_runs_app_metrics(self): "prefix": "posthog-events/", "aws_access_key_id": "abc123", "aws_secret_access_key": "secret", + "include_events": ["test-event"], }, } @@ -119,8 +136,18 @@ def test_retrieve_batch_export_runs_app_metrics(self): data_interval_end=last_updated_at, data_interval_start=last_updated_at - dt.timedelta(hours=1), status=BatchExportRun.Status.COMPLETED, - records_completed=3, - records_total_count=3, + ) + BatchExportRun.objects.create( + batch_export_id=batch_export_id, + data_interval_end=last_updated_at, + data_interval_start=last_updated_at - dt.timedelta(hours=1), + 
status=BatchExportRun.Status.COMPLETED, + ) + BatchExportRun.objects.create( + batch_export_id=batch_export_id, + data_interval_end=last_updated_at, + data_interval_start=last_updated_at - dt.timedelta(hours=1), + status=BatchExportRun.Status.COMPLETED, ) BatchExportRun.objects.create( @@ -128,8 +155,12 @@ def test_retrieve_batch_export_runs_app_metrics(self): data_interval_end=last_updated_at - dt.timedelta(hours=2), data_interval_start=last_updated_at - dt.timedelta(hours=3), status=BatchExportRun.Status.FAILED, - records_completed=0, - records_total_count=5, + ) + BatchExportRun.objects.create( + batch_export_id=batch_export_id, + data_interval_end=last_updated_at - dt.timedelta(hours=2), + data_interval_start=last_updated_at - dt.timedelta(hours=3), + status=BatchExportRun.Status.FAILED_RETRYABLE, ) response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d") @@ -149,8 +180,8 @@ def test_retrieve_batch_export_runs_app_metrics(self): ], "successes": [3, 3, 3, 3, 3, 3, 3], "successes_on_retry": [0, 0, 0, 0, 0, 0, 0], - "failures": [5, 5, 5, 5, 5, 5, 5], - "totals": {"successes": 21, "successes_on_retry": 0, "failures": 35}, + "failures": [2, 2, 2, 2, 2, 2, 2], + "totals": {"successes": 21, "successes_on_retry": 0, "failures": 14}, }, "errors": None, }, @@ -166,6 +197,7 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self): "prefix": "posthog-events/", "aws_access_key_id": "abc123", "aws_secret_access_key": "secret", + "exclude_events": ["exclude-me"], }, } @@ -197,8 +229,6 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self): data_interval_end=last_updated_at, data_interval_start=last_updated_at - dt.timedelta(hours=1), status=BatchExportRun.Status.COMPLETED, - records_completed=1, - records_total_count=1, ) response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d") diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 5cc6320bedb7c..1de6b551981ed 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -1,4 +1,3 @@ -import asyncio import collections.abc import dataclasses import datetime as dt @@ -27,7 +26,7 @@ get_export_finished_metric, get_export_started_metric, ) -from posthog.temporal.common.clickhouse import ClickHouseClient, get_client +from posthog.temporal.common.clickhouse import ClickHouseClient from posthog.temporal.common.client import connect from posthog.temporal.common.logger import bind_temporal_worker_logger @@ -329,12 +328,11 @@ class StartBatchExportRunInputs: is_backfill: bool = False -RecordsTotalCount = int | None BatchExportRunId = str @activity.defn -async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]: +async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId: """Activity that creates an BatchExportRun and returns the count of records to export. 
Intended to be used in all export workflows, usually at the start, to create a model @@ -350,56 +348,14 @@ async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[Bat inputs.data_interval_end, ) - delta = dt.datetime.fromisoformat(inputs.data_interval_end) - dt.datetime.fromisoformat(inputs.data_interval_start) - async with get_client(team_id=inputs.team_id) as client: - if not await client.is_alive(): - raise ConnectionError("Cannot establish connection to ClickHouse") - - try: - count = await asyncio.wait_for( - get_rows_count( - client=client, - team_id=inputs.team_id, - interval_start=inputs.data_interval_start, - interval_end=inputs.data_interval_end, - exclude_events=inputs.exclude_events, - include_events=inputs.include_events, - is_backfill=inputs.is_backfill, - ), - timeout=(delta / 12).total_seconds(), - ) - except asyncio.TimeoutError: - count = None - - if count is None: - logger.info( - "Batch export for range %s - %s will continue without a count of rows to export", - inputs.data_interval_start, - inputs.data_interval_end, - ) - elif count > 0: - logger.info( - "Batch export for range %s - %s will export %s rows", - inputs.data_interval_start, - inputs.data_interval_end, - count, - ) - else: - logger.info( - "Batch export for range %s - %s has no rows to export", - inputs.data_interval_start, - inputs.data_interval_end, - ) - run = await acreate_batch_export_run( batch_export_id=uuid.UUID(inputs.batch_export_id), data_interval_start=inputs.data_interval_start, data_interval_end=inputs.data_interval_end, status=BatchExportRun.Status.STARTING, - records_total_count=count, ) - return str(run.id), count + return str(run.id) @dataclasses.dataclass diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 7bef7c9d51f55..9190d736a724c 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -24,7 +24,6 @@ StartBatchExportRunInputs, default_fields, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -251,6 +250,10 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records is_backfill=inputs.is_backfill, ) + first_record_batch, records_iterator = peek_first_and_rewind(records_iterator) + if first_record_batch is None: + return 0 + bigquery_table = None inserted_at = None @@ -270,8 +273,6 @@ async def flush_to_bigquery(bigquery_table, table_schema): rows_exported.add(jsonl_file.records_since_last_reset) bytes_exported.add(jsonl_file.bytes_since_last_reset) - first_record, records_iterator = peek_first_and_rewind(records_iterator) - if inputs.use_json_type is True: json_type = "JSON" json_columns = ["properties", "set", "set_once", "person_properties"] @@ -296,8 +297,10 @@ async def flush_to_bigquery(bigquery_table, table_schema): ] else: - column_names = [column for column in first_record.schema.names if column != "_inserted_at"] - record_schema = first_record.select(column_names).schema + column_names = [ + column for column in first_record_batch.schema.names if column != "_inserted_at" + ] + record_schema = first_record_batch.select(column_names).schema schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns) bigquery_table = await create_table_in_bigquery( @@ -371,7 +374,7 @@ async def run(self, inputs: BigQueryBatchExportInputs): include_events=inputs.include_events, 
is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -390,20 +393,6 @@ async def run(self, inputs: BigQueryBatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = BigQueryInsertInputs( team_id=inputs.team_id, table_id=inputs.table_id, diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index ea6cd6adbdab3..92ff0e9d58792 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -21,7 +21,6 @@ RecordsCompleted, StartBatchExportRunInputs, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -328,7 +327,7 @@ async def run(self, inputs: HttpBatchExportInputs): include_events=inputs.include_events, is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -347,20 +346,6 @@ async def run(self, inputs: HttpBatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = HttpInsertInputs( team_id=inputs.team_id, url=inputs.url, diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index a0a11ddfb19f6..4408bb83b863f 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -26,7 +26,6 @@ StartBatchExportRunInputs, default_fields, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -282,6 +281,9 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records extra_query_parameters=query_parameters, is_backfill=inputs.is_backfill, ) + first_record_batch, record_iterator = peek_first_and_rewind(record_iterator) + if first_record_batch is None: + return 0 if inputs.batch_export_schema is None: table_fields = [ @@ -299,10 +301,8 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records ] else: - first_record, record_iterator = peek_first_and_rewind(record_iterator) - - column_names = [column for column in first_record.schema.names if column != "_inserted_at"] - record_schema = first_record.select(column_names).schema + column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"] + record_schema = first_record_batch.select(column_names).schema table_fields = 
get_postgres_fields_from_record_schema( record_schema, known_json_columns=["properties", "set", "set_once", "person_properties"] ) @@ -390,7 +390,7 @@ async def run(self, inputs: PostgresBatchExportInputs): include_events=inputs.include_events, is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -409,20 +409,6 @@ async def run(self, inputs: PostgresBatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = PostgresInsertInputs( team_id=inputs.team_id, user=inputs.user, diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index c9c9aa68a920a..f2467800764f2 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -21,7 +21,6 @@ StartBatchExportRunInputs, default_fields, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -325,6 +324,9 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records extra_query_parameters=query_parameters, is_backfill=inputs.is_backfill, ) + first_record_batch, record_iterator = peek_first_and_rewind(record_iterator) + if first_record_batch is None: + return 0 known_super_columns = ["properties", "set", "set_once", "person_properties"] @@ -348,10 +350,8 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records ("timestamp", "TIMESTAMP WITH TIME ZONE"), ] else: - first_record, record_iterator = peek_first_and_rewind(record_iterator) - - column_names = [column for column in first_record.schema.names if column != "_inserted_at"] - record_schema = first_record.select(column_names).schema + column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"] + record_schema = first_record_batch.select(column_names).schema table_fields = get_redshift_fields_from_record_schema( record_schema, known_super_columns=known_super_columns ) @@ -420,7 +420,7 @@ async def run(self, inputs: RedshiftBatchExportInputs): include_events=inputs.include_events, is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -439,20 +439,6 @@ async def run(self, inputs: RedshiftBatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = RedshiftInsertInputs( team_id=inputs.team_id, user=inputs.user, diff --git 
a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index fed53ee249ea2..43ad45257a3be 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -27,7 +27,6 @@ StartBatchExportRunInputs, default_fields, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -465,6 +464,11 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted: is_backfill=inputs.is_backfill, ) + first_record_batch, record_iterator = peek_first_and_rewind(record_iterator) + + if first_record_batch is None: + return 0 + async with s3_upload as s3_upload: async def flush_to_s3( @@ -488,7 +492,6 @@ async def flush_to_s3( heartbeater.details = (str(last_inserted_at), s3_upload.to_state()) - first_record_batch, record_iterator = peek_first_and_rewind(record_iterator) first_record_batch = cast_record_batch_json_columns(first_record_batch) column_names = first_record_batch.column_names column_names.pop(column_names.index("_inserted_at")) @@ -634,7 +637,7 @@ async def run(self, inputs: S3BatchExportInputs): include_events=inputs.include_events, is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -653,20 +656,6 @@ async def run(self, inputs: S3BatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = S3InsertInputs( bucket_name=inputs.bucket_name, region=inputs.region, diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py index c547ebafc7db0..73e6c23fb2f49 100644 --- a/posthog/temporal/batch_exports/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -27,7 +27,6 @@ StartBatchExportRunInputs, default_fields, execute_batch_export_insert_activity, - finish_batch_export_run, get_data_interval, iter_records, start_batch_export_run, @@ -474,6 +473,10 @@ async def flush_to_snowflake( extra_query_parameters=query_parameters, is_backfill=inputs.is_backfill, ) + first_record_batch, record_iterator = peek_first_and_rewind(record_iterator) + + if first_record_batch is None: + return 0 known_variant_columns = ["properties", "people_set", "people_set_once", "person_properties"] if inputs.batch_export_schema is None: @@ -492,10 +495,8 @@ async def flush_to_snowflake( ] else: - first_record, record_iterator = peek_first_and_rewind(record_iterator) - - column_names = [column for column in first_record.schema.names if column != "_inserted_at"] - record_schema = first_record.select(column_names).schema + column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"] + record_schema = first_record_batch.select(column_names).schema table_fields = get_snowflake_fields_from_record_schema( record_schema, known_variant_columns=known_variant_columns, @@ -572,7 +573,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs): 
include_events=inputs.include_events, is_backfill=inputs.is_backfill, ) - run_id, records_total_count = await workflow.execute_activity( + run_id = await workflow.execute_activity( start_batch_export_run, start_batch_export_run_inputs, start_to_close_timeout=dt.timedelta(minutes=5), @@ -591,20 +592,6 @@ async def run(self, inputs: SnowflakeBatchExportInputs): team_id=inputs.team_id, ) - if records_total_count == 0: - await workflow.execute_activity( - finish_batch_export_run, - finish_inputs, - start_to_close_timeout=dt.timedelta(minutes=5), - retry_policy=RetryPolicy( - initial_interval=dt.timedelta(seconds=10), - maximum_interval=dt.timedelta(seconds=60), - maximum_attempts=0, - non_retryable_error_types=["NotNullViolation", "IntegrityError"], - ), - ) - return - insert_inputs = SnowflakeInsertInputs( team_id=inputs.team_id, user=inputs.user, diff --git a/posthog/temporal/batch_exports/utils.py b/posthog/temporal/batch_exports/utils.py index c10ede32d778c..8a589ec378733 100644 --- a/posthog/temporal/batch_exports/utils.py +++ b/posthog/temporal/batch_exports/utils.py @@ -10,7 +10,7 @@ def peek_first_and_rewind( gen: collections.abc.Generator[T, None, None], -) -> tuple[T, collections.abc.Generator[T, None, None]]: +) -> tuple[T | None, collections.abc.Generator[T, None, None]]: """Peek into the first element in a generator and rewind the advance. The generator is advanced and cannot be reversed, so we create a new one that first @@ -19,10 +19,19 @@ def peek_first_and_rewind( Returns: A tuple with the first element of the generator and the generator itself. """ - first = next(gen) + try: + first = next(gen) + except StopIteration: + first = None def rewind_gen() -> collections.abc.Generator[T, None, None]: - """Yield the item we popped to rewind the generator.""" + """Yield the item we popped to rewind the generator. + + Return early if the generator is empty. + """ + if first is None: + return + yield first yield from gen diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index fdc7e35570500..b74e784492556 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -455,7 +455,6 @@ async def test_bigquery_export_workflow( run = runs[0] assert run.status == "Completed" assert run.records_completed == 100 - assert run.records_total_count == 100 ingested_timestamp = frozen_time().replace(tzinfo=dt.timezone.utc) assert_clickhouse_records_in_bigquery( @@ -474,6 +473,65 @@ async def test_bigquery_export_workflow( ) +@pytest.mark.parametrize("interval", ["hour"]) +@pytest.mark.parametrize("exclude_events", [None], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS) +async def test_bigquery_export_workflow_without_events( + clickhouse_client, + bigquery_batch_export, + interval, + exclude_events, + ateam, + table_id, + use_json_type, + batch_export_schema, +): + """Test the BigQuery Export Workflow without any events to export. + + The workflow should update the batch export run status to completed and set 0 as `records_completed`. 
+ """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = BigQueryBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(bigquery_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **bigquery_batch_export.destination.config, + ) + + with freeze_time(TEST_TIME): + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[ + start_batch_export_run, + insert_into_bigquery_activity, + finish_batch_export_run, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + assert run.records_completed == 0 + + async def test_bigquery_export_workflow_handles_insert_activity_errors(ateam, bigquery_batch_export, interval): """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data.""" data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") @@ -570,7 +628,6 @@ class RefreshError(Exception): assert run.status == "Failed" assert run.latest_error == "RefreshError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval): diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py index 7b7e2b566743f..4dfb8563ff943 100644 --- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py @@ -433,7 +433,6 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str: assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval): @@ -484,7 +483,6 @@ class NonRetryableResponseError(Exception): assert run.status == "Failed" assert run.latest_error == "NonRetryableResponseError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_http_export_workflow_handles_cancellation(ateam, http_batch_export, interval): diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 14d190d0bbe35..5dedb8c8c0faf 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -387,7 +387,6 @@ async def test_postgres_export_workflow( run = runs[0] assert run.status == "Completed" assert run.records_completed == 100 - assert run.records_total_count == 100 await 
assert_clickhouse_records_in_postgres( postgres_connection=postgres_connection, @@ -402,6 +401,66 @@ async def test_postgres_export_workflow( ) +@pytest.mark.parametrize("interval", ["hour"], indirect=True) +@pytest.mark.parametrize("exclude_events", [None], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_SCHEMAS) +async def test_postgres_export_workflow_without_events( + clickhouse_client, + postgres_config, + postgres_connection, + postgres_batch_export, + interval, + exclude_events, + ateam, + table_name, + batch_export_schema, +): + """Test Postgres Export Workflow end-to-end without any events to export. + + The workflow should update the batch export run status to completed and set 0 as `records_completed`. + """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid.uuid4()) + inputs = PostgresBatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(postgres_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **postgres_batch_export.destination.config, + ) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[PostgresBatchExportWorkflow], + activities=[ + start_batch_export_run, + insert_into_postgres_activity, + finish_batch_export_run, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with override_settings(BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2): + await activity_environment.client.execute_workflow( + PostgresBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(seconds=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=postgres_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + assert run.records_completed == 0 + + async def test_postgres_export_workflow_handles_insert_activity_errors(ateam, postgres_batch_export, interval): """Test that Postgres Export Workflow can gracefully handle errors when inserting Postgres data.""" data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") @@ -447,7 +506,6 @@ async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str: assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_postgres_export_workflow_handles_insert_activity_non_retryable_errors( @@ -500,7 +558,6 @@ class InsufficientPrivilege(Exception): assert run.status == "Failed" assert run.latest_error == "InsufficientPrivilege: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval): @@ -554,4 +611,3 @@ async def never_finish_activity(_: PostgresInsertInputs) -> str: assert run.status == "Cancelled" assert run.latest_error == "Cancelled" assert run.records_completed is None - assert run.records_total_count == 1 diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py index c8d8f3e5faf93..3ddbb805c21c4 100644 --- 
a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py @@ -511,7 +511,6 @@ async def insert_into_redshift_activity_mocked(_: RedshiftInsertInputs) -> str: assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_redshift_export_workflow_handles_insert_activity_non_retryable_errors( @@ -564,4 +563,3 @@ class InsufficientPrivilege(Exception): assert run.status == "Failed" assert run.latest_error == "InsufficientPrivilege: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 diff --git a/posthog/temporal/tests/batch_exports/test_run_updates.py b/posthog/temporal/tests/batch_exports/test_run_updates.py index 4a0e26b0741b6..1e50e13325b82 100644 --- a/posthog/temporal/tests/batch_exports/test_run_updates.py +++ b/posthog/temporal/tests/batch_exports/test_run_updates.py @@ -95,7 +95,7 @@ async def test_start_batch_export_run(activity_environment, team, batch_export): data_interval_end=end.isoformat(), ) - run_id, records_total_count = await activity_environment.run(start_batch_export_run, inputs) + run_id = await activity_environment.run(start_batch_export_run, inputs) runs = BatchExportRun.objects.filter(id=run_id) assert await sync_to_async(runs.exists)() # type:ignore @@ -104,7 +104,6 @@ async def test_start_batch_export_run(activity_environment, team, batch_export): assert run is not None assert run.data_interval_start == start assert run.data_interval_end == end - assert run.records_total_count == records_total_count @pytest.mark.django_db(transaction=True) @@ -121,13 +120,12 @@ async def test_finish_batch_export_run(activity_environment, team, batch_export) data_interval_end=end.isoformat(), ) - run_id, records_total_count = await activity_environment.run(start_batch_export_run, inputs) + run_id = await activity_environment.run(start_batch_export_run, inputs) runs = BatchExportRun.objects.filter(id=run_id) run = await sync_to_async(runs.first)() # type:ignore assert run is not None assert run.status == "Starting" - assert run.records_total_count == records_total_count finish_inputs = FinishBatchExportRunInputs( id=str(run_id), @@ -141,7 +139,6 @@ async def test_finish_batch_export_run(activity_environment, team, batch_export) run = await sync_to_async(runs.first)() # type:ignore assert run is not None assert run.status == "Completed" - assert run.records_total_count == records_total_count @pytest.mark.django_db(transaction=True) @@ -162,7 +159,7 @@ async def test_finish_batch_export_run_pauses_if_reaching_failure_threshold(acti failure_threshold = 10 for run_number in range(1, failure_threshold * 2): - run_id, _ = await activity_environment.run(start_batch_export_run, inputs) + run_id = await activity_environment.run(start_batch_export_run, inputs) finish_inputs = FinishBatchExportRunInputs( id=str(run_id), @@ -199,7 +196,7 @@ async def test_finish_batch_export_run_never_pauses_with_small_check_window(acti batch_export_id = str(batch_export.id) failure_threshold = 1 - run_id, _ = await activity_environment.run(start_batch_export_run, inputs) + run_id = await activity_environment.run(start_batch_export_run, inputs) finish_inputs = FinishBatchExportRunInputs( id=str(run_id), diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py 
b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 5ef937ad7c962..77d9e6a5486bb 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -44,7 +44,6 @@ adelete_batch_export, afetch_batch_export_runs, ) - from posthog.temporal.tests.utils.s3 import read_parquet_from_s3, read_s3_data_as_json pytestmark = [pytest.mark.asyncio, pytest.mark.django_db] @@ -529,6 +528,70 @@ async def test_s3_export_workflow_with_minio_bucket( ) +@pytest.mark.parametrize("interval", ["hour"], indirect=True) +@pytest.mark.parametrize("compression", [None], indirect=True) +@pytest.mark.parametrize("exclude_events", [None], indirect=True) +@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) +async def test_s3_export_workflow_with_minio_bucket_without_events( + clickhouse_client, + minio_client, + ateam, + s3_batch_export, + bucket_name, + interval, + compression, + exclude_events, + s3_key_prefix, + batch_export_schema, +): + """Test S3BatchExport Workflow end-to-end without any events to export. + + The workflow should update the batch export run status to completed and set 0 as `records_completed`. + """ + data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") + + workflow_id = str(uuid4()) + inputs = S3BatchExportInputs( + team_id=ateam.pk, + batch_export_id=str(s3_batch_export.id), + data_interval_end=data_interval_end.isoformat(), + interval=interval, + batch_export_schema=batch_export_schema, + **s3_batch_export.destination.config, + ) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[S3BatchExportWorkflow], + activities=[ + start_batch_export_run, + insert_into_s3_activity, + finish_batch_export_run, + ], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + execution_timeout=dt.timedelta(minutes=10), + ) + + runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Completed" + assert run.records_completed == 0 + + objects = await minio_client.list_objects_v2(Bucket=bucket_name, Prefix=s3_key_prefix) + assert len(objects.get("Contents", [])) == 0 + + @pytest_asyncio.fixture async def s3_client(bucket_name, s3_key_prefix): """Manage an S3 client to interact with an S3 bucket. 
@@ -813,7 +876,6 @@ async def test_s3_export_workflow_defaults_to_timestamp_on_null_inserted_at( run = runs[0] assert run.status == "Completed" assert run.records_completed == 100 - assert run.records_total_count == 100 await assert_clickhouse_records_in_s3( s3_compatible_client=minio_client, @@ -899,7 +961,6 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( run = runs[0] assert run.status == "Completed" assert run.records_completed == 100 - assert run.records_total_count == 100 expected_key_prefix = s3_key_prefix.format( table="events", @@ -976,7 +1037,6 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval): diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 6652ac224b22a..857da4f3ce99f 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -275,7 +275,7 @@ def query_request_handler(request: PreparedRequest): "rowset": rowset, "total": 1, "returned": 1, - "queryId": "query-id", + "queryId": str(uuid4()), "queryResultFormat": "json", }, } @@ -463,7 +463,7 @@ async def test_snowflake_export_workflow_exports_events( @pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) -async def test_snowflake_export_workflow_without_events(ateam, snowflake_batch_export, interval): +async def test_snowflake_export_workflow_without_events(ateam, snowflake_batch_export, interval, truncate_events): workflow_id = str(uuid4()) inputs = SnowflakeBatchExportInputs( team_id=ateam.pk, @@ -704,7 +704,6 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str assert run.status == "FailedRetryable" assert run.latest_error == "ValueError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export): @@ -752,7 +751,6 @@ class ForbiddenError(Exception): assert run.status == "Failed" assert run.latest_error == "ForbiddenError: A useful error message" assert run.records_completed is None - assert run.records_total_count == 1 async def test_snowflake_export_workflow_handles_cancellation_mocked(ateam, snowflake_batch_export): diff --git a/posthog/temporal/tests/batch_exports/utils.py b/posthog/temporal/tests/batch_exports/utils.py index 7c7140983bc7f..2c48c26248dc9 100644 --- a/posthog/temporal/tests/batch_exports/utils.py +++ b/posthog/temporal/tests/batch_exports/utils.py @@ -9,7 +9,7 @@ @activity.defn(name="start_batch_export_run") -async def mocked_start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[str, int]: +async def mocked_start_batch_export_run(inputs: StartBatchExportRunInputs) -> str: """Create a run and return some count >0 to avoid early return.""" run = await sync_to_async(create_batch_export_run)( batch_export_id=uuid.UUID(inputs.batch_export_id), @@ -19,4 +19,4 @@ async def mocked_start_batch_export_run(inputs: StartBatchExportRunInputs) -> tu records_total_count=1, ) - return str(run.id), 1 + return str(run.id)
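For reference, a self-contained sketch of the empty-generator behaviour this patch gives `peek_first_and_rewind` (the final `return first, rewind_gen()` is outside the hunk shown above and is assumed from the helper's docstring). Returning `None` for an exhausted stream is what lets each `insert_into_*_activity` bail out with `0` records up front, replacing the removed up-front row count:

```
import collections.abc
import typing

T = typing.TypeVar("T")


def peek_first_and_rewind(
    gen: collections.abc.Generator[T, None, None],
) -> tuple[T | None, collections.abc.Generator[T, None, None]]:
    """Peek at the first element of 'gen' and return it with an equivalent generator.

    Returns (None, <empty generator>) when 'gen' is already exhausted.
    """
    try:
        first = next(gen)
    except StopIteration:
        first = None

    def rewind_gen() -> collections.abc.Generator[T, None, None]:
        # Re-yield the element we consumed, then the rest; yield nothing if empty.
        if first is None:
            return
        yield first
        yield from gen

    return first, rewind_gen()


# Empty stream: activities can return 0 records without touching the destination.
first_batch, batches = peek_first_and_rewind(batch for batch in [])
assert first_batch is None and list(batches) == []

# Non-empty stream: the rewound generator still yields everything, peeked element included.
first_batch, batches = peek_first_and_rewind(batch for batch in [1, 2, 3])
assert first_batch == 1 and list(batches) == [1, 2, 3]
```

The empty-stream guard mirrors the `if first_record_batch is None: return 0` checks added to the BigQuery, Postgres, Redshift, S3, and Snowflake insert activities above.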