From a7ae3b0f9da57c27336acf204a7de070f0a2bd77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Thu, 19 Oct 2023 15:09:19 +0200 Subject: [PATCH] feat(batch-exports): Add backfill workflow (#17909) --- latest_migrations.manifest | 2 +- posthog/batch_exports/http.py | 4 +- posthog/batch_exports/models.py | 35 ++ posthog/batch_exports/service.py | 110 +++++- .../commands/create_batch_export_from_app.py | 4 +- .../test/test_create_batch_export_from_app.py | 2 +- .../0355_add_batch_export_backfill_model.py | 98 +++++ posthog/temporal/tests/batch_exports/base.py | 12 + .../test_backfill_batch_export.py | 255 +++++++++++++ .../temporal/tests/test_encryption_codec.py | 23 +- posthog/temporal/workflows/__init__.py | 12 + .../workflows/backfill_batch_export.py | 359 ++++++++++++++++++ posthog/temporal/workflows/batch_exports.py | 51 +++ posthog/temporal/workflows/noop.py | 24 +- 14 files changed, 955 insertions(+), 36 deletions(-) create mode 100644 posthog/migrations/0355_add_batch_export_backfill_model.py create mode 100644 posthog/temporal/tests/batch_exports/test_backfill_batch_export.py create mode 100644 posthog/temporal/workflows/backfill_batch_export.py diff --git a/latest_migrations.manifest b/latest_migrations.manifest index a09d7f51cf469..646d97235dab6 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0015_add_verified_properties otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0354_organization_never_drop_data +posthog: 0355_add_batch_export_backfill_model sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index cd8e24aca5cd6..06fb9866ac0e9 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -248,9 +248,11 @@ def backfill(self, request: request.Request, *args, **kwargs) -> response.Respon if start_at >= end_at: raise ValidationError("The initial backfill datetime 'start_at' happens after 'end_at'") + team_id = request.user.current_team.id + batch_export = self.get_object() temporal = sync_connect() - backfill_export(temporal, str(batch_export.pk), start_at, end_at) + backfill_export(temporal, str(batch_export.pk), team_id, start_at, end_at) return response.Response() diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index b75cadfe3f894..633163b831238 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -27,12 +27,14 @@ class Destination(models.TextChoices): SNOWFLAKE = "Snowflake" POSTGRES = "Postgres" BIGQUERY = "BigQuery" + NOOP = "NoOp" secret_fields = { "S3": {"aws_access_key_id", "aws_secret_access_key"}, "Snowflake": set("password"), "Postgres": set("password"), "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"}, + "NoOp": set(), } type: models.CharField = models.CharField( @@ -225,3 +227,36 @@ def fetch_batch_export_log_entries( return [ BatchExportLogEntry(*result) for result in typing.cast(list, sync_execute(clickhouse_query, clickhouse_kwargs)) ] + + +class BatchExportBackfill(UUIDModel): + class Status(models.TextChoices): + """Possible states of the BatchExportRun.""" + + CANCELLED = "Cancelled" + COMPLETED = "Completed" + CONTINUEDASNEW = "ContinuedAsNew" + FAILED = "Failed" + TERMINATED = "Terminated" + TIMEDOUT = "TimedOut" + RUNNING = "Running" + STARTING = "Starting" + + team: models.ForeignKey = 
models.ForeignKey("Team", on_delete=models.CASCADE, help_text="The team this belongs to.") + batch_export = models.ForeignKey( + "BatchExport", on_delete=models.CASCADE, help_text="The BatchExport this backfill belongs to." + ) + start_at: models.DateTimeField = models.DateTimeField(help_text="The start of the data interval.") + end_at: models.DateTimeField = models.DateTimeField(help_text="The end of the data interval.") + status: models.CharField = models.CharField( + choices=Status.choices, max_length=64, help_text="The status of this backfill." + ) + created_at: models.DateTimeField = models.DateTimeField( + auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." + ) + finished_at: models.DateTimeField = models.DateTimeField( + null=True, help_text="The timestamp at which this BatchExportBackfill finished, successfully or not." + ) + last_updated_at: models.DateTimeField = models.DateTimeField( + auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." + ) diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 2773dd73d7999..a90a2b2ff9a5c 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -8,7 +8,6 @@ Client, Schedule, ScheduleActionStartWorkflow, - ScheduleBackfill, ScheduleIntervalSpec, ScheduleOverlapPolicy, SchedulePolicy, @@ -21,6 +20,7 @@ from posthog import settings from posthog.batch_exports.models import ( BatchExport, + BatchExportBackfill, BatchExportRun, ) from posthog.temporal.client import sync_connect @@ -121,11 +121,22 @@ class BigQueryBatchExportInputs: include_events: list[str] | None = None +@dataclass +class NoOpInputs: + """NoOp Workflow is used for testing, it takes a single argument to echo back.""" + + batch_export_id: str + team_id: int + interval: str = "hour" + arg: str = "" + + DESTINATION_WORKFLOWS = { "S3": ("s3-export", S3BatchExportInputs), "Snowflake": ("snowflake-export", SnowflakeBatchExportInputs), "Postgres": ("postgres-export", PostgresBatchExportInputs), "BigQuery": ("bigquery-export", BigQueryBatchExportInputs), + "NoOp": ("no-op", NoOpInputs), } @@ -238,38 +249,64 @@ async def describe_schedule(temporal: Client, schedule_id: str): return await handle.describe() +@dataclass +class BackfillBatchExportInputs: + """Inputs for the BackfillBatchExport Workflow.""" + + team_id: int + batch_export_id: str + start_at: str + end_at: str + buffer_limit: int = 1 + wait_delay: float = 5.0 + + def backfill_export( temporal: Client, batch_export_id: str, + team_id: int, start_at: dt.datetime, end_at: dt.datetime, - overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.BUFFER_ALL, -): - """Creates an export run for the given BatchExport, and specified time range. +) -> None: + """Starts a backfill for given team and batch export covering given date range. Arguments: + temporal: A Temporal Client to trigger the workflow. + batch_export_id: The id of the BatchExport to backfill. + team_id: The id of the Team the BatchExport belongs to. start_at: From when to backfill. end_at: Up to when to backfill. 
""" try: - BatchExport.objects.get(id=batch_export_id) + BatchExport.objects.get(id=batch_export_id, team_id=team_id) except BatchExport.DoesNotExist: raise BatchExportIdError(batch_export_id) - schedule_backfill = ScheduleBackfill(start_at=start_at, end_at=end_at, overlap=overlap) - backfill_schedule(temporal=temporal, schedule_id=batch_export_id, schedule_backfill=schedule_backfill) + inputs = BackfillBatchExportInputs( + batch_export_id=batch_export_id, + team_id=team_id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + ) + start_backfill_batch_export_workflow(temporal, inputs=inputs) @async_to_sync -async def backfill_schedule(temporal: Client, schedule_id: str, schedule_backfill: ScheduleBackfill): - """Async call the Temporal client to execute a backfill on the given schedule.""" - handle = temporal.get_schedule_handle(schedule_id) +async def start_backfill_batch_export_workflow(temporal: Client, inputs: BackfillBatchExportInputs) -> None: + """Async call to start a BackfillBatchExportWorkflow.""" + handle = temporal.get_schedule_handle(inputs.batch_export_id) description = await handle.describe() if description.schedule.spec.jitter is not None: - schedule_backfill.end_at += description.schedule.spec.jitter - - await handle.backfill(schedule_backfill) + # Adjust end_at to account for jitter if present. + inputs.end_at = (dt.datetime.fromisoformat(inputs.end_at) + description.schedule.spec.jitter).isoformat() + + await temporal.start_workflow( + "backfill-batch-export", + inputs, + id=f"{inputs.batch_export_id}-Backfill-{inputs.start_at}-{inputs.end_at}", + task_queue=settings.TEMPORAL_TASK_QUEUE, + ) def create_batch_export_run( @@ -284,8 +321,10 @@ def create_batch_export_run( as only the Workflows themselves can know when they start. Args: - data_interval_start: - data_interval_end: + batch_export_id: The UUID of the BatchExport the BatchExportRun to create belongs to. + data_interval_start: The start of the period of data exported in this BatchExportRun. + data_interval_end: The end of the period of data exported in this BatchExportRun. + status: The initial status for the created BatchExportRun. """ run = BatchExportRun( batch_export_id=batch_export_id, @@ -372,3 +411,44 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: return await handle.update( updater=updater, ) + + +def create_batch_export_backfill( + batch_export_id: UUID, + team_id: int, + start_at: str, + end_at: str, + status: str = BatchExportRun.Status.RUNNING, +): + """Create a BatchExportBackfill. + + + Args: + batch_export_id: The UUID of the BatchExport the BatchExportBackfill to create belongs to. + team_id: The id of the Team the BatchExportBackfill to create belongs to. + start_at: The start of the period to backfill in this BatchExportBackfill. + end_at: The end of the period to backfill in this BatchExportBackfill. + status: The initial status for the created BatchExportBackfill. + """ + backfill = BatchExportBackfill( + batch_export_id=batch_export_id, + status=status, + start_at=dt.datetime.fromisoformat(start_at), + end_at=dt.datetime.fromisoformat(end_at), + team_id=team_id, + ) + backfill.save() + + return backfill + + +def update_batch_export_backfill_status(backfill_id: UUID, status: str): + """Update the status of an BatchExportBackfill with given id. + + Arguments: + id: The id of the BatchExportBackfill to update. + status: The new status to assign to the BatchExportBackfill. 
+ """ + updated = BatchExportBackfill.objects.filter(id=backfill_id).update(status=status) + if not updated: + raise ValueError(f"BatchExportBackfill with id {backfill_id} not found.") diff --git a/posthog/management/commands/create_batch_export_from_app.py b/posthog/management/commands/create_batch_export_from_app.py index eadf71532db02..510e0f3dbfa4d 100644 --- a/posthog/management/commands/create_batch_export_from_app.py +++ b/posthog/management/commands/create_batch_export_from_app.py @@ -115,7 +115,9 @@ def handle(self, *args, **options): client = sync_connect() end_at = dt.datetime.utcnow() start_at = end_at - (dt.timedelta(hours=1) if interval == "hour" else dt.timedelta(days=1)) - backfill_export(client, batch_export_id=str(batch_export.id), start_at=start_at, end_at=end_at) + backfill_export( + client, batch_export_id=str(batch_export.id), team_id=team_id, start_at=start_at, end_at=end_at + ) self.stdout.write(f"Triggered backfill for BatchExport '{name}'.") self.stdout.write("Done!") diff --git a/posthog/management/commands/test/test_create_batch_export_from_app.py b/posthog/management/commands/test/test_create_batch_export_from_app.py index bbbb36079d013..fb216dc4f2bb4 100644 --- a/posthog/management/commands/test/test_create_batch_export_from_app.py +++ b/posthog/management/commands/test/test_create_batch_export_from_app.py @@ -321,7 +321,7 @@ async def list_workflows(temporal, schedule_id: str): return workflows -@pytest.mark.django_db +@pytest.mark.django_db(transaction=True) @pytest.mark.parametrize( "interval,plugin_config", [ diff --git a/posthog/migrations/0355_add_batch_export_backfill_model.py b/posthog/migrations/0355_add_batch_export_backfill_model.py new file mode 100644 index 0000000000000..c558d2a74d7f8 --- /dev/null +++ b/posthog/migrations/0355_add_batch_export_backfill_model.py @@ -0,0 +1,98 @@ +# Generated by Django 3.2.19 on 2023-10-13 09:13 + +import django.db.models.deletion +from django.db import migrations, models + +import posthog.models.utils + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0354_organization_never_drop_data"), + ] + + operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), + migrations.CreateModel( + name="BatchExportBackfill", + fields=[ + ( + "id", + models.UUIDField( + default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False + ), + ), + ("start_at", models.DateTimeField(help_text="The start of the data interval.")), + ("end_at", models.DateTimeField(help_text="The end of the data interval.")), + ( + "status", + models.CharField( + choices=[ + ("Cancelled", "Cancelled"), + ("Completed", "Completed"), + ("ContinuedAsNew", "Continuedasnew"), + ("Failed", "Failed"), + ("Terminated", "Terminated"), + ("TimedOut", "Timedout"), + ("Running", "Running"), + ("Starting", "Starting"), + ], + help_text="The status of this backfill.", + max_length=64, + ), + ), + ( + "created_at", + models.DateTimeField( + auto_now_add=True, help_text="The timestamp at which this BatchExportBackfill was created." 
+ ), + ), + ( + "finished_at", + models.DateTimeField( + help_text="The timestamp at which this BatchExportBackfill finished, successfully or not.", + null=True, + ), + ), + ( + "last_updated_at", + models.DateTimeField( + auto_now=True, help_text="The timestamp at which this BatchExportBackfill was last updated." + ), + ), + ( + "batch_export", + models.ForeignKey( + help_text="The BatchExport this backfill belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.batchexport", + ), + ), + ( + "team", + models.ForeignKey( + help_text="The team this belongs to.", + on_delete=django.db.models.deletion.CASCADE, + to="posthog.team", + ), + ), + ], + options={ + "abstract": False, + }, + ), + ] diff --git a/posthog/temporal/tests/batch_exports/base.py b/posthog/temporal/tests/batch_exports/base.py index 88a52fe798426..69c00111d4615 100644 --- a/posthog/temporal/tests/batch_exports/base.py +++ b/posthog/temporal/tests/batch_exports/base.py @@ -1,10 +1,12 @@ import datetime as dt import json import typing +import uuid from asgiref.sync import sync_to_async from ee.clickhouse.materialized_columns.columns import materialize +from posthog.batch_exports.models import BatchExportBackfill from posthog.temporal.workflows.clickhouse import ClickHouseClient @@ -79,3 +81,13 @@ def to_isoformat(d: str | None) -> str | None: if d is None: return None return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat() + + +def fetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]: + """Fetch the BatchExportBackfills for a given BatchExport.""" + return list(BatchExportBackfill.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit]) + + +async def afetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]: + """Fetch the BatchExportBackfills for a given BatchExport.""" + return await sync_to_async(fetch_batch_export_backfills)(batch_export_id, limit) # type: ignore diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py new file mode 100644 index 0000000000000..d91eec51651d0 --- /dev/null +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -0,0 +1,255 @@ +import asyncio +import datetime as dt +import uuid + +import pytest +import pytest_asyncio +import temporalio +import temporalio.client +import temporalio.common +import temporalio.testing +import temporalio.worker +from asgiref.sync import sync_to_async +from django.conf import settings + +from posthog.api.test.test_organization import acreate_organization +from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect +from posthog.temporal.tests.batch_exports.base import afetch_batch_export_backfills +from posthog.temporal.tests.batch_exports.fixtures import ( + acreate_batch_export, + adelete_batch_export, +) +from posthog.temporal.workflows.backfill_batch_export import ( + BackfillBatchExportInputs, + BackfillBatchExportWorkflow, + BackfillScheduleInputs, + backfill_range, + backfill_schedule, + get_schedule_frequency, +) +from posthog.temporal.workflows.batch_exports import ( + create_batch_export_backfill_model, + update_batch_export_backfill_model_status, +) +from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity + + +@pytest_asyncio.fixture +async def temporal_client(): + """Yield a Temporal Client.""" + client = await connect( + 
settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + + return client + + +@pytest_asyncio.fixture +async def team(): + organization = await acreate_organization("test") + team = await acreate_team(organization=organization) + + yield team + + sync_to_async(team.delete)() + sync_to_async(organization.delete)() + + +@pytest_asyncio.fixture +async def temporal_schedule(temporal_client, team): + """Manage a test Temopral Schedule yielding its handle.""" + destination_data = { + "type": "NoOp", + "config": {}, + } + + interval = "every 1 minutes" + batch_export_data = { + "name": "no-op-export", + "destination": destination_data, + "interval": interval, + "paused": True, + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + handle = temporal_client.get_schedule_handle(str(batch_export.id)) + yield handle + + await adelete_batch_export(batch_export, temporal_client) + + +@pytest_asyncio.fixture +async def temporal_worker(temporal_client): + worker = temporalio.worker.Worker( + temporal_client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[NoOpWorkflow, BackfillBatchExportWorkflow], + activities=[ + noop_activity, + backfill_schedule, + create_batch_export_backfill_model, + update_batch_export_backfill_model_status, + get_schedule_frequency, + ], + workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(), + ) + + worker_run = asyncio.create_task(worker.run()) + + yield worker + + worker_run.cancel() + await asyncio.wait([worker_run]) + + +@pytest.mark.parametrize( + "start_at,end_at,step,expected", + [ + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.timedelta(days=1), + [ + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + ) + ], + ), + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.timedelta(hours=12), + [ + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc), + ), + ( + dt.datetime(2023, 1, 1, 12, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + ), + ], + ), + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 5, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.timedelta(days=1), + [ + ( + dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + ), + ( + dt.datetime(2023, 1, 2, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 3, 0, 0, 0, tzinfo=dt.timezone.utc), + ), + ( + dt.datetime(2023, 1, 3, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 4, 0, 0, 0, tzinfo=dt.timezone.utc), + ), + ( + dt.datetime(2023, 1, 4, 0, 0, 0, tzinfo=dt.timezone.utc), + dt.datetime(2023, 1, 5, 0, 0, 0, tzinfo=dt.timezone.utc), + ), + ], + ), + ], +) +def test_backfill_range(start_at, end_at, step, expected): + """Test the backfill_range function yields expected ranges of dates.""" + result = list(backfill_range(start_at, end_at, step)) + assert result == expected + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def 
test_get_schedule_frequency(activity_environment, temporal_worker, temporal_schedule): + """Test get_schedule_frequency returns the correct interval.""" + desc = await temporal_schedule.describe() + expected = desc.schedule.spec.intervals[0].every.total_seconds() + + result = await activity_environment.run(get_schedule_frequency, desc.id) + + assert result == expected + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def test_backfill_schedule_activity(activity_environment, temporal_worker, temporal_schedule): + """Test backfill_schedule activity schedules all backfill runs.""" + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + + desc = await temporal_schedule.describe() + inputs = BackfillScheduleInputs( + schedule_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=2, + wait_delay=1.0, + frequency_seconds=desc.schedule.spec.intervals[0].every.total_seconds(), + ) + + await activity_environment.run(backfill_schedule, inputs) + + desc = await temporal_schedule.describe() + result = desc.info.num_actions + expected = 10 + + assert result >= expected + + +@pytest.mark.django_db(transaction=True) +@pytest.mark.asyncio +async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule, temporal_client, team): + """Test BackfillBatchExportWorkflow executes all backfill runs and updates model.""" + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=team.pk, + batch_export_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=2, + wait_delay=1.0, + ) + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(minutes=1), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await handle.result() + + desc = await temporal_schedule.describe() + result = desc.info.num_actions + expected = 10 + + assert result == expected + + backfills = await afetch_batch_export_backfills(batch_export_id=desc.id) + + assert len(backfills) == 1, "Expected one backfill to have been created" + + backfill = backfills.pop() + assert backfill.status == "Completed" diff --git a/posthog/temporal/tests/test_encryption_codec.py b/posthog/temporal/tests/test_encryption_codec.py index 92c43223672b9..a3d9c33ef478d 100644 --- a/posthog/temporal/tests/test_encryption_codec.py +++ b/posthog/temporal/tests/test_encryption_codec.py @@ -1,4 +1,5 @@ import dataclasses +import json import uuid import pytest @@ -8,6 +9,7 @@ from temporalio.client import Client from temporalio.worker import UnsandboxedWorkflowRunner, Worker +from posthog.batch_exports.service import NoOpInputs from posthog.temporal.codec import EncryptionCodec from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity @@ -42,12 +44,22 @@ async def test_payloads_are_encrypted(): workflow_id = uuid.uuid4() input_str = str(uuid.uuid4()) + no_op_result_str = f"OK - {input_str}" - no_op_activity_input_str = f'{{"time":"{input_str}"}}' + inputs = NoOpInputs( + arg=input_str, + batch_export_id="123", + team_id=1, + ) + # The no-op Workflow can only produce a limited set of results, so we'll check if the events 
match any of these. # Either it's the final result (no_op_result_str), the input to an activity (no_op_activity_input_str), or the - # input to the workflow (input_str). In all cases, data is encoded. - expected_results = (f'"{no_op_result_str}"'.encode(), f'"{input_str}"'.encode(), no_op_activity_input_str.encode()) + # input to the workflow (inputs). + expected_results = ( + no_op_result_str, + {"arg": input_str}, + dataclasses.asdict(inputs), + ) async with Worker( client, @@ -58,7 +70,7 @@ async def test_payloads_are_encrypted(): ) as worker: handle = await client.start_workflow( NoOpWorkflow.run, - input_str, + inputs, id=f"workflow-{workflow_id}", task_queue=worker.task_queue, ) @@ -76,4 +88,5 @@ async def test_payloads_are_encrypted(): assert payload.metadata["encoding"] == b"binary/encrypted" decoded_payloads = await codec.decode([payload]) - assert decoded_payloads[0].data in expected_results + loaded_payload = json.loads(decoded_payloads[0].data) + assert loaded_payload in expected_results diff --git a/posthog/temporal/workflows/__init__.py b/posthog/temporal/workflows/__init__.py index 2e4224caf758b..df7356f6ab997 100644 --- a/posthog/temporal/workflows/__init__.py +++ b/posthog/temporal/workflows/__init__.py @@ -1,7 +1,14 @@ from typing import Callable, Sequence +from posthog.temporal.workflows.backfill_batch_export import ( + BackfillBatchExportWorkflow, + backfill_schedule, + get_schedule_frequency, +) from posthog.temporal.workflows.batch_exports import ( + create_batch_export_backfill_model, create_export_run, + update_batch_export_backfill_model_status, update_export_run_status, ) from posthog.temporal.workflows.bigquery_batch_export import ( @@ -24,6 +31,7 @@ from posthog.temporal.workflows.squash_person_overrides import * WORKFLOWS = [ + BackfillBatchExportWorkflow, BigQueryBatchExportWorkflow, NoOpWorkflow, PostgresBatchExportWorkflow, @@ -33,10 +41,13 @@ ] ACTIVITIES: Sequence[Callable] = [ + backfill_schedule, + create_batch_export_backfill_model, create_export_run, delete_squashed_person_overrides_from_clickhouse, delete_squashed_person_overrides_from_postgres, drop_dictionary, + get_schedule_frequency, insert_into_bigquery_activity, insert_into_postgres_activity, insert_into_s3_activity, @@ -46,5 +57,6 @@ prepare_person_overrides, select_persons_to_delete, squash_events_partition, + update_batch_export_backfill_model_status, update_export_run_status, ] diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py new file mode 100644 index 0000000000000..21f8c21398633 --- /dev/null +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -0,0 +1,359 @@ +import asyncio +import collections.abc +import dataclasses +import datetime as dt +import json +import typing + +import temporalio +import temporalio.activity +import temporalio.client +import temporalio.common +import temporalio.exceptions +import temporalio.workflow +from django.conf import settings + +from posthog.batch_exports.service import BackfillBatchExportInputs +from posthog.temporal.client import connect +from posthog.temporal.workflows.base import PostHogWorkflow +from posthog.temporal.workflows.batch_exports import ( + CreateBatchExportBackfillInputs, + UpdateBatchExportBackfillStatusInputs, + create_batch_export_backfill_model, + get_batch_exports_logger, + update_batch_export_backfill_model_status, +) + + +class HeartbeatDetails(typing.NamedTuple): + """Details sent over in a Temporal Activity heartbeat.""" + + schedule_id: str + start_at: 
str + end_at: str + wait_start_at: str + + def make_activity_heartbeat_while_running( + self, function_to_run: collections.abc.Callable, heartbeat_every: dt.timedelta + ) -> collections.abc.Callable[..., collections.abc.Coroutine]: + """Return a callable that returns a coroutine that hearbeats with these HeartbeatDetails. + + The returned callable wraps 'function_to_run' while heartbeatting 'factor' times for every + 'heartbeat_timeout'. + """ + + async def heartbeat() -> None: + """Heartbeat factor times every heartbeat_timeout.""" + while True: + await asyncio.sleep(heartbeat_every.total_seconds()) + temporalio.activity.heartbeat(self) + + async def heartbeat_while_running(*args, **kwargs): + """Wrap 'function_to_run' to asynchronously heartbeat while awaiting.""" + heartbeat_task = asyncio.create_task(heartbeat()) + + try: + return await function_to_run(*args, **kwargs) + finally: + heartbeat_task.cancel() + await asyncio.wait([heartbeat_task]) + + return heartbeat_while_running + + +@temporalio.activity.defn +async def get_schedule_frequency(schedule_id: str) -> float: + """Return a Temporal Schedule's frequency. + + This assumes that the Schedule has one interval set. + """ + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + + handle = client.get_schedule_handle(schedule_id) + desc = await handle.describe() + + interval = desc.schedule.spec.intervals[0] + return interval.every.total_seconds() + + +@dataclasses.dataclass +class BackfillScheduleInputs: + """Inputs for the backfill_schedule Activity.""" + + schedule_id: str + start_at: str + end_at: str + frequency_seconds: float + buffer_limit: int = 1 + wait_delay: float = 5.0 + + +@temporalio.activity.defn +async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: + """Temporal Activity to backfill a Temporal Schedule. + + The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is requested, + we wait for it to be done before continuing with the next. + + This activity heartbeats while waiting to allow cancelling an ongoing backfill. + """ + start_at = dt.datetime.fromisoformat(inputs.start_at) + end_at = dt.datetime.fromisoformat(inputs.end_at) + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + + heartbeat_timeout = temporalio.activity.info().heartbeat_timeout + + details = temporalio.activity.info().heartbeat_details + + if details: + # If we receive details from a previous run, it means we were restarted for some reason. + # Let's not double-backfill and instead wait for any outstanding runs. 
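        # The last heartbeat carries the batch that was already requested, so we first wait
        # for that batch to finish and then resume the backfill from its end_at.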
+ last_activity_details = details[0] + + details = HeartbeatDetails( + schedule_id=inputs.schedule_id, + start_at=last_activity_details.start_at, + end_at=last_activity_details.end_at, + wait_start_at=last_activity_details.wait_start_at, + ) + + await wait_for_schedule_backfill_in_range_with_heartbeat(details, client, heartbeat_timeout, inputs.wait_delay) + + # Update start_at to resume from the end of the period we just waited for + start_at = dt.datetime.fromisoformat(last_activity_details.end_at) + + handle = client.get_schedule_handle(inputs.schedule_id) + + frequency = dt.timedelta(seconds=inputs.frequency_seconds) + full_backfill_range = backfill_range(start_at, end_at, frequency * inputs.buffer_limit) + + for backfill_start_at, backfill_end_at in full_backfill_range: + utcnow = dt.datetime.utcnow() + + backfill = temporalio.client.ScheduleBackfill( + start_at=backfill_start_at, + end_at=backfill_end_at, + overlap=temporalio.client.ScheduleOverlapPolicy.ALLOW_ALL, + ) + await handle.backfill(backfill) + + details = HeartbeatDetails( + schedule_id=inputs.schedule_id, + start_at=backfill_start_at.isoformat(), + end_at=backfill_end_at.isoformat(), + wait_start_at=utcnow.isoformat(), + ) + + await wait_for_schedule_backfill_in_range_with_heartbeat(details, client, heartbeat_timeout, inputs.wait_delay) + + +async def wait_for_schedule_backfill_in_range_with_heartbeat( + heartbeat_details: HeartbeatDetails, + client: temporalio.client.Client, + heartbeat_timeout: dt.timedelta | None = None, + wait_delay: float = 5.0, +): + """Decide if heartbeating is required while waiting for a backfill in range to finish.""" + if heartbeat_timeout: + wait_func = heartbeat_details.make_activity_heartbeat_while_running( + wait_for_schedule_backfill_in_range, heartbeat_every=dt.timedelta(seconds=1) + ) + else: + wait_func = wait_for_schedule_backfill_in_range + + await wait_func( + client, + heartbeat_details.schedule_id, + dt.datetime.fromisoformat(heartbeat_details.start_at), + dt.datetime.fromisoformat(heartbeat_details.end_at), + dt.datetime.fromisoformat(heartbeat_details.wait_start_at), + wait_delay, + ) + + +async def wait_for_schedule_backfill_in_range( + client: temporalio.client.Client, + schedule_id: str, + start_at: dt.datetime, + end_at: dt.datetime, + now: dt.datetime, + wait_delay: float = 5.0, +) -> None: + """Wait for a Temporal Schedule backfill in a date range to be finished. + + We can use the TemporalScheduledById and the TemporalScheduledStartTime to identify the Workflow executions + runs that fall under this Temporal Schedule's backfill. However, there could be regularly scheduled runs returned + by a query on just these two fields. So, we take the 'now' argument to provide a lower bound for the Workflow + execution start time, assuming that backfill runs will have started recently after 'now' whereas regularly + scheduled runs happened sometime in the past, before 'now'. This should hold true for historical backfills, + but the heuristic fails for "future backfills", which should not be allowed. + """ + query = ( + f'TemporalScheduledById="{schedule_id}" ' + f'AND TemporalScheduledStartTime >= "{start_at.isoformat()}" ' + f'AND TemporalScheduledStartTime <= "{end_at.isoformat()}" ' + f'AND StartTime >= "{now.isoformat()}"' + ) + + done = False + while not done: + await asyncio.sleep(wait_delay) + + workflows = [workflow async for workflow in client.list_workflows(query=query)] + + if not workflows: + # Backfill hasn't started yet. 
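                # The scheduled runs may not be visible yet; loop again and re-run the query
                # after another wait_delay.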
+ continue + + if check_workflow_executions_not_running(workflows) is False: + continue + + done = True + + +def check_workflow_executions_not_running(workflow_executions: list[temporalio.client.WorkflowExecution]) -> bool: + """Check if a list of Worflow Executions has any still running.""" + return all( + workflow_execution.status != temporalio.client.WorkflowExecutionStatus.RUNNING + for workflow_execution in workflow_executions + ) + + +def backfill_range( + start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta +) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: + """Generate range of dates between start_at and end_at.""" + current = start_at + + while current < end_at: + current_end = current + step + + if current_end > end_at: + current_end = end_at + + yield current, current_end + + current = current_end + + +@temporalio.workflow.defn(name="backfill-batch-export") +class BackfillBatchExportWorkflow(PostHogWorkflow): + """A Temporal Workflow to manage a backfill of a batch export. + + Temporal Schedule backfills are limited in the number of batch periods we can buffer. This limit + has been confirmed to be less than 1000. So, when triggering a backfill of more than 1000 batch + periods (about a month for hourly batch exports), we need this Workflow to manage its progress. + + We also report on the progress by updating the BatchExportBackfill model. + """ + + @staticmethod + def parse_inputs(inputs: list[str]) -> BackfillBatchExportInputs: + """Parse inputs from the management command CLI.""" + loaded = json.loads(inputs[0]) + return BackfillBatchExportInputs(**loaded) + + @temporalio.workflow.run + async def run(self, inputs: BackfillBatchExportInputs) -> None: + """Workflow implementation to backfill a BatchExport.""" + logger = get_batch_exports_logger(inputs=inputs) + logger.info( + "Starting Backfill for BatchExport %s: %s - %s", inputs.batch_export_id, inputs.start_at, inputs.end_at + ) + + create_batch_export_backfill_inputs = CreateBatchExportBackfillInputs( + team_id=inputs.team_id, + batch_export_id=inputs.batch_export_id, + start_at=inputs.start_at, + end_at=inputs.end_at, + status="Running", + ) + + backfill_id = await temporalio.workflow.execute_activity( + create_batch_export_backfill_model, + create_batch_export_backfill_inputs, + start_to_close_timeout=dt.timedelta(minutes=5), + retry_policy=temporalio.common.RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=["NotNullViolation", "IntegrityError"], + ), + ) + update_inputs = UpdateBatchExportBackfillStatusInputs(id=backfill_id, status="Completed") + + frequency_seconds = await temporalio.workflow.execute_activity( + get_schedule_frequency, + inputs.batch_export_id, + start_to_close_timeout=dt.timedelta(minutes=1), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=0), + ) + + backfill_duration = dt.datetime.fromisoformat(inputs.end_at) - dt.datetime.fromisoformat(inputs.start_at) + number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds) + + backfill_schedule_inputs = BackfillScheduleInputs( + schedule_id=inputs.batch_export_id, + start_at=inputs.start_at, + end_at=inputs.end_at, + frequency_seconds=frequency_seconds, + buffer_limit=inputs.buffer_limit, + wait_delay=inputs.wait_delay, + ) + try: + await temporalio.workflow.execute_activity( + backfill_schedule, + backfill_schedule_inputs, + retry_policy=temporalio.common.RetryPolicy( + 
initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + ), + # Temporal requires that we set a timeout. + # Allocate 5 minutes per expected number of runs to backfill as a timeout. + # The 5 minutes are just an assumption and we may tweak this in the future + start_to_close_timeout=dt.timedelta(minutes=5 * number_of_expected_runs), + heartbeat_timeout=dt.timedelta(minutes=2), + ) + + except temporalio.exceptions.ActivityError as e: + if isinstance(e.cause, temporalio.exceptions.CancelledError): + logger.error("Backfill was cancelled.") + update_inputs.status = "Cancelled" + else: + logger.exception("Backfill failed.", exc_info=e.cause) + update_inputs.status = "Failed" + + raise + + except Exception as e: + logger.exception("Backfill failed with an unexpected error.", exc_info=e) + update_inputs.status = "Failed" + raise + + finally: + await temporalio.workflow.execute_activity( + update_batch_export_backfill_model_status, + update_inputs, + start_to_close_timeout=dt.timedelta(minutes=5), + retry_policy=temporalio.common.RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + maximum_attempts=0, + non_retryable_error_types=["NotNullViolation", "IntegrityError"], + ), + ) diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 79a0917fd4f43..4144940e471a8 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -18,7 +18,9 @@ from posthog.batch_exports.service import ( BatchExportsInputsProtocol, + create_batch_export_backfill, create_batch_export_run, + update_batch_export_backfill_status, update_batch_export_run_status, ) from posthog.kafka_client.client import KafkaProducer @@ -631,3 +633,52 @@ class UpdateBatchExportRunStatusInputs: async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs): """Activity that updates the status of an BatchExportRun.""" await sync_to_async(update_batch_export_run_status)(run_id=uuid.UUID(inputs.id), status=inputs.status, latest_error=inputs.latest_error) # type: ignore + + +@dataclasses.dataclass +class CreateBatchExportBackfillInputs: + team_id: int + batch_export_id: str + start_at: str + end_at: str + status: str + + +@activity.defn +async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillInputs) -> str: + """Activity that creates an BatchExportBackfill. + + Intended to be used in all export workflows, usually at the start, to create a model + instance to represent them in our database. + """ + logger = get_batch_exports_logger(inputs=inputs) + logger.info(f"Creating BatchExportBackfill model instance in team {inputs.team_id}.") + + # 'sync_to_async' type hints are fixed in asgiref>=3.4.1 + # But one of our dependencies is pinned to asgiref==3.3.2. + # Remove these comments once we upgrade. 
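    # The backfill row is created with whatever status the inputs carry (the Workflow passes
    # "Running"); a separate activity later moves it to a terminal status such as "Completed".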
+ run = await sync_to_async(create_batch_export_backfill)( # type: ignore + batch_export_id=uuid.UUID(inputs.batch_export_id), + start_at=inputs.start_at, + end_at=inputs.end_at, + status=inputs.status, + team_id=inputs.team_id, + ) + + logger.info(f"Created BatchExportBackfill {run.id} in team {inputs.team_id}.") + + return str(run.id) + + +@dataclasses.dataclass +class UpdateBatchExportBackfillStatusInputs: + """Inputs to the update_batch_export_backfill_status activity.""" + + id: str + status: str + + +@activity.defn +async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBackfillStatusInputs): + """Activity that updates the status of an BatchExportRun.""" + await sync_to_async(update_batch_export_backfill_status)(backfill_id=uuid.UUID(inputs.id), status=inputs.status) # type: ignore diff --git a/posthog/temporal/workflows/noop.py b/posthog/temporal/workflows/noop.py index a321168bec6c7..20fa2f1ac097e 100644 --- a/posthog/temporal/workflows/noop.py +++ b/posthog/temporal/workflows/noop.py @@ -1,22 +1,24 @@ +import json import logging from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import timedelta from typing import Any from temporalio import activity, workflow +from posthog.batch_exports.service import NoOpInputs from posthog.temporal.workflows.base import PostHogWorkflow @dataclass class NoopActivityArgs: - time: str + arg: str @activity.defn -async def noop_activity(input: NoopActivityArgs) -> str: - activity.logger.info(f"Running activity with parameter {input.time}") - output = f"OK - {input.time}" +async def noop_activity(inputs: NoopActivityArgs) -> str: + activity.logger.info(f"Running activity with parameter {inputs.arg}") + output = f"OK - {inputs.arg}" logging.warning(f"[Action] - Action executed on worker with output: {output}") return output @@ -29,17 +31,15 @@ def parse_inputs(inputs: list[str]) -> Any: We expect only one input, so we just return it and assume it's correct. """ - if not inputs: - # Preserving defaults from when this was the only workflow. - inputs = [datetime.now().isoformat()] - return inputs[0] + loaded = json.loads(inputs[0]) + return NoOpInputs(**loaded) @workflow.run - async def run(self, time: str) -> str: - workflow.logger.info(f"Running workflow with parameter {time}") + async def run(self, inputs: NoOpInputs) -> str: + workflow.logger.info(f"Running workflow with parameter {inputs.arg}") result = await workflow.execute_activity( noop_activity, - NoopActivityArgs(time), + NoopActivityArgs(inputs.arg), start_to_close_timeout=timedelta(seconds=60), schedule_to_close_timeout=timedelta(minutes=5), )