Skip to content

Commit

Permalink
chore: Track live batch exports
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 22, 2023
1 parent d78450e commit 48df2b9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
18 changes: 18 additions & 0 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import structlog
from django.db import transaction
from django.utils.timezone import now
from prometheus_client import Gauge
from rest_framework import mixins, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import (
Expand Down Expand Up @@ -36,6 +37,7 @@
sync_batch_export,
unpause_batch_export,
)
from posthog.metrics import LABEL_TEAM_ID
from posthog.models import (
BatchExport,
BatchExportBackfill,
Expand All @@ -53,6 +55,14 @@

logger = structlog.get_logger(__name__)

LABEL_INTERVAL = "interval"

BATCH_EXPORTS_LIVE_GAUGE = Gauge(
"batch_exports_live",
"Track batch exports that are live (i.e. unpaused) by teams by interval.",
labelnames=[LABEL_TEAM_ID, LABEL_INTERVAL],
)


def validate_date_input(date_input: Any) -> dt.datetime:
"""Parse any datetime input as a proper dt.datetime.
Expand Down Expand Up @@ -206,6 +216,8 @@ def create(self, validated_data: dict) -> BatchExport:
destination.save()
batch_export.save()

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc()

return batch_export

def update(self, batch_export: BatchExport, validated_data: dict) -> BatchExport:
Expand Down Expand Up @@ -296,6 +308,8 @@ def pause(self, request: request.Request, *args, **kwargs) -> response.Response:
except BatchExportServiceError:
raise

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).dec()

return response.Response({"paused": True})

@action(methods=["POST"], detail=True)
Expand All @@ -321,6 +335,8 @@ def unpause(self, request: request.Request, *args, **kwargs) -> response.Respons
except BatchExportServiceError:
raise

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=team_id, interval=batch_export.interval).inc()

return response.Response({"paused": False})

def perform_destroy(self, instance: BatchExport):
Expand All @@ -346,6 +362,8 @@ def perform_destroy(self, instance: BatchExport):
if backfill.status == BatchExportBackfill.Status.RUNNING:
cancel_running_batch_export_backfill(temporal, backfill.workflow_id)

BATCH_EXPORTS_LIVE_GAUGE.labels(team_id=instance.team.pk, interval=instance.interval).dec()


class BatchExportLogEntrySerializer(DataclassSerializer):
class Meta:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ async def test_bigquery_export_workflow(
team_id=ateam.pk,
start_time=data_interval_start,
end_time=data_interval_end,
count=100,
count=100000,
count_outside_range=10,
count_other_team=10,
duplicate=True,
Expand Down Expand Up @@ -340,7 +340,7 @@ async def test_bigquery_export_workflow(
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
execution_timeout=dt.timedelta(seconds=10),
execution_timeout=dt.timedelta(seconds=240),
)

runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id)
Expand Down

0 comments on commit 48df2b9

Please sign in to comment.