diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index cef17ab628f32..3455366057ca9 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -5,6 +5,7 @@ import structlog from django.db import transaction from django.utils.timezone import now +from prometheus_client import Gauge from rest_framework import mixins, request, response, serializers, viewsets from rest_framework.decorators import action from rest_framework.exceptions import ( @@ -36,6 +37,7 @@ sync_batch_export, unpause_batch_export, ) +from posthog.metrics import LABEL_TEAM_ID from posthog.models import ( BatchExport, BatchExportBackfill, @@ -53,6 +55,14 @@ logger = structlog.get_logger(__name__) +LABEL_INTERVAL = "interval" + +BATCH_EXPORTS_LIVE_GAUGE = Gauge( + "batch_exports_live", + "Track batch exports that are live (i.e. unpaused) by teams by interval.", + labelnames=[LABEL_TEAM_ID, LABEL_INTERVAL], +) + def validate_date_input(date_input: Any) -> dt.datetime: """Parse any datetime input as a proper dt.datetime. @@ -206,6 +216,8 @@ def create(self, validated_data: dict) -> BatchExport: destination.save() batch_export.save() + BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc() + return batch_export def update(self, batch_export: BatchExport, validated_data: dict) -> BatchExport: @@ -296,6 +308,8 @@ def pause(self, request: request.Request, *args, **kwargs) -> response.Response: except BatchExportServiceError: raise + BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).dec() + return response.Response({"paused": True}) @action(methods=["POST"], detail=True) @@ -321,6 +335,8 @@ def unpause(self, request: request.Request, *args, **kwargs) -> response.Respons except BatchExportServiceError: raise + BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc() + return response.Response({"paused": False}) def perform_destroy(self, instance: BatchExport): @@ -346,6 +362,8 @@ def perform_destroy(self, instance: BatchExport): if backfill.status == BatchExportBackfill.Status.RUNNING: cancel_running_batch_export_backfill(temporal, backfill.workflow_id) + BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=instance.team.pk, interval=instance.interval).dec() + class BatchExportLogEntrySerializer(DataclassSerializer): class Meta: diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index d6696407c60e2..cb96ad225e3ba 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -291,7 +291,7 @@ async def test_bigquery_export_workflow( team_id=ateam.pk, start_time=data_interval_start, end_time=data_interval_end, - count=100, + count=100000, count_outside_range=10, count_other_team=10, duplicate=True, @@ -340,7 +340,7 @@ async def test_bigquery_export_workflow( id=workflow_id, task_queue=settings.TEMPORAL_TASK_QUEUE, retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=10), + execution_timeout=dt.timedelta(seconds=240), ) runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id)