Skip to content

Commit

Permalink
feat(batch-exports): Add backfill workflow (#17909)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Oct 19, 2023
1 parent 867887e commit a7ae3b0
Show file tree
Hide file tree
Showing 14 changed files with 955 additions and 36 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0354_organization_never_drop_data
posthog: 0355_add_batch_export_backfill_model
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
4 changes: 3 additions & 1 deletion posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,11 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon
if start_at >= end_at:
raise ValidationError("The initial backfill datetime 'start_at' happens after 'end_at'")

team_id = request.user.current_team.id

batch_export = self.get_object()
temporal = sync_connect()
backfill_export(temporal, str(batch_export.pk), start_at, end_at)
backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)

return response.Response()

Expand Down
35 changes: 35 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ class Destination(models.TextChoices):
SNOWFLAKE = "Snowflake"
POSTGRES = "Postgres"
BIGQUERY = "BigQuery"
NOOP = "NoOp"

secret_fields = {
"S3": {"aws_access_key_id", "aws_secret_access_key"},
"Snowflake": set("password"),
"Postgres": set("password"),
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
"NoOp": set(),
}

type: models.CharField = models.CharField(
Expand Down Expand Up @@ -225,3 +227,36 @@ def fetch_batch_export_log_entries(
return [
BatchExportLogEntry(*result) for result in typing.cast(list, sync_execute(clickhouse_query, clickhouse_kwargs))
]


class BatchExportBackfill(UUIDModel):
class Status(models.TextChoices):
"""Possible states of the BatchExportRun."""

CANCELLED = "Cancelled"
COMPLETED = "Completed"
CONTINUEDASNEW = "ContinuedAsNew"
FAILED = "Failed"
TERMINATED = "Terminated"
TIMEDOUT = "TimedOut"
RUNNING = "Running"
STARTING = "Starting"

team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE, help_text="The team this belongs to.")
batch_export = models.ForeignKey(
"BatchExport", on_delete=models.CASCADE, help_text="The BatchExport this backfill belongs to."
)
start_at: models.DateTimeField = models.DateTimeField(help_text="The start of the data interval.")
end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.")
status: models.CharField = models.CharField(
choices=Status.choices, max_length=64, help_text="The status of this backfill."
)
created_at: models.DateTimeField = models.DateTimeField(
auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created."
)
finished_at: models.DateTimeField = models.DateTimeField(
null=True, help_text="The timestamp at which this BatchExportBackfill finished, successfully or not."
)
last_updated_at: models.DateTimeField = models.DateTimeField(
auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated."
)
110 changes: 95 additions & 15 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Client,
Schedule,
ScheduleActionStartWorkflow,
ScheduleBackfill,
ScheduleIntervalSpec,
ScheduleOverlapPolicy,
SchedulePolicy,
Expand All @@ -21,6 +20,7 @@
from posthog import settings
from posthog.batch_exports.models import (
BatchExport,
BatchExportBackfill,
BatchExportRun,
)
from posthog.temporal.client import sync_connect
Expand Down Expand Up @@ -121,11 +121,22 @@ class BigQueryBatchExportInputs:
include_events: list[str] | None = None


@dataclass
class NoOpInputs:
"""NoOp Workflow is used for testing, it takes a single argument to echo back."""

batch_export_id: str
team_id: int
interval: str = "hour"
arg: str = ""


DESTINATION_WORKFLOWS = {
"S3": ("s3-export", S3BatchExportInputs),
"Snowflake": ("snowflake-export", SnowflakeBatchExportInputs),
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}


Expand Down Expand Up @@ -238,38 +249,64 @@ async def describe_schedule(temporal: Client, schedule_id: str):
return await handle.describe()


@dataclass
class BackfillBatchExportInputs:
"""Inputs for the BackfillBatchExport Workflow."""

team_id: int
batch_export_id: str
start_at: str
end_at: str
buffer_limit: int = 1
wait_delay: float = 5.0


def backfill_export(
temporal: Client,
batch_export_id: str,
team_id: int,
start_at: dt.datetime,
end_at: dt.datetime,
overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.BUFFER_ALL,
):
"""Creates an export run for the given BatchExport, and specified time range.
) -> None:
"""Starts a backfill for given team and batch export covering given date range.
Arguments:
temporal: A Temporal Client to trigger the workflow.
batch_export_id: The id of the BatchExport to backfill.
team_id: The id of the Team the BatchExport belongs to.
start_at: From when to backfill.
end_at: Up to when to backfill.
"""
try:
BatchExport.objects.get(id=batch_export_id)
BatchExport.objects.get(id=batch_export_id, team_id=team_id)
except BatchExport.DoesNotExist:
raise BatchExportIdError(batch_export_id)

schedule_backfill = ScheduleBackfill(start_at=start_at, end_at=end_at, overlap=overlap)
backfill_schedule(temporal=temporal, schedule_id=batch_export_id, schedule_backfill=schedule_backfill)
inputs = BackfillBatchExportInputs(
batch_export_id=batch_export_id,
team_id=team_id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
)
start_backfill_batch_export_workflow(temporal, inputs=inputs)


@async_to_sync
async def backfill_schedule(temporal: Client, schedule_id: str, schedule_backfill: ScheduleBackfill):
"""Async call the Temporal client to execute a backfill on the given schedule."""
handle = temporal.get_schedule_handle(schedule_id)
async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None:
"""Async call to start a BackfillBatchExportWorkflow."""
handle = temporal.get_schedule_handle(inputs.batch_export_id)
description = await handle.describe()

if description.schedule.spec.jitter is not None:
schedule_backfill.end_at += description.schedule.spec.jitter

await handle.backfill(schedule_backfill)
# Adjust end_at to account for jitter if present.
inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

await temporal.start_workflow(
"backfill-batch-export",
inputs,
id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}",
task_queue=settings.TEMPORAL_TASK_QUEUE,
)


def create_batch_export_run(
Expand All @@ -284,8 +321,10 @@ def create_batch_export_run(
as only the Workflows themselves can know when they start.
Args:
data_interval_start:
data_interval_end:
batch_export_id: The UUID of the BatchExport the BatchExportRun to create belongs to.
data_interval_start: The start of the period of data exported in this BatchExportRun.
data_interval_end: The end of the period of data exported in this BatchExportRun.
status: The initial status for the created BatchExportRun.
"""
run = BatchExportRun(
batch_export_id=batch_export_id,
Expand Down Expand Up @@ -372,3 +411,44 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:
return await handle.update(
updater=updater,
)


def create_batch_export_backfill(
batch_export_id: UUID,
team_id: int,
start_at: str,
end_at: str,
status: str = BatchExportRun.Status.RUNNING,
):
"""Create a BatchExportBackfill.
Args:
batch_export_id: The UUID of the BatchExport the BatchExportBackfill to create belongs to.
team_id: The id of the Team the BatchExportBackfill to create belongs to.
start_at: The start of the period to backfill in this BatchExportBackfill.
end_at: The end of the period to backfill in this BatchExportBackfill.
status: The initial status for the created BatchExportBackfill.
"""
backfill = BatchExportBackfill(
batch_export_id=batch_export_id,
status=status,
start_at=dt.datetime.fromisoformat(start_at),
end_at=dt.datetime.fromisoformat(end_at),
team_id=team_id,
)
backfill.save()

return backfill


def update_batch_export_backfill_status(backfill_id: UUID, status: str):
"""Update the status of an BatchExportBackfill with given id.
Arguments:
id: The id of the BatchExportBackfill to update.
status: The new status to assign to the BatchExportBackfill.
"""
updated = BatchExportBackfill.objects.filter(id=backfill_id).update(status=status)
if not updated:
raise ValueError(f"BatchExportBackfill with id {backfill_id} not found.")
4 changes: 3 additions & 1 deletion posthog/management/commands/create_batch_export_from_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ def handle(self, *args, **options):
client = sync_connect()
end_at = dt.datetime.utcnow()
start_at = end_at - (dt.timedelta(hours=1) if interval == "hour" else dt.timedelta(days=1))
backfill_export(client, batch_export_id=str(batch_export.id), start_at=start_at, end_at=end_at)
backfill_export(
client, batch_export_id=str(batch_export.id), team_id=team_id, start_at=start_at, end_at=end_at
)
self.stdout.write(f"Triggered backfill for BatchExport '{name}'.")

self.stdout.write("Done!")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ async def list_workflows(temporal, schedule_id: str):
return workflows


@pytest.mark.django_db
@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize(
"interval,plugin_config",
[
Expand Down
98 changes: 98 additions & 0 deletions posthog/migrations/0355_add_batch_export_backfill_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Generated by Django 3.2.19 on 2023-10-13 09:13

import django.db.models.deletion
from django.db import migrations, models

import posthog.models.utils


class Migration(migrations.Migration):
dependencies = [
("posthog", "0354_organization_never_drop_data"),
]

operations = [
migrations.AlterField(
model_name="batchexportdestination",
name="type",
field=models.CharField(
choices=[
("S3", "S3"),
("Snowflake", "Snowflake"),
("Postgres", "Postgres"),
("BigQuery", "Bigquery"),
("NoOp", "Noop"),
],
help_text="A choice of supported BatchExportDestination types.",
max_length=64,
),
),
migrations.CreateModel(
name="BatchExportBackfill",
fields=[
(
"id",
models.UUIDField(
default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
),
),
("start_at", models.DateTimeField(help_text="The start of the data interval.")),
("end_at", models.DateTimeField(help_text="The end of the data interval.")),
(
"status",
models.CharField(
choices=[
("Cancelled", "Cancelled"),
("Completed", "Completed"),
("ContinuedAsNew", "Continuedasnew"),
("Failed", "Failed"),
("Terminated", "Terminated"),
("TimedOut", "Timedout"),
("Running", "Running"),
("Starting", "Starting"),
],
help_text="The status of this backfill.",
max_length=64,
),
),
(
"created_at",
models.DateTimeField(
auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created."
),
),
(
"finished_at",
models.DateTimeField(
help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.",
null=True,
),
),
(
"last_updated_at",
models.DateTimeField(
auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated."
),
),
(
"batch_export",
models.ForeignKey(
help_text="The BatchExport this backfill belongs to.",
on_delete=django.db.models.deletion.CASCADE,
to="posthog.batchexport",
),
),
(
"team",
models.ForeignKey(
help_text="The team this belongs to.",
on_delete=django.db.models.deletion.CASCADE,
to="posthog.team",
),
),
],
options={
"abstract": False,
},
),
]
12 changes: 12 additions & 0 deletions posthog/temporal/tests/batch_exports/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import datetime as dt
import json
import typing
import uuid

from asgiref.sync import sync_to_async

from ee.clickhouse.materialized_columns.columns import materialize
from posthog.batch_exports.models import BatchExportBackfill
from posthog.temporal.workflows.clickhouse import ClickHouseClient


Expand Down Expand Up @@ -79,3 +81,13 @@ def to_isoformat(d: str | None) -> str | None:
if d is None:
return None
return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()


def fetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
"""Fetch the BatchExportBackfills for a given BatchExport."""
return list(BatchExportBackfill.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit])


async def afetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
"""Fetch the BatchExportBackfills for a given BatchExport."""
return await sync_to_async(fetch_batch_export_backfills)(batch_export_id, limit) # type: ignore
Loading

0 comments on commit a7ae3b0

Please sign in to comment.