Skip to content

Commit

Permalink
feat(batch-export-backfills): Wire up backfills to API
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Oct 13, 2023
1 parent 6880f05 commit 2faa175
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 29 deletions.
4 changes: 3 additions & 1 deletion posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,11 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon
if start_at >= end_at:
raise ValidationError("The initial backfill datetime 'start_at' happens after 'end_at'")

team_id = request.user.current_team.id

batch_export = self.get_object()
temporal = sync_connect()
backfill_export(temporal, str(batch_export.pk), start_at, end_at)
backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)

return response.Response()

Expand Down
48 changes: 36 additions & 12 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Client,
Schedule,
ScheduleActionStartWorkflow,
ScheduleBackfill,
ScheduleIntervalSpec,
ScheduleOverlapPolicy,
SchedulePolicy,
Expand Down Expand Up @@ -250,38 +249,63 @@ async def describe_schedule(temporal: Client, schedule_id: str):
return await handle.describe()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""

team_id: int
batch_export_id: str
start_at: str
end_at: str
buffer_limit: int = 1
wait_delay: float = 5.0


def backfill_export(
temporal: Client,
batch_export_id: str,
team_id: int,
start_at: dt.datetime,
end_at: dt.datetime,
overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.BUFFER_ALL,
):
"""Creates an export run for the given BatchExport, and specified time range.
) -> None:
"""Starts a backfill for given team and batch export covering given date range.
Arguments:
temporal: A Temporal Client to trigger the workflow.
batch_export_id: The id of the BatchExport to backfill.
team_id: The id of the Team the BatchExport belongs to.
start_at: From when to backfill.
end_at: Up to when to backfill.
"""
try:
BatchExport.objects.get(id=batch_export_id)
BatchExport.objects.get(id=batch_export_id, team_id=team_id)
except BatchExport.DoesNotExist:
raise BatchExportIdError(batch_export_id)

schedule_backfill = ScheduleBackfill(start_at=start_at, end_at=end_at, overlap=overlap)
backfill_schedule(temporal=temporal, schedule_id=batch_export_id, schedule_backfill=schedule_backfill)
inputs = BackfillBatchExportInputs(
batch_export_id=batch_export_id,
team_id=team_id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
)
start_backfill_batch_export_workflow(temporal, inputs=inputs)


@async_to_sync
async def backfill_schedule(temporal: Client, schedule_id: str, schedule_backfill: ScheduleBackfill):
"""Async call the Temporal client to execute a backfill on the given schedule."""
handle = temporal.get_schedule_handle(schedule_id)
async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None:
"""Async call to start a BackfillBatchExportWorkflow."""
handle = temporal.get_schedule_handle(inputs.batch_export_id)
description = await handle.describe()

if description.schedule.spec.jitter is not None:
schedule_backfill.end_at += description.schedule.spec.jitter
# Adjust end_at to account for jitter if present.
inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

await handle.backfill(schedule_backfill)
await temporal.start_workflow(
"backfill-batch-export",
inputs,
task_queue=settings.TEMPORAL_TASK_QUEUE,
)


def create_batch_export_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule
workflow_id = str(uuid.uuid4())
inputs = BackfillBatchExportInputs(
team_id=1,
schedule_id=desc.id,
batch_export_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=2,
Expand Down
8 changes: 8 additions & 0 deletions posthog/temporal/workflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from typing import Callable, Sequence

from posthog.temporal.workflows.backfill_batch_export import (
BackfillBatchExportWorkflow,
backfill_schedule,
get_schedule_frequency,
)
from posthog.temporal.workflows.batch_exports import (
create_export_run,
update_export_run_status,
Expand All @@ -24,6 +29,7 @@
from posthog.temporal.workflows.squash_person_overrides import *

WORKFLOWS = [
BackfillBatchExportWorkflow,
BigQueryBatchExportWorkflow,
NoOpWorkflow,
PostgresBatchExportWorkflow,
Expand All @@ -33,10 +39,12 @@
]

ACTIVITIES: Sequence[Callable] = [
backfill_schedule,
create_export_run,
delete_squashed_person_overrides_from_clickhouse,
delete_squashed_person_overrides_from_postgres,
drop_dictionary,
get_schedule_frequency,
insert_into_bigquery_activity,
insert_into_postgres_activity,
insert_into_s3_activity,
Expand Down
19 changes: 4 additions & 15 deletions posthog/temporal/workflows/backfill_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import temporalio.workflow
from django.conf import settings

from posthog.batch_exports.service import BackfillBatchExportInputs
from posthog.temporal.client import connect
from posthog.temporal.workflows.base import PostHogWorkflow
from posthog.temporal.workflows.batch_exports import (
Expand Down Expand Up @@ -254,18 +255,6 @@ def backfill_range(
current = current_end


@dataclasses.dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""

team_id: int
schedule_id: str
start_at: str
end_at: str
buffer_limit: int = 1
wait_delay: float = 5.0


@temporalio.workflow.defn(name="backfill-batch-export")
class BackfillBatchExportWorkflow(PostHogWorkflow):
"""A Temporal Workflow to manage a backfill of a batch export.
Expand All @@ -291,7 +280,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:

create_batch_export_backfill_inputs = CreateBatchExportBackfillInputs(
team_id=inputs.team_id,
batch_export_id=inputs.schedule_id,
batch_export_id=inputs.batch_export_id,
start_at=inputs.start_at,
end_at=inputs.end_at,
status="Running",
Expand All @@ -312,7 +301,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:

frequency_seconds = await temporalio.workflow.execute_activity(
get_schedule_frequency,
inputs.schedule_id,
inputs.batch_export_id,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=temporalio.common.RetryPolicy(maximum_attempts=0),
)
Expand All @@ -321,7 +310,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds)

backfill_schedule_inputs = BackfillScheduleInputs(
schedule_id=inputs.schedule_id,
schedule_id=inputs.batch_export_id,
start_at=inputs.start_at,
end_at=inputs.end_at,
frequency_seconds=frequency_seconds,
Expand Down

0 comments on commit 2faa175

Please sign in to comment.