diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 6a2ecb90d3704..b003578a575ce 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -486,7 +486,6 @@ async def create_export_run(inputs: CreateBatchExportRunInputs) -> str: Intended to be used in all export workflows, usually at the start, to create a model instance to represent them in our database. """ - get_export_started_metric().add(1) logger = await bind_batch_exports_logger(team_id=inputs.team_id) logger.info( "Creating batch export for range %s - %s", @@ -519,8 +518,6 @@ class UpdateBatchExportRunStatusInputs: @activity.defn async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs): """Activity that updates the status of an BatchExportRun.""" - get_export_finished_metric(status=inputs.status.lower()).add(1) - logger = await bind_batch_exports_logger(team_id=inputs.team_id) batch_export_run = await sync_to_async(update_batch_export_run_status)( @@ -637,6 +634,7 @@ async def execute_batch_export_insert_activity( initial_retry_interval_seconds: When retrying, seconds until the first retry. maximum_retry_interval_seconds: Maximum interval in seconds between retries. """ + get_export_started_metric().add(1) retry_policy = RetryPolicy( initial_interval=dt.timedelta(seconds=initial_retry_interval_seconds), maximum_interval=dt.timedelta(seconds=maximum_retry_interval_seconds), @@ -667,6 +665,7 @@ async def execute_batch_export_insert_activity( raise finally: + get_export_finished_metric(status=update_inputs.status.lower()).add(1) await workflow.execute_activity( update_export_run_status, update_inputs, diff --git a/posthog/temporal/workflows/metrics.py b/posthog/temporal/workflows/metrics.py index 370690f328de5..8de87df38aad4 100644 --- a/posthog/temporal/workflows/metrics.py +++ b/posthog/temporal/workflows/metrics.py @@ -1,4 +1,4 @@ -from temporalio import activity +from temporalio import activity, workflow from temporalio.common import MetricCounter @@ -11,12 +11,12 @@ def get_bytes_exported_metric() -> MetricCounter: def get_export_started_metric() -> MetricCounter: - return activity.metric_meter().create_counter("batch_export_started", "Number of batch exports started.") + return workflow.metric_meter().create_counter("batch_export_started", "Number of batch exports started.") def get_export_finished_metric(status: str) -> MetricCounter: return ( - activity.metric_meter() + workflow.metric_meter() .with_additional_attributes({"status": status}) .create_counter( "batch_export_finished", "Number of batch exports finished, for any reason (including failure)."