feat(batch-exports): Add backfill workflow #17909

Merged · 15 commits · Oct 19, 2023 · Showing changes from all commits
2 changes: 1 addition & 1 deletion latest_migrations.manifest
@@ -5,7 +5,7 @@
contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
-posthog: 0354_organization_never_drop_data
+posthog: 0355_add_batch_export_backfill_model
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
4 changes: 3 additions & 1 deletion posthog/batch_exports/http.py
@@ -248,9 +248,11 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Response:
        if start_at >= end_at:
            raise ValidationError("The initial backfill datetime 'start_at' happens after 'end_at'")

+        team_id = request.user.current_team.id
+
        batch_export = self.get_object()
        temporal = sync_connect()
-        backfill_export(temporal, str(batch_export.pk), start_at, end_at)
+        backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at)

        return response.Response()

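For context, a minimal sketch of a client hitting this endpoint (the route shape and auth header follow PostHog's usual REST conventions and are assumptions here, not taken from this diff; all IDs and the key are placeholders):

import requests

# Hypothetical values; only the start_at/end_at payload shape comes from the diff above.
host = "https://app.posthog.com"
project_id = 1
batch_export_id = "00000000-0000-0000-0000-000000000000"
response = requests.post(
    f"{host}/api/projects/{project_id}/batch_exports/{batch_export_id}/backfill",
    headers={"Authorization": "Bearer <personal-api-key>"},
    json={
        "start_at": "2023-10-01T00:00:00+00:00",
        "end_at": "2023-10-02T00:00:00+00:00",
    },
)
# A 400 response means start_at >= end_at, per the validation above.
response.raise_for_status()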
35 changes: 35 additions & 0 deletions posthog/batch_exports/models.py
@@ -27,12 +27,14 @@ class Destination(models.TextChoices):
        SNOWFLAKE = "Snowflake"
        POSTGRES = "Postgres"
        BIGQUERY = "BigQuery"
+        NOOP = "NoOp"

    secret_fields = {
        "S3": {"aws_access_key_id", "aws_secret_access_key"},
        "Snowflake": {"password"},
        "Postgres": {"password"},
        "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
+        "NoOp": set(),
    }

    type: models.CharField = models.CharField(
@@ -225,3 +227,36 @@ def fetch_batch_export_log_entries(
    return [
        BatchExportLogEntry(*result) for result in typing.cast(list, sync_execute(clickhouse_query, clickhouse_kwargs))
    ]


class BatchExportBackfill(UUIDModel):
    class Status(models.TextChoices):
        """Possible states of the BatchExportBackfill."""

        CANCELLED = "Cancelled"
        COMPLETED = "Completed"
        CONTINUEDASNEW = "ContinuedAsNew"
        FAILED = "Failed"
        TERMINATED = "Terminated"
        TIMEDOUT = "TimedOut"
        RUNNING = "Running"
        STARTING = "Starting"

    team: models.ForeignKey = models.ForeignKey("Team", on_delete=models.CASCADE, help_text="The team this belongs to.")
    batch_export = models.ForeignKey(
        "BatchExport", on_delete=models.CASCADE, help_text="The BatchExport this backfill belongs to."
    )
    start_at: models.DateTimeField = models.DateTimeField(help_text="The start of the data interval.")
    end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.")
    status: models.CharField = models.CharField(
        choices=Status.choices, max_length=64, help_text="The status of this backfill."
    )
    created_at: models.DateTimeField = models.DateTimeField(
        auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created."
    )
    finished_at: models.DateTimeField = models.DateTimeField(
        null=True, help_text="The timestamp at which this BatchExportBackfill finished, successfully or not."
    )
    last_updated_at: models.DateTimeField = models.DateTimeField(
        auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated."
    )
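To make the model's lifecycle concrete, here is a sketch using the service helpers added later in this PR (the call sites are an illustration only; in the PR the real callers live in the Temporal workflow, which is not part of this diff, and `batch_export` and `team` are assumed objects):

from posthog.batch_exports.models import BatchExportBackfill
from posthog.batch_exports.service import (
    create_batch_export_backfill,
    update_batch_export_backfill_status,
)

# Illustrative lifecycle: record a row when the backfill starts (default
# status is Running), then flip its status when it finishes.
backfill = create_batch_export_backfill(
    batch_export_id=batch_export.id,
    team_id=team.id,
    start_at="2023-10-01T00:00:00+00:00",
    end_at="2023-10-02T00:00:00+00:00",
)
# ... per-interval export runs happen here ...
update_batch_export_backfill_status(backfill.id, BatchExportBackfill.Status.COMPLETED)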
110 changes: 95 additions & 15 deletions posthog/batch_exports/service.py
@@ -8,7 +8,6 @@
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
-    ScheduleBackfill,
    ScheduleIntervalSpec,
    ScheduleOverlapPolicy,
    SchedulePolicy,
@@ -21,6 +20,7 @@
from posthog import settings
from posthog.batch_exports.models import (
    BatchExport,
    BatchExportBackfill,
    BatchExportRun,
)
from posthog.temporal.client import sync_connect
@@ -121,11 +121,22 @@ class BigQueryBatchExportInputs:
    include_events: list[str] | None = None


@dataclass
class NoOpInputs:
    """NoOp Workflow is used for testing; it takes a single argument to echo back."""

    batch_export_id: str
    team_id: int
    interval: str = "hour"
    arg: str = ""


DESTINATION_WORKFLOWS = {
    "S3": ("s3-export", S3BatchExportInputs),
    "Snowflake": ("snowflake-export", SnowflakeBatchExportInputs),
    "Postgres": ("postgres-export", PostgresBatchExportInputs),
    "BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
    "NoOp": ("no-op", NoOpInputs),
}
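DESTINATION_WORKFLOWS pairs each destination type with its Temporal workflow name and inputs dataclass. A minimal sketch of the lookup this mapping enables (the resolution pattern and placeholder values are assumptions; only the mapping itself comes from this file):

# Illustrative lookup: resolve the workflow name and inputs class for a
# destination type, then build the inputs to hand to Temporal.
workflow_name, inputs_cls = DESTINATION_WORKFLOWS["NoOp"]
assert workflow_name == "no-op"
inputs = inputs_cls(batch_export_id="00000000-0000-0000-0000-000000000000", team_id=1)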


@@ -238,38 +249,64 @@ async def describe_schedule(temporal: Client, schedule_id: str):
    return await handle.describe()


@dataclass
class BackfillBatchExportInputs:
    """Inputs for the BackfillBatchExport Workflow."""

    team_id: int
    batch_export_id: str
    start_at: str
    end_at: str
    buffer_limit: int = 1
    wait_delay: float = 5.0


def backfill_export(
    temporal: Client,
    batch_export_id: str,
+    team_id: int,
    start_at: dt.datetime,
    end_at: dt.datetime,
-    overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.BUFFER_ALL,
-):
-    """Creates an export run for the given BatchExport, and specified time range.
+) -> None:
+    """Start a backfill for the given team and batch export covering the given date range.

    Arguments:
        temporal: A Temporal Client to trigger the workflow.
        batch_export_id: The id of the BatchExport to backfill.
+        team_id: The id of the Team the BatchExport belongs to.
        start_at: From when to backfill.
        end_at: Up to when to backfill.
    """
    try:
-        BatchExport.objects.get(id=batch_export_id)
+        BatchExport.objects.get(id=batch_export_id, team_id=team_id)
    except BatchExport.DoesNotExist:
        raise BatchExportIdError(batch_export_id)

-    schedule_backfill = ScheduleBackfill(start_at=start_at, end_at=end_at, overlap=overlap)
-    backfill_schedule(temporal=temporal, schedule_id=batch_export_id, schedule_backfill=schedule_backfill)
+    inputs = BackfillBatchExportInputs(
+        batch_export_id=batch_export_id,
+        team_id=team_id,
+        start_at=start_at.isoformat(),
+        end_at=end_at.isoformat(),
+    )
+    start_backfill_batch_export_workflow(temporal, inputs=inputs)


@async_to_sync
-async def backfill_schedule(temporal: Client, schedule_id: str, schedule_backfill: ScheduleBackfill):
-    """Async call the Temporal client to execute a backfill on the given schedule."""
-    handle = temporal.get_schedule_handle(schedule_id)
+async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None:
+    """Async call to start a BackfillBatchExportWorkflow."""
+    handle = temporal.get_schedule_handle(inputs.batch_export_id)
    description = await handle.describe()

    if description.schedule.spec.jitter is not None:
-        schedule_backfill.end_at += description.schedule.spec.jitter
+        # Adjust end_at to account for jitter if present.
+        inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat()

-    await handle.backfill(schedule_backfill)
+    await temporal.start_workflow(
+        "backfill-batch-export",
+        inputs,
+        id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}",
+        task_queue=settings.TEMPORAL_TASK_QUEUE,
+    )
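Two details in the new code are worth noting. The workflow ID embeds the batch export ID and the date range, so Temporal should reject a second start of the identical backfill while one is already running, rather than duplicating work. And end_at is widened by the schedule's jitter so the final scheduled run cannot fall just outside the backfill window. A toy illustration of that widening (the 15-minute jitter is a made-up value):

import datetime as dt

# Same arithmetic as the jitter adjustment above, with concrete numbers.
end_at = dt.datetime.fromisoformat("2023-10-02T00:00:00+00:00")
jitter = dt.timedelta(minutes=15)
print((end_at + jitter).isoformat())  # 2023-10-02T00:15:00+00:00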


@@ -284,8 +321,10 @@ def create_batch_export_run(
    as only the Workflows themselves can know when they start.

    Args:
-        data_interval_start:
-        data_interval_end:
+        batch_export_id: The UUID of the BatchExport the BatchExportRun to create belongs to.
+        data_interval_start: The start of the period of data exported in this BatchExportRun.
+        data_interval_end: The end of the period of data exported in this BatchExportRun.
+        status: The initial status for the created BatchExportRun.
    """
    run = BatchExportRun(
        batch_export_id=batch_export_id,
@@ -372,3 +411,44 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:
    return await handle.update(
        updater=updater,
    )


def create_batch_export_backfill(
    batch_export_id: UUID,
    team_id: int,
    start_at: str,
    end_at: str,
    status: str = BatchExportBackfill.Status.RUNNING,
):
    """Create a BatchExportBackfill.

    Args:
        batch_export_id: The UUID of the BatchExport the BatchExportBackfill to create belongs to.
        team_id: The id of the Team the BatchExportBackfill to create belongs to.
        start_at: The start of the period to backfill in this BatchExportBackfill.
        end_at: The end of the period to backfill in this BatchExportBackfill.
        status: The initial status for the created BatchExportBackfill.
    """
    backfill = BatchExportBackfill(
        batch_export_id=batch_export_id,
        status=status,
        start_at=dt.datetime.fromisoformat(start_at),
        end_at=dt.datetime.fromisoformat(end_at),
        team_id=team_id,
    )
    backfill.save()

    return backfill


def update_batch_export_backfill_status(backfill_id: UUID, status: str):
    """Update the status of a BatchExportBackfill with the given id.

    Arguments:
        backfill_id: The id of the BatchExportBackfill to update.
        status: The new status to assign to the BatchExportBackfill.
    """
    updated = BatchExportBackfill.objects.filter(id=backfill_id).update(status=status)
    if not updated:
        raise ValueError(f"BatchExportBackfill with id {backfill_id} not found.")
4 changes: 3 additions & 1 deletion posthog/management/commands/create_batch_export_from_app.py
@@ -115,7 +115,9 @@ def handle(self, *args, **options):
            client = sync_connect()
            end_at = dt.datetime.utcnow()
            start_at = end_at - (dt.timedelta(hours=1) if interval == "hour" else dt.timedelta(days=1))
-            backfill_export(client, batch_export_id=str(batch_export.id), start_at=start_at, end_at=end_at)
+            backfill_export(
+                client, batch_export_id=str(batch_export.id), team_id=team_id, start_at=start_at, end_at=end_at
+            )
            self.stdout.write(f"Triggered backfill for BatchExport '{name}'.")

        self.stdout.write("Done!")
@@ -321,7 +321,7 @@ async def list_workflows(temporal, schedule_id: str):
    return workflows


-@pytest.mark.django_db
+@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize(
    "interval,plugin_config",
    [
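The switch to transaction=True is worth a note: it makes pytest-django commit test data instead of holding it in one open transaction, presumably so that Temporal activities running on separate database connections can see the test's rows; that rationale is an inference, not stated in the PR. A sketch of the marker in isolation:

import pytest

# With transaction=True, writes are committed and visible to other database
# connections (the rationale here is an inference on the editor's part).
@pytest.mark.django_db(transaction=True)
def test_backfill_sketch():
    ...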
98 changes: 98 additions & 0 deletions posthog/migrations/0355_add_batch_export_backfill_model.py
@@ -0,0 +1,98 @@
# Generated by Django 3.2.19 on 2023-10-13 09:13

import django.db.models.deletion
from django.db import migrations, models

import posthog.models.utils


class Migration(migrations.Migration):
    dependencies = [
        ("posthog", "0354_organization_never_drop_data"),
    ]

    operations = [
        migrations.AlterField(
            model_name="batchexportdestination",
            name="type",
            field=models.CharField(
                choices=[
                    ("S3", "S3"),
                    ("Snowflake", "Snowflake"),
                    ("Postgres", "Postgres"),
                    ("BigQuery", "Bigquery"),
                    ("NoOp", "Noop"),
                ],
                help_text="A choice of supported BatchExportDestination types.",
                max_length=64,
            ),
        ),
        migrations.CreateModel(
            name="BatchExportBackfill",
            fields=[
                (
                    "id",
                    models.UUIDField(
                        default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False
                    ),
                ),
                ("start_at", models.DateTimeField(help_text="The start of the data interval.")),
                ("end_at", models.DateTimeField(help_text="The end of the data interval.")),
                (
                    "status",
                    models.CharField(
                        choices=[
                            ("Cancelled", "Cancelled"),
                            ("Completed", "Completed"),
                            ("ContinuedAsNew", "Continuedasnew"),
                            ("Failed", "Failed"),
                            ("Terminated", "Terminated"),
                            ("TimedOut", "Timedout"),
                            ("Running", "Running"),
                            ("Starting", "Starting"),
                        ],
                        help_text="The status of this backfill.",
                        max_length=64,
                    ),
                ),
                (
                    "created_at",
                    models.DateTimeField(
                        auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created."
                    ),
                ),
                (
                    "finished_at",
                    models.DateTimeField(
                        help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.",
                        null=True,
                    ),
                ),
                (
                    "last_updated_at",
                    models.DateTimeField(
                        auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated."
                    ),
                ),
                (
                    "batch_export",
                    models.ForeignKey(
                        help_text="The BatchExport this backfill belongs to.",
                        on_delete=django.db.models.deletion.CASCADE,
                        to="posthog.batchexport",
                    ),
                ),
                (
                    "team",
                    models.ForeignKey(
                        help_text="The team this belongs to.",
                        on_delete=django.db.models.deletion.CASCADE,
                        to="posthog.team",
                    ),
                ),
            ],
            options={
                "abstract": False,
            },
        ),
    ]
12 changes: 12 additions & 0 deletions posthog/temporal/tests/batch_exports/base.py
@@ -1,10 +1,12 @@
import datetime as dt
import json
import typing
import uuid

from asgiref.sync import sync_to_async

from ee.clickhouse.materialized_columns.columns import materialize
from posthog.batch_exports.models import BatchExportBackfill
from posthog.temporal.workflows.clickhouse import ClickHouseClient


@@ -79,3 +81,13 @@ def to_isoformat(d: str | None) -> str | None:
    if d is None:
        return None
    return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()


def fetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
    """Fetch the BatchExportBackfills for a given BatchExport."""
    return list(BatchExportBackfill.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit])


async def afetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
    """Fetch the BatchExportBackfills for a given BatchExport."""
    return await sync_to_async(fetch_batch_export_backfills)(batch_export_id, limit)  # type: ignore
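A sketch of how a test might use the new helper (the fixtures and the workflow-execution step are assumed, not shown in this diff):

# Inside an async test, after driving the backfill workflow for `batch_export`
# (a fixture assumed here): assert exactly one backfill row was recorded and
# that it finished.
backfills = await afetch_batch_export_backfills(batch_export_id=batch_export.id)
assert len(backfills) == 1
assert backfills[0].status == "Completed"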