diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py index 02c3cda51a4ea8..2df25a849ff5f9 100644 --- a/posthog/batch_exports/models.py +++ b/posthog/batch_exports/models.py @@ -28,12 +28,14 @@ class Destination(models.TextChoices): SNOWFLAKE = "Snowflake" POSTGRES = "Postgres" BIGQUERY = "BigQuery" + NOOP = "NoOp" secret_fields = { "S3": {"aws_access_key_id", "aws_secret_access_key"}, "Snowflake": set("password"), "Postgres": set("password"), "BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"}, + "NoOp": set(), } type: models.CharField = models.CharField( diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index ddac9ff7d35594..0ad0ae31a843e3 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -122,11 +122,22 @@ class BigQueryBatchExportInputs: include_events: list[str] | None = None +@dataclass +class NoOpInputs: + """NoOp Workflow is used for testing, it takes a single argument to echo back.""" + + batch_export_id: str + team_id: int + interval: str = "hour" + arg: str = "" + + DESTINATION_WORKFLOWS = { "S3": ("s3-export", S3BatchExportInputs), "Snowflake": ("snowflake-export", SnowflakeBatchExportInputs), "Postgres": ("postgres-export", PostgresBatchExportInputs), "BigQuery": ("bigquery-export", BigQueryBatchExportInputs), + "NoOp": ("no-op", NoOpInputs), } @@ -377,6 +388,7 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate: def create_batch_export_backfill( batch_export_id: UUID, + team_id: int, start_at: str, end_at: str, status: str = BatchExportRun.Status.RUNNING, @@ -392,6 +404,7 @@ def create_batch_export_backfill( status=status, start_at=dt.datetime.fromisoformat(start_at), end_at=dt.datetime.fromisoformat(end_at), + team_id=team_id, ) backfill.save() diff --git a/posthog/migrations/0354_add_batch_export_backfill_model.py b/posthog/migrations/0354_add_batch_export_backfill_model.py index 1428af2d258e12..6f7d2c3e99f3fe 100644 --- a/posthog/migrations/0354_add_batch_export_backfill_model.py +++ b/posthog/migrations/0354_add_batch_export_backfill_model.py @@ -1,4 +1,4 @@ -# Generated by Django 3.2.19 on 2023-10-11 08:17 +# Generated by Django 3.2.19 on 2023-10-13 09:13 from django.db import migrations, models import django.db.models.deletion @@ -12,6 +12,21 @@ class Migration(migrations.Migration): ] operations = [ + migrations.AlterField( + model_name="batchexportdestination", + name="type", + field=models.CharField( + choices=[ + ("S3", "S3"), + ("Snowflake", "Snowflake"), + ("Postgres", "Postgres"), + ("BigQuery", "Bigquery"), + ("NoOp", "Noop"), + ], + help_text="A choice of supported BatchExportDestination types.", + max_length=64, + ), + ), migrations.CreateModel( name="BatchExportBackfill", fields=[ diff --git a/posthog/temporal/tests/batch_exports/base.py b/posthog/temporal/tests/batch_exports/base.py index 88a52fe798426e..69c00111d46155 100644 --- a/posthog/temporal/tests/batch_exports/base.py +++ b/posthog/temporal/tests/batch_exports/base.py @@ -1,10 +1,12 @@ import datetime as dt import json import typing +import uuid from asgiref.sync import sync_to_async from ee.clickhouse.materialized_columns.columns import materialize +from posthog.batch_exports.models import BatchExportBackfill from posthog.temporal.workflows.clickhouse import ClickHouseClient @@ -79,3 +81,13 @@ def to_isoformat(d: str | None) -> str | None: if d is None: return None return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat() + + +def fetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]: + """Fetch the BatchExportBackfills for a given BatchExport.""" + return list(BatchExportBackfill.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit]) + + +async def afetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]: + """Fetch the BatchExportBackfills for a given BatchExport.""" + return await sync_to_async(fetch_batch_export_backfills)(batch_export_id, limit) # type: ignore diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py index 96a07561dd7e18..f5eba4c9c00d66 100644 --- a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -6,17 +6,31 @@ import pytest_asyncio import temporalio import temporalio.client +import temporalio.common import temporalio.testing import temporalio.worker from django.conf import settings +from posthog.api.test.test_organization import acreate_organization +from posthog.api.test.test_team import acreate_team from posthog.temporal.client import connect +from posthog.temporal.tests.batch_exports.base import afetch_batch_export_backfills +from posthog.temporal.tests.batch_exports.fixtures import ( + acreate_batch_export, + adelete_batch_export, +) from posthog.temporal.workflows.backfill_batch_export import ( BackfillBatchExportInputs, + BackfillBatchExportWorkflow, + BackfillScheduleInputs, backfill_range, backfill_schedule, get_schedule_frequency, ) +from posthog.temporal.workflows.batch_exports import ( + create_batch_export_backfill_model, + update_batch_export_backfill_model_status, +) from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity @@ -38,38 +52,56 @@ async def temporal_client(): @pytest_asyncio.fixture async def temporal_schedule(temporal_client): """Manage a test Temopral Schedule yielding its handle.""" - schedule_id = str(uuid.uuid4()) - handle = await temporal_client.create_schedule( - schedule_id, - temporalio.client.Schedule( - action=temporalio.client.ScheduleActionStartWorkflow( - NoOpWorkflow.run, - "test-input", - id="test-schedule-workflow-id", - task_queue=settings.TEMPORAL_TASK_QUEUE, - ), - spec=temporalio.client.ScheduleSpec( - intervals=[temporalio.client.ScheduleIntervalSpec(every=dt.timedelta(minutes=1))] - ), - state=temporalio.client.ScheduleState(paused=True), - ), + destination_data = { + "type": "NoOp", + "config": {}, + } + + interval = "every 1 minutes" + batch_export_data = { + "name": "no-op-export", + "destination": destination_data, + "interval": interval, + "paused": True, + } + + organization = await acreate_organization("test") + team = await acreate_team(organization=organization) + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], ) + handle = temporal_client.get_schedule_handle(str(batch_export.id)) yield handle - await handle.delete() + await adelete_batch_export(batch_export, temporal_client) -@pytest.fixture -def temporal_worker(temporal_client): +@pytest_asyncio.fixture +async def temporal_worker(temporal_client): worker = temporalio.worker.Worker( temporal_client, task_queue=settings.TEMPORAL_TASK_QUEUE, - workflows=[NoOpWorkflow], - activities=[noop_activity], + workflows=[NoOpWorkflow, BackfillBatchExportWorkflow], + activities=[ + noop_activity, + backfill_schedule, + create_batch_export_backfill_model, + update_batch_export_backfill_model_status, + get_schedule_frequency, + ], workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(), ) - return worker + + worker_run = asyncio.create_task(worker.run()) + + yield worker + + worker_run.cancel() + await asyncio.wait([worker_run]) @pytest.mark.parametrize( @@ -133,40 +165,81 @@ def test_backfill_range(start_at, end_at, step, expected): @pytest.mark.asyncio -async def test_get_schedule_frequency(temporal_schedule): +@pytest.mark.django_db +async def test_get_schedule_frequency(activity_environment, temporal_worker, temporal_schedule): """Test get_schedule_frequency returns the correct interval.""" desc = await temporal_schedule.describe() - expected = desc.schedule.spec.intervals[0].every + expected = desc.schedule.spec.intervals[0].every.total_seconds() - result = await get_schedule_frequency(temporal_schedule) + result = await activity_environment.run(get_schedule_frequency, desc.id) assert result == expected @pytest.mark.asyncio +@pytest.mark.django_db async def test_backfill_schedule_activity(activity_environment, temporal_worker, temporal_schedule): """Test backfill_schedule activity schedules all backfill runs.""" start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) desc = await temporal_schedule.describe() - inputs = BackfillBatchExportInputs( - team_id=1, + inputs = BackfillScheduleInputs( schedule_id=desc.id, start_at=start_at.isoformat(), end_at=end_at.isoformat(), buffer_limit=2, wait_delay=1.0, + frequency_seconds=desc.schedule.spec.intervals[0].every.total_seconds(), ) - worker_run = asyncio.create_task(temporal_worker.run()) await activity_environment.run(backfill_schedule, inputs) - worker_run.cancel() - await asyncio.wait([worker_run]) + desc = await temporal_schedule.describe() + result = desc.info.num_actions + expected = 10 + + assert result == expected + + +@pytest.mark.asyncio +@pytest.mark.django_db +async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule, temporal_client): + """Test BackfillBatchExportWorkflow executes all backfill runs and updates model.""" + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=1, + schedule_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=2, + wait_delay=1.0, + ) + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(minutes=1), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await handle.result() desc = await temporal_schedule.describe() result = desc.info.num_actions expected = 10 assert result == expected + + backfills = await afetch_batch_export_backfills(batch_export_id=desc.id) + + assert len(backfills) == 1, "Expected one backfill to have been created" + + backfill = backfills.pop() + assert backfill.status == "Completed" diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 1f0c686ee89bc5..3eab4116c4eea9 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -24,18 +24,6 @@ ) -@dataclasses.dataclass -class BackfillBatchExportInputs: - """Inputs for the BackfillBatchExport Workflow.""" - - team_id: int - schedule_id: str - start_at: str - end_at: str - buffer_limit: int = 1 - wait_delay: float = 5.0 - - class HeartbeatDetails(typing.NamedTuple): """Details sent over in a Temporal Activity heartbeat.""" @@ -72,7 +60,41 @@ async def heartbeat_while_running(*args, **kwargs): @temporalio.activity.defn -async def backfill_schedule(inputs: BackfillBatchExportInputs) -> None: +async def get_schedule_frequency(schedule_id: str) -> float: + """Return a Temporal Schedule's frequency. + + This assumes that the Schedule has one interval set. + """ + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + + handle = client.get_schedule_handle(schedule_id) + desc = await handle.describe() + + interval = desc.schedule.spec.intervals[0] + return interval.every.total_seconds() + + +@dataclasses.dataclass +class BackfillScheduleInputs: + """Inputs for the backfill_schedule Activity.""" + + schedule_id: str + start_at: str + end_at: str + frequency_seconds: float + buffer_limit: int = 1 + wait_delay: float = 5.0 + + +@temporalio.activity.defn +async def backfill_schedule(inputs: BackfillScheduleInputs) -> None: """Temporal Activity to backfill a Temporal Schedule. The backfill is broken up into batches of inputs.buffer_limit size. After a backfill batch is requested, @@ -115,7 +137,7 @@ async def backfill_schedule(inputs: BackfillBatchExportInputs) -> None: handle = client.get_schedule_handle(inputs.schedule_id) - frequency = await get_schedule_frequency(handle) + frequency = dt.timedelta(seconds=inputs.frequency_seconds) full_backfill_range = backfill_range(start_at, end_at, frequency * inputs.buffer_limit) for backfill_start_at, backfill_end_at in full_backfill_range: @@ -215,15 +237,33 @@ def check_workflow_executions_not_running(workflow_executions: list[temporalio.c ) -async def get_schedule_frequency(handle: temporalio.client.ScheduleHandle) -> dt.timedelta: - """Return a Temporal Schedule's frequency. +def backfill_range( + start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta +) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: + """Generate range of dates between start_at and end_at.""" + current = start_at - This assumes that the Schedule has one interval set. - """ - desc = await handle.describe() + while current < end_at: + current_end = current + step - interval = desc.schedule.spec.intervals[0] - return interval.every + if current_end > end_at: + current_end = end_at + + yield current, current_end + + current = current_end + + +@dataclasses.dataclass +class BackfillBatchExportInputs: + """Inputs for the BackfillBatchExport Workflow.""" + + team_id: int + schedule_id: str + start_at: str + end_at: str + buffer_limit: int = 1 + wait_delay: float = 5.0 @temporalio.workflow.defn(name="backfill-batch-export") @@ -268,12 +308,39 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: non_retryable_error_types=["NotNullViolation", "IntegrityError"], ), ) - update_inputs = UpdateBatchExportBackfillStatusInputs(id=backfill_id, status="Completed") + frequency_seconds = await temporalio.workflow.execute_activity( + get_schedule_frequency, + inputs.schedule_id, + start_to_close_timeout=dt.timedelta(minutes=1), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=0), + ) + + backfill_duration = dt.datetime.fromisoformat(inputs.end_at) - dt.datetime.fromisoformat(inputs.start_at) + number_of_expected_runs = backfill_duration / dt.timedelta(seconds=frequency_seconds) + + backfill_schedule_inputs = BackfillScheduleInputs( + schedule_id=inputs.schedule_id, + start_at=inputs.start_at, + end_at=inputs.end_at, + frequency_seconds=frequency_seconds, + buffer_limit=inputs.buffer_limit, + wait_delay=inputs.wait_delay, + ) try: await temporalio.workflow.execute_activity( - backfill_schedule, inputs, heartbeat_timeout=dt.timedelta(minutes=2) + backfill_schedule, + backfill_schedule_inputs, + retry_policy=temporalio.common.RetryPolicy( + initial_interval=dt.timedelta(seconds=10), + maximum_interval=dt.timedelta(seconds=60), + ), + # Temporal requires that we set a timeout. + # Allocate 5 minutes per expected number of runs to backfill as a timeout. + # The 5 minutes are just an assumption and we may tweak this in the future + start_to_close_timeout=dt.timedelta(minutes=5 * number_of_expected_runs), + heartbeat_timeout=dt.timedelta(minutes=2), ) except temporalio.exceptions.ActivityError as e: @@ -303,20 +370,3 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: non_retryable_error_types=["NotNullViolation", "IntegrityError"], ), ) - - -def backfill_range( - start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta -) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: - """Generate ranges of dates between start_at and end_at.""" - current = start_at - - while current < end_at: - current_end = current + step - - if current_end > end_at: - current_end = end_at - - yield current, current_end - - current = current_end diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 4c4d08df046e6c..4144940e471a89 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -662,6 +662,7 @@ async def create_batch_export_backfill_model(inputs: CreateBatchExportBackfillIn start_at=inputs.start_at, end_at=inputs.end_at, status=inputs.status, + team_id=inputs.team_id, ) logger.info(f"Created BatchExportBackfill {run.id} in team {inputs.team_id}.") diff --git a/posthog/temporal/workflows/noop.py b/posthog/temporal/workflows/noop.py index a321168bec6c7f..20fa2f1ac097e0 100644 --- a/posthog/temporal/workflows/noop.py +++ b/posthog/temporal/workflows/noop.py @@ -1,22 +1,24 @@ +import json import logging from dataclasses import dataclass -from datetime import datetime, timedelta +from datetime import timedelta from typing import Any from temporalio import activity, workflow +from posthog.batch_exports.service import NoOpInputs from posthog.temporal.workflows.base import PostHogWorkflow @dataclass class NoopActivityArgs: - time: str + arg: str @activity.defn -async def noop_activity(input: NoopActivityArgs) -> str: - activity.logger.info(f"Running activity with parameter {input.time}") - output = f"OK - {input.time}" +async def noop_activity(inputs: NoopActivityArgs) -> str: + activity.logger.info(f"Running activity with parameter {inputs.arg}") + output = f"OK - {inputs.arg}" logging.warning(f"[Action] - Action executed on worker with output: {output}") return output @@ -29,17 +31,15 @@ def parse_inputs(inputs: list[str]) -> Any: We expect only one input, so we just return it and assume it's correct. """ - if not inputs: - # Preserving defaults from when this was the only workflow. - inputs = [datetime.now().isoformat()] - return inputs[0] + loaded = json.loads(inputs[0]) + return NoOpInputs(**loaded) @workflow.run - async def run(self, time: str) -> str: - workflow.logger.info(f"Running workflow with parameter {time}") + async def run(self, inputs: NoOpInputs) -> str: + workflow.logger.info(f"Running workflow with parameter {inputs.arg}") result = await workflow.execute_activity( noop_activity, - NoopActivityArgs(time), + NoopActivityArgs(inputs.arg), start_to_close_timeout=timedelta(seconds=60), schedule_to_close_timeout=timedelta(minutes=5), )