diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index cd8e24aca5cd64..06fb9866ac0e9c 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -248,9 +248,11 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon if start_at >= end_at: raise ValidationError("The initial backfill datetime 'start_at' happens after 'end_at'") + team_id = request.user.current_team.id + batch_export = self.get_object() temporal = sync_connect() - backfill_export(temporal, str(batch_export.pk), start_at, end_at) + backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) return response.Response() diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 0ad0ae31a843e3..997ca0fbf61501 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -8,7 +8,6 @@ Client, Schedule, ScheduleActionStartWorkflow, - ScheduleBackfill, ScheduleIntervalSpec, ScheduleOverlapPolicy, SchedulePolicy, @@ -250,38 +249,63 @@ async def describe_schedule(temporal: Client, schedule_id: str): return await handle.describe() +@dataclass +class BackfillBatchExportInputs: + """Inputs for the BackfillBatchExport Workflow.""" + + team_id: int + batch_export_id: str + start_at: str + end_at: str + buffer_limit: int = 1 + wait_delay: float = 5.0 + + def backfill_export( temporal: Client, batch_export_id: str, + team_id: int, start_at: dt.datetime, end_at: dt.datetime, - overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.BUFFER_ALL, -): - """Creates an export run for the given BatchExport, and specified time range. +) -> None: + """Starts a backfill for given team and batch export covering given date range. Arguments: + temporal: A Temporal Client to trigger the workflow. + batch_export_id: The id of the BatchExport to backfill. + team_id: The id of the Team the BatchExport belongs to. start_at: From when to backfill. end_at: Up to when to backfill. """ try: - BatchExport.objects.get(id=batch_export_id) + BatchExport.objects.get(id=batch_export_id, team_id=team_id) except BatchExport.DoesNotExist: raise BatchExportIdError(batch_export_id) - schedule_backfill = ScheduleBackfill(start_at=start_at, end_at=end_at, overlap=overlap) - backfill_schedule(temporal=temporal, schedule_id=batch_export_id, schedule_backfill=schedule_backfill) + inputs = BackfillBatchExportInputs( + batch_export_id=batch_export_id, + team_id=team_id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + ) + start_backfill_batch_export_workflow(temporal, inputs=inputs) @async_to_sync -async def backfill_schedule(temporal: Client, schedule_id: str, schedule_backfill: ScheduleBackfill): - """Async call the Temporal client to execute a backfill on the given schedule.""" - handle = temporal.get_schedule_handle(schedule_id) +async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None: + """Async call to start a BackfillBatchExportWorkflow.""" + handle = temporal.get_schedule_handle(inputs.batch_export_id) description = await handle.describe() if description.schedule.spec.jitter is not None: - schedule_backfill.end_at += description.schedule.spec.jitter + # Adjust end_at to account for jitter if present. + inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat() - await handle.backfill(schedule_backfill) + await temporal.start_workflow( + "backfill-batch-export", + inputs, + task_queue=settings.TEMPORAL_TASK_QUEUE, + ) def create_batch_export_run( diff --git a/posthog/temporal/workflows/__init__.py b/posthog/temporal/workflows/__init__.py index 2e4224caf758be..18f63e3b14d94e 100644 --- a/posthog/temporal/workflows/__init__.py +++ b/posthog/temporal/workflows/__init__.py @@ -1,5 +1,10 @@ from typing import Callable, Sequence +from posthog.temporal.workflows.backfill_batch_export import ( + BackfillBatchExportWorkflow, + backfill_schedule, + get_schedule_frequency, +) from posthog.temporal.workflows.batch_exports import ( create_export_run, update_export_run_status, @@ -24,6 +29,7 @@ from posthog.temporal.workflows.squash_person_overrides import * WORKFLOWS = [ + BackfillBatchExportWorkflow, BigQueryBatchExportWorkflow, NoOpWorkflow, PostgresBatchExportWorkflow, @@ -33,10 +39,12 @@ ] ACTIVITIES: Sequence[Callable] = [ + backfill_schedule, create_export_run, delete_squashed_person_overrides_from_clickhouse, delete_squashed_person_overrides_from_postgres, drop_dictionary, + get_schedule_frequency, insert_into_bigquery_activity, insert_into_postgres_activity, insert_into_s3_activity, diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 3eab4116c4eea9..d18393b5d795a5 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -13,6 +13,7 @@ import temporalio.workflow from django.conf import settings +from posthog.batch_exports.service import BackfillBatchExportInputs from posthog.temporal.client import connect from posthog.temporal.workflows.base import PostHogWorkflow from posthog.temporal.workflows.batch_exports import ( @@ -254,18 +255,6 @@ def backfill_range( current = current_end -@dataclasses.dataclass -class BackfillBatchExportInputs: - """Inputs for the BackfillBatchExport Workflow.""" - - team_id: int - schedule_id: str - start_at: str - end_at: str - buffer_limit: int = 1 - wait_delay: float = 5.0 - - @temporalio.workflow.defn(name="backfill-batch-export") class BackfillBatchExportWorkflow(PostHogWorkflow): """A Temporal Workflow to manage a backfill of a batch export. @@ -291,7 +280,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: create_batch_export_backfill_inputs = CreateBatchExportBackfillInputs( team_id=inputs.team_id, - batch_export_id=inputs.schedule_id, + batch_export_id=inputs.batch_export_id, start_at=inputs.start_at, end_at=inputs.end_at, status="Running", @@ -312,7 +301,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: frequency_seconds = await temporalio.workflow.execute_activity( get_schedule_frequency, - inputs.schedule_id, + inputs.batch_export_id, start_to_close_timeout=dt.timedelta(minutes=1), retry_policy=temporalio.common.RetryPolicy(maximum_attempts=0), ) @@ -321,7 +310,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds) backfill_schedule_inputs = BackfillScheduleInputs( - schedule_id=inputs.schedule_id, + schedule_id=inputs.batch_export_id, start_at=inputs.start_at, end_at=inputs.end_at, frequency_seconds=frequency_seconds,