Skip to content

Commit

Permalink
chore: Track live batch exports
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 22, 2023
1 parent d78450e commit 6eafac4
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import structlog
from django.db import transaction
from django.utils.timezone import now
from prometheus_client import Gauge
from rest_framework import mixins, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import (
Expand Down Expand Up @@ -36,6 +37,7 @@
sync_batch_export,
unpause_batch_export,
)
from posthog.metrics import LABEL_TEAM_ID
from posthog.models import (
BatchExport,
BatchExportBackfill,
Expand All @@ -53,6 +55,14 @@

logger = structlog.get_logger(__name__)

LABEL_INTERVAL = "interval"

BATCH_EXPORTS_LIVE_GAUGE = Gauge(
"batch_exports_unpaused",
"Track batch exports that are live (i.e. unpaused) by teams by interval.",
labelnames=[LABEL_TEAM_ID, LABEL_INTERVAL],
)


def validate_date_input(date_input: Any) -> dt.datetime:
"""Parse any datetime input as a proper dt.datetime.
Expand Down Expand Up @@ -206,6 +216,8 @@ def create(self, validated_data: dict) -> BatchExport:
destination.save()
batch_export.save()

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc()

return batch_export

def update(self, batch_export: BatchExport, validated_data: dict) -> BatchExport:
Expand Down Expand Up @@ -296,6 +308,8 @@ def pause(self, request: request.Request, *args, **kwargs) -> response.Response:
except BatchExportServiceError:
raise

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).dec()

return response.Response({"paused": True})

@action(methods=["POST"], detail=True)
Expand All @@ -321,6 +335,8 @@ def unpause(self, request: request.Request, *args, **kwargs) -> response.Respons
except BatchExportServiceError:
raise

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc()

return response.Response({"paused": False})

def perform_destroy(self, instance: BatchExport):
Expand All @@ -346,6 +362,8 @@ def perform_destroy(self, instance: BatchExport):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, backfill.workflow_id)

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=instance.team.pk, interval=instance.interval).dec()


class BatchExportLogEntrySerializer(DataclassSerializer):
class Meta:
Expand Down

0 comments on commit 6eafac4

Please sign in to comment.