test(batch-exports-backfills): Add Workflow test
tomasfarias committed Oct 13, 2023
1 parent 0480b4b commit 6880f05
Showing 8 changed files with 248 additions and 82 deletions.
2 changes: 2 additions & 0 deletions posthog/batch_exports/models.py
@@ -28,12 +28,14 @@ class Destination(models.TextChoices):
SNOWFLAKE = "Snowflake"
POSTGRES = "Postgres"
BIGQUERY = "BigQuery"
NOOP = "NoOp"

secret_fields = {
"S3": {"aws_access_key_id", "aws_secret_access_key"},
"Snowflake": set("password"),
"Postgres": set("password"),
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
"NoOp": set(),
}

type: models.CharField = models.CharField(
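
For illustration, a minimal sketch of how the secret_fields mapping might be used to mask credentials before exposing a destination's config. The redact_config helper below is hypothetical, not part of this commit, and assumes secret_fields is a class attribute of BatchExportDestination, as the surrounding context suggests:

from posthog.batch_exports.models import BatchExportDestination

def redact_config(destination_type: str, config: dict) -> dict:
    """Hypothetical helper: return a copy of config with secret fields masked."""
    secrets = BatchExportDestination.secret_fields.get(destination_type, set())
    return {key: "********" if key in secrets else value for key, value in config.items()}

With the new entry, redact_config("NoOp", {}) is simply {}, since the NoOp destination declares no secrets.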
13 changes: 13 additions & 0 deletions posthog/batch_exports/service.py
@@ -122,11 +122,22 @@ class BigQueryBatchExportInputs:
include_events: list[str] | None = None


@dataclass
class NoOpInputs:
"""NoOp Workflow is used for testing, it takes a single argument to echo back."""

batch_export_id: str
team_id: int
interval: str = "hour"
arg: str = ""


DESTINATION_WORKFLOWS = {
"S3": ("s3-export", S3BatchExportInputs),
"Snowflake": ("snowflake-export", SnowflakeBatchExportInputs),
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}
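
As a sketch of how this registry might be consumed to dispatch a NoOp export (the lookup below is an assumption about intended usage, not code from this commit):

from uuid import uuid4

from posthog.batch_exports.service import DESTINATION_WORKFLOWS

# Look up the Temporal workflow name and inputs dataclass for a destination type.
workflow_name, inputs_class = DESTINATION_WORKFLOWS["NoOp"]  # ("no-op", NoOpInputs)
inputs = inputs_class(batch_export_id=str(uuid4()), team_id=1, arg="echo me")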


@@ -377,6 +388,7 @@ async def updater(_: ScheduleUpdateInput) -> ScheduleUpdate:

def create_batch_export_backfill(
batch_export_id: UUID,
team_id: int,
start_at: str,
end_at: str,
status: str = BatchExportRun.Status.RUNNING,
@@ -392,6 +404,7 @@ def create_batch_export_backfill(
status=status,
start_at=dt.datetime.fromisoformat(start_at),
end_at=dt.datetime.fromisoformat(end_at),
team_id=team_id,
)
backfill.save()

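A usage sketch of the updated helper with its new team_id parameter (values are illustrative):

import uuid

from posthog.batch_exports.service import create_batch_export_backfill

# Illustrative call; in practice batch_export_id references an existing BatchExport.
backfill = create_batch_export_backfill(
    batch_export_id=uuid.uuid4(),
    team_id=1,
    start_at="2023-01-01T00:00:00+00:00",
    end_at="2023-01-01T01:00:00+00:00",
)
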
17 changes: 16 additions & 1 deletion posthog/migrations/0354_add_batch_export_backfill_model.py
@@ -1,4 +1,4 @@
# Generated by Django 3.2.19 on 2023-10-11 08:17
# Generated by Django 3.2.19 on 2023-10-13 09:13

from django.db import migrations, models
import django.db.models.deletion
@@ -12,6 +12,21 @@ class Migration(migrations.Migration):
]

operations = [
migrations.AlterField(
model_name="batchexportdestination",
name="type",
field=models.CharField(
choices=[
("S3", "S3"),
("Snowflake", "Snowflake"),
("Postgres", "Postgres"),
("BigQuery", "Bigquery"),
("NoOp", "Noop"),
],
help_text="A choice of supported BatchExportDestination types.",
max_length=64,
),
),
migrations.CreateModel(
name="BatchExportBackfill",
fields=[
12 changes: 12 additions & 0 deletions posthog/temporal/tests/batch_exports/base.py
@@ -1,10 +1,12 @@
import datetime as dt
import json
import typing
import uuid

from asgiref.sync import sync_to_async

from ee.clickhouse.materialized_columns.columns import materialize
from posthog.batch_exports.models import BatchExportBackfill
from posthog.temporal.workflows.clickhouse import ClickHouseClient


@@ -79,3 +81,13 @@ def to_isoformat(d: str | None) -> str | None:
if d is None:
return None
return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()


def fetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
"""Fetch the BatchExportBackfills for a given BatchExport."""
return list(BatchExportBackfill.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit])


async def afetch_batch_export_backfills(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportBackfill]:
"""Fetch the BatchExportBackfills for a given BatchExport."""
return await sync_to_async(fetch_batch_export_backfills)(batch_export_id, limit) # type: ignore
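
A sketch of how a test might use the new async helper (assumes an async test body with an existing batch_export):

# Inside an async test, after a backfill has run for `batch_export` (assumed to exist):
backfills = await afetch_batch_export_backfills(batch_export_id=batch_export.id)
assert all(backfill.batch_export_id == batch_export.id for backfill in backfills)
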
131 changes: 102 additions & 29 deletions posthog/temporal/tests/batch_exports/test_backfill_batch_export.py
@@ -6,17 +6,31 @@
import pytest_asyncio
import temporalio
import temporalio.client
import temporalio.common
import temporalio.testing
import temporalio.worker
from django.conf import settings

from posthog.api.test.test_organization import acreate_organization
from posthog.api.test.test_team import acreate_team
from posthog.temporal.client import connect
from posthog.temporal.tests.batch_exports.base import afetch_batch_export_backfills
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
adelete_batch_export,
)
from posthog.temporal.workflows.backfill_batch_export import (
BackfillBatchExportInputs,
BackfillBatchExportWorkflow,
BackfillScheduleInputs,
backfill_range,
backfill_schedule,
get_schedule_frequency,
)
from posthog.temporal.workflows.batch_exports import (
create_batch_export_backfill_model,
update_batch_export_backfill_model_status,
)
from posthog.temporal.workflows.noop import NoOpWorkflow, noop_activity


@@ -38,38 +52,56 @@ async def temporal_client():
@pytest_asyncio.fixture
async def temporal_schedule(temporal_client):
"""Manage a test Temopral Schedule yielding its handle."""
schedule_id = str(uuid.uuid4())
handle = await temporal_client.create_schedule(
schedule_id,
temporalio.client.Schedule(
action=temporalio.client.ScheduleActionStartWorkflow(
NoOpWorkflow.run,
"test-input",
id="test-schedule-workflow-id",
task_queue=settings.TEMPORAL_TASK_QUEUE,
),
spec=temporalio.client.ScheduleSpec(
intervals=[temporalio.client.ScheduleIntervalSpec(every=dt.timedelta(minutes=1))]
),
state=temporalio.client.ScheduleState(paused=True),
),
destination_data = {
"type": "NoOp",
"config": {},
}

interval = "every 1 minutes"
batch_export_data = {
"name": "no-op-export",
"destination": destination_data,
"interval": interval,
"paused": True,
}

organization = await acreate_organization("test")
team = await acreate_team(organization=organization)
batch_export = await acreate_batch_export(
team_id=team.pk,
name=batch_export_data["name"],
destination_data=batch_export_data["destination"],
interval=batch_export_data["interval"],
)

handle = temporal_client.get_schedule_handle(str(batch_export.id))
yield handle

await handle.delete()
await adelete_batch_export(batch_export, temporal_client)


@pytest.fixture
def temporal_worker(temporal_client):
@pytest_asyncio.fixture
async def temporal_worker(temporal_client):
worker = temporalio.worker.Worker(
temporal_client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[NoOpWorkflow],
activities=[noop_activity],
workflows=[NoOpWorkflow, BackfillBatchExportWorkflow],
activities=[
noop_activity,
backfill_schedule,
create_batch_export_backfill_model,
update_batch_export_backfill_model_status,
get_schedule_frequency,
],
workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(),
)
return worker

worker_run = asyncio.create_task(worker.run())

yield worker

worker_run.cancel()
await asyncio.wait([worker_run])
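
The parametrized test below exercises backfill_range, whose implementation is not part of this diff; presumably it splits [start_at, end_at) into step-sized windows. An illustrative reimplementation under that assumption:

import datetime as dt
import typing

def backfill_range(
    start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta
) -> typing.Iterator[tuple[dt.datetime, dt.datetime]]:
    """Yield consecutive (start, end) windows of length step covering [start_at, end_at)."""
    current = start_at
    while current < end_at:
        window_end = current + step
        yield (current, window_end)
        current = window_end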


@pytest.mark.parametrize(
@@ -133,40 +165,81 @@ def test_backfill_range(start_at, end_at, step, expected):


@pytest.mark.asyncio
async def test_get_schedule_frequency(temporal_schedule):
@pytest.mark.django_db
async def test_get_schedule_frequency(activity_environment, temporal_worker, temporal_schedule):
"""Test get_schedule_frequency returns the correct interval."""
desc = await temporal_schedule.describe()
expected = desc.schedule.spec.intervals[0].every
expected = desc.schedule.spec.intervals[0].every.total_seconds()

result = await get_schedule_frequency(temporal_schedule)
result = await activity_environment.run(get_schedule_frequency, desc.id)

assert result == expected
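
Per the updated test above, get_schedule_frequency now runs as an activity that takes a schedule id and returns the interval in seconds. A minimal sketch of that logic with an assumed signature (the real activity's client wiring is not shown in this diff):

import temporalio.client

async def get_schedule_frequency(client: temporalio.client.Client, schedule_id: str) -> float:
    """Sketch: describe the Schedule and return its first interval, in seconds."""
    handle = client.get_schedule_handle(schedule_id)
    desc = await handle.describe()
    return desc.schedule.spec.intervals[0].every.total_seconds()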


@pytest.mark.asyncio
@pytest.mark.django_db
async def test_backfill_schedule_activity(activity_environment, temporal_worker, temporal_schedule):
"""Test backfill_schedule activity schedules all backfill runs."""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)

desc = await temporal_schedule.describe()
inputs = BackfillBatchExportInputs(
team_id=1,
inputs = BackfillScheduleInputs(
schedule_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=2,
wait_delay=1.0,
frequency_seconds=desc.schedule.spec.intervals[0].every.total_seconds(),
)
worker_run = asyncio.create_task(temporal_worker.run())

await activity_environment.run(backfill_schedule, inputs)

worker_run.cancel()
await asyncio.wait([worker_run])
desc = await temporal_schedule.describe()
result = desc.info.num_actions
expected = 10

assert result == expected


@pytest.mark.asyncio
@pytest.mark.django_db
async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule, temporal_client):
"""Test BackfillBatchExportWorkflow executes all backfill runs and updates model."""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)

desc = await temporal_schedule.describe()

workflow_id = str(uuid.uuid4())
inputs = BackfillBatchExportInputs(
team_id=1,
schedule_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=2,
wait_delay=1.0,
)

handle = await temporal_client.start_workflow(
BackfillBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
execution_timeout=dt.timedelta(minutes=1),
retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1),
)
await handle.result()

desc = await temporal_schedule.describe()
result = desc.info.num_actions
expected = 10

assert result == expected

backfills = await afetch_batch_export_backfills(batch_export_id=desc.id)

assert len(backfills) == 1, "Expected one backfill to have been created"

backfill = backfills.pop()
assert backfill.status == "Completed"
